feat: check ts in submit req msg
This commit is contained in:
parent
5fdaa4346d
commit
19d64d7c25
|
@ -104,6 +104,7 @@ int tsdbOpen(SVnode* pVnode, STsdb** ppTsdb, const char* dir, STsdbKeep
|
||||||
int tsdbClose(STsdb** pTsdb);
|
int tsdbClose(STsdb** pTsdb);
|
||||||
int tsdbBegin(STsdb* pTsdb);
|
int tsdbBegin(STsdb* pTsdb);
|
||||||
int tsdbCommit(STsdb* pTsdb);
|
int tsdbCommit(STsdb* pTsdb);
|
||||||
|
int tsdbScanAndConvertSubmitMsg(STsdb *pTsdb, SSubmitReq *pMsg);
|
||||||
int tsdbInsertData(STsdb* pTsdb, int64_t version, SSubmitReq* pMsg, SSubmitRsp* pRsp);
|
int tsdbInsertData(STsdb* pTsdb, int64_t version, SSubmitReq* pMsg, SSubmitRsp* pRsp);
|
||||||
int tsdbInsertTableData(STsdb* pTsdb, SSubmitMsgIter* pMsgIter, SSubmitBlk* pBlock, SSubmitBlkRsp* pRsp);
|
int tsdbInsertTableData(STsdb* pTsdb, SSubmitMsgIter* pMsgIter, SSubmitBlk* pBlock, SSubmitBlkRsp* pRsp);
|
||||||
tsdbReaderT* tsdbQueryTables(SVnode* pVnode, SQueryTableDataCond* pCond, STableGroupInfo* groupList, uint64_t qId,
|
tsdbReaderT* tsdbQueryTables(SVnode* pVnode, SQueryTableDataCond* pCond, STableGroupInfo* groupList, uint64_t qId,
|
||||||
|
|
|
@ -15,7 +15,7 @@
|
||||||
|
|
||||||
#include "tsdb.h"
|
#include "tsdb.h"
|
||||||
|
|
||||||
static int tsdbScanAndConvertSubmitMsg(STsdb *pTsdb, SSubmitReq *pMsg);
|
// static int tsdbScanAndConvertSubmitMsg(STsdb *pTsdb, SSubmitReq *pMsg);
|
||||||
|
|
||||||
int tsdbInsertData(STsdb *pTsdb, int64_t version, SSubmitReq *pMsg, SSubmitRsp *pRsp) {
|
int tsdbInsertData(STsdb *pTsdb, int64_t version, SSubmitReq *pMsg, SSubmitRsp *pRsp) {
|
||||||
SSubmitMsgIter msgIter = {0};
|
SSubmitMsgIter msgIter = {0};
|
||||||
|
@ -54,7 +54,38 @@ int tsdbInsertData(STsdb *pTsdb, int64_t version, SSubmitReq *pMsg, SSubmitRsp *
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int tsdbScanAndConvertSubmitMsg(STsdb *pTsdb, SSubmitReq *pMsg) {
|
#if 0
|
||||||
|
static FORCE_INLINE int tsdbCheckRowRange(STsdb *pTsdb, STable *pTable, STSRow *row, TSKEY minKey, TSKEY maxKey,
|
||||||
|
TSKEY now) {
|
||||||
|
TSKEY rowKey = TD_ROW_KEY(row);
|
||||||
|
if (rowKey < minKey || rowKey > maxKey) {
|
||||||
|
tsdbError("vgId:%d table %s tid %d uid %" PRIu64 " timestamp is out of range! now %" PRId64 " minKey %" PRId64
|
||||||
|
" maxKey %" PRId64 " row key %" PRId64,
|
||||||
|
REPO_ID(pTsdb), TABLE_CHAR_NAME(pTable), TABLE_TID(pTable), TABLE_UID(pTable), now, minKey, maxKey,
|
||||||
|
rowKey);
|
||||||
|
terrno = TSDB_CODE_TDB_TIMESTAMP_OUT_OF_RANGE;
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
#endif
|
||||||
|
|
||||||
|
static FORCE_INLINE int tsdbCheckRowRange(STsdb *pTsdb, tb_uid_t uid, STSRow *row, TSKEY minKey, TSKEY maxKey,
|
||||||
|
TSKEY now) {
|
||||||
|
TSKEY rowKey = TD_ROW_KEY(row);
|
||||||
|
if (rowKey < minKey || rowKey > maxKey) {
|
||||||
|
tsdbError("vgId:%d table uid %" PRIu64 " timestamp is out of range! now %" PRId64 " minKey %" PRId64
|
||||||
|
" maxKey %" PRId64 " row key %" PRId64,
|
||||||
|
REPO_ID(pTsdb), uid, now, minKey, maxKey, rowKey);
|
||||||
|
terrno = TSDB_CODE_TDB_TIMESTAMP_OUT_OF_RANGE;
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
int tsdbScanAndConvertSubmitMsg(STsdb *pTsdb, SSubmitReq *pMsg) {
|
||||||
ASSERT(pMsg != NULL);
|
ASSERT(pMsg != NULL);
|
||||||
// STsdbMeta * pMeta = pTsdb->tsdbMeta;
|
// STsdbMeta * pMeta = pTsdb->tsdbMeta;
|
||||||
SSubmitMsgIter msgIter = {0};
|
SSubmitMsgIter msgIter = {0};
|
||||||
|
@ -112,14 +143,14 @@ static int tsdbScanAndConvertSubmitMsg(STsdb *pTsdb, SSubmitReq *pMsg) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
#endif
|
||||||
tsdbInitSubmitBlkIter(pBlock, &blkIter);
|
tInitSubmitBlkIter(&msgIter, pBlock, &blkIter);
|
||||||
while ((row = tsdbGetSubmitBlkNext(&blkIter)) != NULL) {
|
while ((row = tGetSubmitBlkNext(&blkIter)) != NULL) {
|
||||||
if (tsdbCheckRowRange(pTsdb, pTable, row, minKey, maxKey, now) < 0) {
|
if (tsdbCheckRowRange(pTsdb, msgIter.uid, row, minKey, maxKey, now) < 0) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
#endif
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if (terrno != TSDB_CODE_SUCCESS) return -1;
|
if (terrno != TSDB_CODE_SUCCESS) return -1;
|
||||||
|
|
|
@ -626,6 +626,11 @@ static int vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq, in
|
||||||
vnodeDebugPrintSubmitMsg(pVnode, pReq, __func__);
|
vnodeDebugPrintSubmitMsg(pVnode, pReq, __func__);
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
|
if (tsdbScanAndConvertSubmitMsg(pVnode->pTsdb, pSubmitReq) < 0) {
|
||||||
|
pRsp->code = terrno;
|
||||||
|
goto _exit;
|
||||||
|
}
|
||||||
|
|
||||||
// handle the request
|
// handle the request
|
||||||
if (tInitSubmitMsgIter(pSubmitReq, &msgIter) < 0) {
|
if (tInitSubmitMsgIter(pSubmitReq, &msgIter) < 0) {
|
||||||
pRsp->code = TSDB_CODE_INVALID_MSG;
|
pRsp->code = TSDB_CODE_INVALID_MSG;
|
||||||
|
|
Loading…
Reference in New Issue