Merge pull request #12653 from taosdata/feature/TD-14481-3.0
feat: check ts in submit req msg
This commit is contained in:
commit
e038b2f458
|
@ -104,6 +104,7 @@ int tsdbOpen(SVnode* pVnode, STsdb** ppTsdb, const char* dir, STsdbKeep
|
|||
int tsdbClose(STsdb** pTsdb);
|
||||
int tsdbBegin(STsdb* pTsdb);
|
||||
int tsdbCommit(STsdb* pTsdb);
|
||||
int tsdbScanAndConvertSubmitMsg(STsdb *pTsdb, const SSubmitReq *pMsg);
|
||||
int tsdbInsertData(STsdb* pTsdb, int64_t version, SSubmitReq* pMsg, SSubmitRsp* pRsp);
|
||||
int tsdbInsertTableData(STsdb* pTsdb, SSubmitMsgIter* pMsgIter, SSubmitBlk* pBlock, SSubmitBlkRsp* pRsp);
|
||||
tsdbReaderT* tsdbQueryTables(SVnode* pVnode, SQueryTableDataCond* pCond, STableGroupInfo* groupList, uint64_t qId,
|
||||
|
|
|
@ -15,7 +15,7 @@
|
|||
|
||||
#include "tsdb.h"
|
||||
|
||||
static int tsdbScanAndConvertSubmitMsg(STsdb *pTsdb, SSubmitReq *pMsg);
|
||||
// static int tsdbScanAndConvertSubmitMsg(STsdb *pTsdb, SSubmitReq *pMsg);
|
||||
|
||||
int tsdbInsertData(STsdb *pTsdb, int64_t version, SSubmitReq *pMsg, SSubmitRsp *pRsp) {
|
||||
SSubmitMsgIter msgIter = {0};
|
||||
|
@ -54,7 +54,38 @@ int tsdbInsertData(STsdb *pTsdb, int64_t version, SSubmitReq *pMsg, SSubmitRsp *
|
|||
return 0;
|
||||
}
|
||||
|
||||
static int tsdbScanAndConvertSubmitMsg(STsdb *pTsdb, SSubmitReq *pMsg) {
|
||||
#if 0
|
||||
static FORCE_INLINE int tsdbCheckRowRange(STsdb *pTsdb, STable *pTable, STSRow *row, TSKEY minKey, TSKEY maxKey,
|
||||
TSKEY now) {
|
||||
TSKEY rowKey = TD_ROW_KEY(row);
|
||||
if (rowKey < minKey || rowKey > maxKey) {
|
||||
tsdbError("vgId:%d table %s tid %d uid %" PRIu64 " timestamp is out of range! now %" PRId64 " minKey %" PRId64
|
||||
" maxKey %" PRId64 " row key %" PRId64,
|
||||
REPO_ID(pTsdb), TABLE_CHAR_NAME(pTable), TABLE_TID(pTable), TABLE_UID(pTable), now, minKey, maxKey,
|
||||
rowKey);
|
||||
terrno = TSDB_CODE_TDB_TIMESTAMP_OUT_OF_RANGE;
|
||||
return -1;
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
#endif
|
||||
|
||||
static FORCE_INLINE int tsdbCheckRowRange(STsdb *pTsdb, tb_uid_t uid, STSRow *row, TSKEY minKey, TSKEY maxKey,
|
||||
TSKEY now) {
|
||||
TSKEY rowKey = TD_ROW_KEY(row);
|
||||
if (rowKey < minKey || rowKey > maxKey) {
|
||||
tsdbError("vgId:%d table uid %" PRIu64 " timestamp is out of range! now %" PRId64 " minKey %" PRId64
|
||||
" maxKey %" PRId64 " row key %" PRId64,
|
||||
REPO_ID(pTsdb), uid, now, minKey, maxKey, rowKey);
|
||||
terrno = TSDB_CODE_TDB_TIMESTAMP_OUT_OF_RANGE;
|
||||
return -1;
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
int tsdbScanAndConvertSubmitMsg(STsdb *pTsdb, const SSubmitReq *pMsg) {
|
||||
ASSERT(pMsg != NULL);
|
||||
// STsdbMeta * pMeta = pTsdb->tsdbMeta;
|
||||
SSubmitMsgIter msgIter = {0};
|
||||
|
@ -112,14 +143,14 @@ static int tsdbScanAndConvertSubmitMsg(STsdb *pTsdb, SSubmitReq *pMsg) {
|
|||
return -1;
|
||||
}
|
||||
}
|
||||
|
||||
tsdbInitSubmitBlkIter(pBlock, &blkIter);
|
||||
while ((row = tsdbGetSubmitBlkNext(&blkIter)) != NULL) {
|
||||
if (tsdbCheckRowRange(pTsdb, pTable, row, minKey, maxKey, now) < 0) {
|
||||
#endif
|
||||
tInitSubmitBlkIter(&msgIter, pBlock, &blkIter);
|
||||
while ((row = tGetSubmitBlkNext(&blkIter)) != NULL) {
|
||||
if (tsdbCheckRowRange(pTsdb, msgIter.uid, row, minKey, maxKey, now) < 0) {
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
#endif
|
||||
|
||||
}
|
||||
|
||||
if (terrno != TSDB_CODE_SUCCESS) return -1;
|
||||
|
|
|
@ -631,6 +631,11 @@ static int vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq, in
|
|||
vnodeDebugPrintSubmitMsg(pVnode, pReq, __func__);
|
||||
#endif
|
||||
|
||||
if (tsdbScanAndConvertSubmitMsg(pVnode->pTsdb, pSubmitReq) < 0) {
|
||||
pRsp->code = terrno;
|
||||
goto _exit;
|
||||
}
|
||||
|
||||
// handle the request
|
||||
if (tInitSubmitMsgIter(pSubmitReq, &msgIter) < 0) {
|
||||
pRsp->code = TSDB_CODE_INVALID_MSG;
|
||||
|
|
|
@ -57,7 +57,7 @@ class TDSql:
|
|||
tdLog.notice("'reset query cache' is not supported")
|
||||
s = 'drop database if exists db'
|
||||
self.cursor.execute(s)
|
||||
s = 'create database db'
|
||||
s = 'create database db days 300'
|
||||
self.cursor.execute(s)
|
||||
s = 'use db'
|
||||
self.cursor.execute(s)
|
||||
|
|
|
@ -5,7 +5,7 @@ sleep 500
|
|||
sql connect
|
||||
|
||||
print =============== create database
|
||||
sql create database d0
|
||||
sql create database d0 days 300
|
||||
sql use d0
|
||||
|
||||
print =============== create super table and child table
|
||||
|
|
Loading…
Reference in New Issue