diff --git a/include/common/tdataformat.h b/include/common/tdataformat.h
index d7a62f5402..a36a7513f3 100644
--- a/include/common/tdataformat.h
+++ b/include/common/tdataformat.h
@@ -205,7 +205,7 @@ struct SColData {
int32_t numOfNull; // # of null
int32_t numOfValue; // # of vale
int32_t nVal;
- uint8_t flag;
+ int8_t flag;
uint8_t *pBitMap;
int32_t *aOffset;
int32_t nData;
diff --git a/source/dnode/vnode/src/tsdb/tsdbWrite.c b/source/dnode/vnode/src/tsdb/tsdbWrite.c
index 2ad971ca28..301b504346 100644
--- a/source/dnode/vnode/src/tsdb/tsdbWrite.c
+++ b/source/dnode/vnode/src/tsdb/tsdbWrite.c
@@ -22,7 +22,7 @@
* us: 3600*1000000*8765*1000 // 1970 + 1000 years
* ns: 3600*1000000000*8765*292 // 1970 + 292 years
*/
-static int64_t tsMaxKeyByPrecision[] = {31556995200000L, 31556995200000000L, 9214646400000000000L};
+int64_t tsMaxKeyByPrecision[] = {31556995200000L, 31556995200000000L, 9214646400000000000L};
// static int tsdbScanAndConvertSubmitMsg(STsdb *pTsdb, SSubmitReq *pMsg);
diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c
index 005c95ab38..983bea3706 100644
--- a/source/dnode/vnode/src/vnd/vnodeSvr.c
+++ b/source/dnode/vnode/src/vnd/vnodeSvr.c
@@ -13,7 +13,12 @@
* along with this program. If not, see .
*/
+#include
+#include "tencode.h"
+#include "tmsg.h"
#include "vnd.h"
+#include "vnode.h"
+#include "vnodeInt.h"
static int32_t vnodeProcessCreateStbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
static int32_t vnodeProcessAlterStbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
@@ -31,6 +36,48 @@ static int32_t vnodeProcessDeleteReq(SVnode *pVnode, int64_t version, void *pReq
static int32_t vnodeProcessBatchDeleteReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
static int32_t vnodeProcessCompactVnodeReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
+static int32_t vnodePreprocessCreateTableReq(SVnode *pVnode, SDecoder *pCoder, int64_t ctime) {
+ int32_t code = 0;
+ int32_t lino = 0;
+
+ if (tStartDecode(pCoder) < 0) {
+ code = TSDB_CODE_INVALID_MSG;
+ TSDB_CHECK_CODE(code, lino, _exit);
+ }
+
+ // flags
+ if (tDecodeI32v(pCoder, NULL) < 0) {
+ code = TSDB_CODE_INVALID_MSG;
+ TSDB_CHECK_CODE(code, lino, _exit);
+ }
+
+ // name
+ char *name = NULL;
+ if (tDecodeCStr(pCoder, &name) < 0) {
+ code = TSDB_CODE_INVALID_MSG;
+ TSDB_CHECK_CODE(code, lino, _exit);
+ }
+
+ // uid
+ int64_t uid = metaGetTableEntryUidByName(pVnode->pMeta, name);
+ if (uid == 0) {
+ uid = tGenIdPI64();
+ }
+ *(int64_t *)(pCoder->data + pCoder->pos) = uid;
+
+ // ctime
+ *(int64_t *)(pCoder->data + pCoder->pos + 8) = ctime;
+
+ tEndDecode(pCoder);
+
+_exit:
+ if (code) {
+ vError("vgId:%d %s failed at line %d since %s", TD_VID(pVnode), __func__, lino, tstrerror(code));
+ } else {
+ vTrace("vgId:%d %s done, table:%s uid generated:%" PRId64, TD_VID(pVnode), __func__, name, uid);
+ }
+ return code;
+}
static int32_t vnodePreProcessCreateTableMsg(SVnode *pVnode, SRpcMsg *pMsg) {
int32_t code = 0;
int32_t lino = 0;
@@ -50,26 +97,8 @@ static int32_t vnodePreProcessCreateTableMsg(SVnode *pVnode, SRpcMsg *pMsg) {
TSDB_CHECK_CODE(code, lino, _exit);
}
for (int32_t iReq = 0; iReq < nReqs; iReq++) {
- tb_uid_t uid = tGenIdPI64();
- char *name = NULL;
- if (tStartDecode(&dc) < 0) {
- code = TSDB_CODE_INVALID_MSG;
- TSDB_CHECK_CODE(code, lino, _exit);
- }
-
- if (tDecodeI32v(&dc, NULL) < 0) {
- code = TSDB_CODE_INVALID_MSG;
- TSDB_CHECK_CODE(code, lino, _exit);
- }
- if (tDecodeCStr(&dc, &name) < 0) {
- code = TSDB_CODE_INVALID_MSG;
- TSDB_CHECK_CODE(code, lino, _exit);
- }
- *(int64_t *)(dc.data + dc.pos) = uid;
- *(int64_t *)(dc.data + dc.pos + 8) = ctime;
-
- vTrace("vgId:%d table:%s uid:%" PRId64 " is generated", pVnode->config.vgId, name, uid);
- tEndDecode(&dc);
+ code = vnodePreprocessCreateTableReq(pVnode, &dc, ctime);
+ TSDB_CHECK_CODE(code, lino, _exit);
}
tEndDecode(&dc);
@@ -78,80 +107,118 @@ _exit:
tDecoderClear(&dc);
return code;
}
-
-static int32_t vnodePreProcessSubmitMsg(SVnode *pVnode, SRpcMsg *pMsg) {
+extern int64_t tsMaxKeyByPrecision[];
+static int32_t vnodePreProcessSubmitTbData(SVnode *pVnode, SDecoder *pCoder, int64_t ctime) {
int32_t code = 0;
int32_t lino = 0;
- int64_t ctime = taosGetTimestampMs();
- SDecoder dc = {0};
-
- tDecoderInit(&dc, (uint8_t *)pMsg->pCont + sizeof(SMsgHead), pMsg->contLen - sizeof(SMsgHead));
- tStartDecode(&dc);
-
- uint64_t nSubmitTbData;
- if (tDecodeU64v(&dc, &nSubmitTbData) < 0) {
+ if (tStartDecode(pCoder) < 0) {
code = TSDB_CODE_INVALID_MSG;
TSDB_CHECK_CODE(code, lino, _exit);
}
- for (int32_t i = 0; i < nSubmitTbData; i++) {
- if (tStartDecode(&dc) < 0) {
- code = TSDB_CODE_INVALID_MSG;
- TSDB_CHECK_CODE(code, lino, _exit);
- }
-
- int32_t flags;
- if (tDecodeI32v(&dc, &flags) < 0) {
- code = TSDB_CODE_INVALID_MSG;
- TSDB_CHECK_CODE(code, lino, _exit);
- }
-
- if (flags & SUBMIT_REQ_AUTO_CREATE_TABLE) {
- // SVCreateTbReq
- if (tStartDecode(&dc) < 0) {
- code = TSDB_CODE_INVALID_MSG;
- TSDB_CHECK_CODE(code, lino, _exit);
- }
-
- if (tDecodeI32v(&dc, NULL) < 0) {
- code = TSDB_CODE_INVALID_MSG;
- TSDB_CHECK_CODE(code, lino, _exit);
- }
-
- char *name = NULL;
- if (tDecodeCStr(&dc, &name) < 0) {
- code = TSDB_CODE_INVALID_MSG;
- TSDB_CHECK_CODE(code, lino, _exit);
- }
-
- int64_t uid = metaGetTableEntryUidByName(pVnode->pMeta, name);
- if (uid == 0) {
- uid = tGenIdPI64();
- }
-
- *(int64_t *)(dc.data + dc.pos) = uid;
- *(int64_t *)(dc.data + dc.pos + 8) = ctime;
-
- tEndDecode(&dc);
-
- // SSubmitTbData
- int64_t suid;
- if (tDecodeI64(&dc, &suid) < 0) {
- code = TSDB_CODE_INVALID_MSG;
- TSDB_CHECK_CODE(code, lino, _exit);
- }
-
- *(int64_t *)(dc.data + dc.pos) = uid;
- }
-
- tEndDecode(&dc);
+ SSubmitTbData submitTbData;
+ if (tDecodeI32v(pCoder, &submitTbData.flags) < 0) {
+ code = TSDB_CODE_INVALID_MSG;
+ TSDB_CHECK_CODE(code, lino, _exit);
}
- tEndDecode(&dc);
- tDecoderClear(&dc);
+ if (submitTbData.flags & SUBMIT_REQ_AUTO_CREATE_TABLE) {
+ code = vnodePreprocessCreateTableReq(pVnode, pCoder, ctime);
+ TSDB_CHECK_CODE(code, lino, _exit);
+ }
+
+ // submit data
+ if (tDecodeI64(pCoder, &submitTbData.suid) < 0) {
+ code = TSDB_CODE_INVALID_MSG;
+ TSDB_CHECK_CODE(code, lino, _exit);
+ }
+ if (tDecodeI64(pCoder, &submitTbData.uid) < 0) {
+ code = TSDB_CODE_INVALID_MSG;
+ TSDB_CHECK_CODE(code, lino, _exit);
+ }
+ if (tDecodeI32v(pCoder, &submitTbData.sver) < 0) {
+ code = TSDB_CODE_INVALID_MSG;
+ TSDB_CHECK_CODE(code, lino, _exit);
+ }
+
+ // scan and check
+ TSKEY now = ctime;
+ if (pVnode->config.tsdbCfg.precision == TSDB_TIME_PRECISION_MICRO) {
+ now *= 1000;
+ } else if (pVnode->config.tsdbCfg.precision == TSDB_TIME_PRECISION_NANO) {
+ now *= 1000000;
+ }
+ TSKEY minKey = now - tsTickPerMin[pVnode->config.tsdbCfg.precision] * pVnode->config.tsdbCfg.keep2;
+ TSKEY maxKey = tsMaxKeyByPrecision[pVnode->config.tsdbCfg.precision];
+ if (submitTbData.flags & SUBMIT_REQ_COLUMN_DATA_FORMAT) {
+ uint64_t nColData;
+ if (tDecodeU64v(pCoder, &nColData) < 0) {
+ code = TSDB_CODE_INVALID_MSG;
+ goto _exit;
+ }
+
+ SColData colData = {0};
+ pCoder->pos += tGetColData(pCoder->data + pCoder->pos, &colData);
+
+ for (int32_t iRow = 0; iRow < colData.nVal; iRow++) {
+ if (((TSKEY *)colData.pData)[iRow] < minKey || ((TSKEY *)colData.pData)[iRow] > maxKey) {
+ code = TSDB_CODE_TDB_TIMESTAMP_OUT_OF_RANGE;
+ goto _exit;
+ }
+ }
+ } else {
+ uint64_t nRow;
+ if (tDecodeU64v(pCoder, &nRow) < 0) {
+ code = TSDB_CODE_INVALID_MSG;
+ goto _exit;
+ }
+
+ for (int32_t iRow = 0; iRow < nRow; ++iRow) {
+ SRow *pRow = (SRow *)(pCoder->data + pCoder->pos);
+ pCoder->pos += pRow->len;
+
+ if (pRow->ts < minKey || pRow->ts > maxKey) {
+ code = TSDB_CODE_TDB_TIMESTAMP_OUT_OF_RANGE;
+ goto _exit;
+ }
+ }
+ }
+
+ tEndDecode(pCoder);
_exit:
+ return 0;
+}
+static int32_t vnodePreProcessSubmitMsg(SVnode *pVnode, SRpcMsg *pMsg) {
+ int32_t code = 0;
+ int32_t lino = 0;
+
+ SDecoder *pCoder = &(SDecoder){0};
+
+ tDecoderInit(pCoder, (uint8_t *)pMsg->pCont + sizeof(SMsgHead), pMsg->contLen - sizeof(SMsgHead));
+
+ if (tStartDecode(pCoder) < 0) {
+ code = TSDB_CODE_INVALID_MSG;
+ TSDB_CHECK_CODE(code, lino, _exit);
+ }
+
+ uint64_t nSubmitTbData;
+ if (tDecodeU64v(pCoder, &nSubmitTbData) < 0) {
+ code = TSDB_CODE_INVALID_MSG;
+ TSDB_CHECK_CODE(code, lino, _exit);
+ }
+
+ int64_t ctime = taosGetTimestampMs();
+ for (int32_t i = 0; i < nSubmitTbData; i++) {
+ code = vnodePreProcessSubmitTbData(pVnode, pCoder, ctime);
+ TSDB_CHECK_CODE(code, lino, _exit);
+ }
+
+ tEndDecode(pCoder);
+
+_exit:
+ tDecoderClear(pCoder);
return code;
}
@@ -923,11 +990,11 @@ static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq
}
tDecoderClear(&dc);
- // check
- code = tsdbScanAndConvertSubmitMsg(pVnode->pTsdb, pSubmitReq);
- if (code) {
- goto _exit;
- }
+ // // check
+ // code = tsdbScanAndConvertSubmitMsg(pVnode->pTsdb, pSubmitReq);
+ // if (code) {
+ // goto _exit;
+ // }
for (int32_t i = 0; i < TARRAY_SIZE(pSubmitReq->aSubmitTbData); ++i) {
SSubmitTbData *pSubmitTbData = taosArrayGet(pSubmitReq->aSubmitTbData, i);