From c1a3a0855b157133ddd6dfd43858f19b42b1426d Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Wed, 15 Feb 2023 17:59:54 +0800 Subject: [PATCH] finish code --- include/common/tdataformat.h | 2 +- source/dnode/vnode/src/tsdb/tsdbWrite.c | 2 +- source/dnode/vnode/src/vnd/vnodeSvr.c | 243 +++++++++++++++--------- 3 files changed, 157 insertions(+), 90 deletions(-) diff --git a/include/common/tdataformat.h b/include/common/tdataformat.h index d7a62f5402..a36a7513f3 100644 --- a/include/common/tdataformat.h +++ b/include/common/tdataformat.h @@ -205,7 +205,7 @@ struct SColData { int32_t numOfNull; // # of null int32_t numOfValue; // # of vale int32_t nVal; - uint8_t flag; + int8_t flag; uint8_t *pBitMap; int32_t *aOffset; int32_t nData; diff --git a/source/dnode/vnode/src/tsdb/tsdbWrite.c b/source/dnode/vnode/src/tsdb/tsdbWrite.c index 2ad971ca28..301b504346 100644 --- a/source/dnode/vnode/src/tsdb/tsdbWrite.c +++ b/source/dnode/vnode/src/tsdb/tsdbWrite.c @@ -22,7 +22,7 @@ * us: 3600*1000000*8765*1000 // 1970 + 1000 years * ns: 3600*1000000000*8765*292 // 1970 + 292 years */ -static int64_t tsMaxKeyByPrecision[] = {31556995200000L, 31556995200000000L, 9214646400000000000L}; +int64_t tsMaxKeyByPrecision[] = {31556995200000L, 31556995200000000L, 9214646400000000000L}; // static int tsdbScanAndConvertSubmitMsg(STsdb *pTsdb, SSubmitReq *pMsg); diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index 005c95ab38..983bea3706 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -13,7 +13,12 @@ * along with this program. If not, see . */ +#include +#include "tencode.h" +#include "tmsg.h" #include "vnd.h" +#include "vnode.h" +#include "vnodeInt.h" static int32_t vnodeProcessCreateStbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp); static int32_t vnodeProcessAlterStbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp); @@ -31,6 +36,48 @@ static int32_t vnodeProcessDeleteReq(SVnode *pVnode, int64_t version, void *pReq static int32_t vnodeProcessBatchDeleteReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp); static int32_t vnodeProcessCompactVnodeReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp); +static int32_t vnodePreprocessCreateTableReq(SVnode *pVnode, SDecoder *pCoder, int64_t ctime) { + int32_t code = 0; + int32_t lino = 0; + + if (tStartDecode(pCoder) < 0) { + code = TSDB_CODE_INVALID_MSG; + TSDB_CHECK_CODE(code, lino, _exit); + } + + // flags + if (tDecodeI32v(pCoder, NULL) < 0) { + code = TSDB_CODE_INVALID_MSG; + TSDB_CHECK_CODE(code, lino, _exit); + } + + // name + char *name = NULL; + if (tDecodeCStr(pCoder, &name) < 0) { + code = TSDB_CODE_INVALID_MSG; + TSDB_CHECK_CODE(code, lino, _exit); + } + + // uid + int64_t uid = metaGetTableEntryUidByName(pVnode->pMeta, name); + if (uid == 0) { + uid = tGenIdPI64(); + } + *(int64_t *)(pCoder->data + pCoder->pos) = uid; + + // ctime + *(int64_t *)(pCoder->data + pCoder->pos + 8) = ctime; + + tEndDecode(pCoder); + +_exit: + if (code) { + vError("vgId:%d %s failed at line %d since %s", TD_VID(pVnode), __func__, lino, tstrerror(code)); + } else { + vTrace("vgId:%d %s done, table:%s uid generated:%" PRId64, TD_VID(pVnode), __func__, name, uid); + } + return code; +} static int32_t vnodePreProcessCreateTableMsg(SVnode *pVnode, SRpcMsg *pMsg) { int32_t code = 0; int32_t lino = 0; @@ -50,26 +97,8 @@ static int32_t vnodePreProcessCreateTableMsg(SVnode *pVnode, SRpcMsg *pMsg) { TSDB_CHECK_CODE(code, lino, _exit); } for (int32_t iReq = 0; iReq < nReqs; iReq++) { - tb_uid_t uid = tGenIdPI64(); - char *name = NULL; - if (tStartDecode(&dc) < 0) { - code = TSDB_CODE_INVALID_MSG; - TSDB_CHECK_CODE(code, lino, _exit); - } - - if (tDecodeI32v(&dc, NULL) < 0) { - code = TSDB_CODE_INVALID_MSG; - TSDB_CHECK_CODE(code, lino, _exit); - } - if (tDecodeCStr(&dc, &name) < 0) { - code = TSDB_CODE_INVALID_MSG; - TSDB_CHECK_CODE(code, lino, _exit); - } - *(int64_t *)(dc.data + dc.pos) = uid; - *(int64_t *)(dc.data + dc.pos + 8) = ctime; - - vTrace("vgId:%d table:%s uid:%" PRId64 " is generated", pVnode->config.vgId, name, uid); - tEndDecode(&dc); + code = vnodePreprocessCreateTableReq(pVnode, &dc, ctime); + TSDB_CHECK_CODE(code, lino, _exit); } tEndDecode(&dc); @@ -78,80 +107,118 @@ _exit: tDecoderClear(&dc); return code; } - -static int32_t vnodePreProcessSubmitMsg(SVnode *pVnode, SRpcMsg *pMsg) { +extern int64_t tsMaxKeyByPrecision[]; +static int32_t vnodePreProcessSubmitTbData(SVnode *pVnode, SDecoder *pCoder, int64_t ctime) { int32_t code = 0; int32_t lino = 0; - int64_t ctime = taosGetTimestampMs(); - SDecoder dc = {0}; - - tDecoderInit(&dc, (uint8_t *)pMsg->pCont + sizeof(SMsgHead), pMsg->contLen - sizeof(SMsgHead)); - tStartDecode(&dc); - - uint64_t nSubmitTbData; - if (tDecodeU64v(&dc, &nSubmitTbData) < 0) { + if (tStartDecode(pCoder) < 0) { code = TSDB_CODE_INVALID_MSG; TSDB_CHECK_CODE(code, lino, _exit); } - for (int32_t i = 0; i < nSubmitTbData; i++) { - if (tStartDecode(&dc) < 0) { - code = TSDB_CODE_INVALID_MSG; - TSDB_CHECK_CODE(code, lino, _exit); - } - - int32_t flags; - if (tDecodeI32v(&dc, &flags) < 0) { - code = TSDB_CODE_INVALID_MSG; - TSDB_CHECK_CODE(code, lino, _exit); - } - - if (flags & SUBMIT_REQ_AUTO_CREATE_TABLE) { - // SVCreateTbReq - if (tStartDecode(&dc) < 0) { - code = TSDB_CODE_INVALID_MSG; - TSDB_CHECK_CODE(code, lino, _exit); - } - - if (tDecodeI32v(&dc, NULL) < 0) { - code = TSDB_CODE_INVALID_MSG; - TSDB_CHECK_CODE(code, lino, _exit); - } - - char *name = NULL; - if (tDecodeCStr(&dc, &name) < 0) { - code = TSDB_CODE_INVALID_MSG; - TSDB_CHECK_CODE(code, lino, _exit); - } - - int64_t uid = metaGetTableEntryUidByName(pVnode->pMeta, name); - if (uid == 0) { - uid = tGenIdPI64(); - } - - *(int64_t *)(dc.data + dc.pos) = uid; - *(int64_t *)(dc.data + dc.pos + 8) = ctime; - - tEndDecode(&dc); - - // SSubmitTbData - int64_t suid; - if (tDecodeI64(&dc, &suid) < 0) { - code = TSDB_CODE_INVALID_MSG; - TSDB_CHECK_CODE(code, lino, _exit); - } - - *(int64_t *)(dc.data + dc.pos) = uid; - } - - tEndDecode(&dc); + SSubmitTbData submitTbData; + if (tDecodeI32v(pCoder, &submitTbData.flags) < 0) { + code = TSDB_CODE_INVALID_MSG; + TSDB_CHECK_CODE(code, lino, _exit); } - tEndDecode(&dc); - tDecoderClear(&dc); + if (submitTbData.flags & SUBMIT_REQ_AUTO_CREATE_TABLE) { + code = vnodePreprocessCreateTableReq(pVnode, pCoder, ctime); + TSDB_CHECK_CODE(code, lino, _exit); + } + + // submit data + if (tDecodeI64(pCoder, &submitTbData.suid) < 0) { + code = TSDB_CODE_INVALID_MSG; + TSDB_CHECK_CODE(code, lino, _exit); + } + if (tDecodeI64(pCoder, &submitTbData.uid) < 0) { + code = TSDB_CODE_INVALID_MSG; + TSDB_CHECK_CODE(code, lino, _exit); + } + if (tDecodeI32v(pCoder, &submitTbData.sver) < 0) { + code = TSDB_CODE_INVALID_MSG; + TSDB_CHECK_CODE(code, lino, _exit); + } + + // scan and check + TSKEY now = ctime; + if (pVnode->config.tsdbCfg.precision == TSDB_TIME_PRECISION_MICRO) { + now *= 1000; + } else if (pVnode->config.tsdbCfg.precision == TSDB_TIME_PRECISION_NANO) { + now *= 1000000; + } + TSKEY minKey = now - tsTickPerMin[pVnode->config.tsdbCfg.precision] * pVnode->config.tsdbCfg.keep2; + TSKEY maxKey = tsMaxKeyByPrecision[pVnode->config.tsdbCfg.precision]; + if (submitTbData.flags & SUBMIT_REQ_COLUMN_DATA_FORMAT) { + uint64_t nColData; + if (tDecodeU64v(pCoder, &nColData) < 0) { + code = TSDB_CODE_INVALID_MSG; + goto _exit; + } + + SColData colData = {0}; + pCoder->pos += tGetColData(pCoder->data + pCoder->pos, &colData); + + for (int32_t iRow = 0; iRow < colData.nVal; iRow++) { + if (((TSKEY *)colData.pData)[iRow] < minKey || ((TSKEY *)colData.pData)[iRow] > maxKey) { + code = TSDB_CODE_TDB_TIMESTAMP_OUT_OF_RANGE; + goto _exit; + } + } + } else { + uint64_t nRow; + if (tDecodeU64v(pCoder, &nRow) < 0) { + code = TSDB_CODE_INVALID_MSG; + goto _exit; + } + + for (int32_t iRow = 0; iRow < nRow; ++iRow) { + SRow *pRow = (SRow *)(pCoder->data + pCoder->pos); + pCoder->pos += pRow->len; + + if (pRow->ts < minKey || pRow->ts > maxKey) { + code = TSDB_CODE_TDB_TIMESTAMP_OUT_OF_RANGE; + goto _exit; + } + } + } + + tEndDecode(pCoder); _exit: + return 0; +} +static int32_t vnodePreProcessSubmitMsg(SVnode *pVnode, SRpcMsg *pMsg) { + int32_t code = 0; + int32_t lino = 0; + + SDecoder *pCoder = &(SDecoder){0}; + + tDecoderInit(pCoder, (uint8_t *)pMsg->pCont + sizeof(SMsgHead), pMsg->contLen - sizeof(SMsgHead)); + + if (tStartDecode(pCoder) < 0) { + code = TSDB_CODE_INVALID_MSG; + TSDB_CHECK_CODE(code, lino, _exit); + } + + uint64_t nSubmitTbData; + if (tDecodeU64v(pCoder, &nSubmitTbData) < 0) { + code = TSDB_CODE_INVALID_MSG; + TSDB_CHECK_CODE(code, lino, _exit); + } + + int64_t ctime = taosGetTimestampMs(); + for (int32_t i = 0; i < nSubmitTbData; i++) { + code = vnodePreProcessSubmitTbData(pVnode, pCoder, ctime); + TSDB_CHECK_CODE(code, lino, _exit); + } + + tEndDecode(pCoder); + +_exit: + tDecoderClear(pCoder); return code; } @@ -923,11 +990,11 @@ static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq } tDecoderClear(&dc); - // check - code = tsdbScanAndConvertSubmitMsg(pVnode->pTsdb, pSubmitReq); - if (code) { - goto _exit; - } + // // check + // code = tsdbScanAndConvertSubmitMsg(pVnode->pTsdb, pSubmitReq); + // if (code) { + // goto _exit; + // } for (int32_t i = 0; i < TARRAY_SIZE(pSubmitReq->aSubmitTbData); ++i) { SSubmitTbData *pSubmitTbData = taosArrayGet(pSubmitReq->aSubmitTbData, i);