finish code
This commit is contained in:
parent
b60392b670
commit
c1a3a0855b
|
@ -205,7 +205,7 @@ struct SColData {
|
||||||
int32_t numOfNull; // # of null
|
int32_t numOfNull; // # of null
|
||||||
int32_t numOfValue; // # of vale
|
int32_t numOfValue; // # of vale
|
||||||
int32_t nVal;
|
int32_t nVal;
|
||||||
uint8_t flag;
|
int8_t flag;
|
||||||
uint8_t *pBitMap;
|
uint8_t *pBitMap;
|
||||||
int32_t *aOffset;
|
int32_t *aOffset;
|
||||||
int32_t nData;
|
int32_t nData;
|
||||||
|
|
|
@ -22,7 +22,7 @@
|
||||||
* us: 3600*1000000*8765*1000 // 1970 + 1000 years
|
* us: 3600*1000000*8765*1000 // 1970 + 1000 years
|
||||||
* ns: 3600*1000000000*8765*292 // 1970 + 292 years
|
* ns: 3600*1000000000*8765*292 // 1970 + 292 years
|
||||||
*/
|
*/
|
||||||
static int64_t tsMaxKeyByPrecision[] = {31556995200000L, 31556995200000000L, 9214646400000000000L};
|
int64_t tsMaxKeyByPrecision[] = {31556995200000L, 31556995200000000L, 9214646400000000000L};
|
||||||
|
|
||||||
// static int tsdbScanAndConvertSubmitMsg(STsdb *pTsdb, SSubmitReq *pMsg);
|
// static int tsdbScanAndConvertSubmitMsg(STsdb *pTsdb, SSubmitReq *pMsg);
|
||||||
|
|
||||||
|
|
|
@ -13,7 +13,12 @@
|
||||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
|
#include <bits/stdint-intn.h>
|
||||||
|
#include "tencode.h"
|
||||||
|
#include "tmsg.h"
|
||||||
#include "vnd.h"
|
#include "vnd.h"
|
||||||
|
#include "vnode.h"
|
||||||
|
#include "vnodeInt.h"
|
||||||
|
|
||||||
static int32_t vnodeProcessCreateStbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
|
static int32_t vnodeProcessCreateStbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
|
||||||
static int32_t vnodeProcessAlterStbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
|
static int32_t vnodeProcessAlterStbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
|
||||||
|
@ -31,6 +36,48 @@ static int32_t vnodeProcessDeleteReq(SVnode *pVnode, int64_t version, void *pReq
|
||||||
static int32_t vnodeProcessBatchDeleteReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
|
static int32_t vnodeProcessBatchDeleteReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
|
||||||
static int32_t vnodeProcessCompactVnodeReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
|
static int32_t vnodeProcessCompactVnodeReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
|
||||||
|
|
||||||
|
static int32_t vnodePreprocessCreateTableReq(SVnode *pVnode, SDecoder *pCoder, int64_t ctime) {
|
||||||
|
int32_t code = 0;
|
||||||
|
int32_t lino = 0;
|
||||||
|
|
||||||
|
if (tStartDecode(pCoder) < 0) {
|
||||||
|
code = TSDB_CODE_INVALID_MSG;
|
||||||
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
|
}
|
||||||
|
|
||||||
|
// flags
|
||||||
|
if (tDecodeI32v(pCoder, NULL) < 0) {
|
||||||
|
code = TSDB_CODE_INVALID_MSG;
|
||||||
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
|
}
|
||||||
|
|
||||||
|
// name
|
||||||
|
char *name = NULL;
|
||||||
|
if (tDecodeCStr(pCoder, &name) < 0) {
|
||||||
|
code = TSDB_CODE_INVALID_MSG;
|
||||||
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
|
}
|
||||||
|
|
||||||
|
// uid
|
||||||
|
int64_t uid = metaGetTableEntryUidByName(pVnode->pMeta, name);
|
||||||
|
if (uid == 0) {
|
||||||
|
uid = tGenIdPI64();
|
||||||
|
}
|
||||||
|
*(int64_t *)(pCoder->data + pCoder->pos) = uid;
|
||||||
|
|
||||||
|
// ctime
|
||||||
|
*(int64_t *)(pCoder->data + pCoder->pos + 8) = ctime;
|
||||||
|
|
||||||
|
tEndDecode(pCoder);
|
||||||
|
|
||||||
|
_exit:
|
||||||
|
if (code) {
|
||||||
|
vError("vgId:%d %s failed at line %d since %s", TD_VID(pVnode), __func__, lino, tstrerror(code));
|
||||||
|
} else {
|
||||||
|
vTrace("vgId:%d %s done, table:%s uid generated:%" PRId64, TD_VID(pVnode), __func__, name, uid);
|
||||||
|
}
|
||||||
|
return code;
|
||||||
|
}
|
||||||
static int32_t vnodePreProcessCreateTableMsg(SVnode *pVnode, SRpcMsg *pMsg) {
|
static int32_t vnodePreProcessCreateTableMsg(SVnode *pVnode, SRpcMsg *pMsg) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
int32_t lino = 0;
|
int32_t lino = 0;
|
||||||
|
@ -50,108 +97,128 @@ static int32_t vnodePreProcessCreateTableMsg(SVnode *pVnode, SRpcMsg *pMsg) {
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
}
|
}
|
||||||
for (int32_t iReq = 0; iReq < nReqs; iReq++) {
|
for (int32_t iReq = 0; iReq < nReqs; iReq++) {
|
||||||
tb_uid_t uid = tGenIdPI64();
|
code = vnodePreprocessCreateTableReq(pVnode, &dc, ctime);
|
||||||
char *name = NULL;
|
|
||||||
if (tStartDecode(&dc) < 0) {
|
|
||||||
code = TSDB_CODE_INVALID_MSG;
|
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (tDecodeI32v(&dc, NULL) < 0) {
|
|
||||||
code = TSDB_CODE_INVALID_MSG;
|
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
|
||||||
}
|
|
||||||
if (tDecodeCStr(&dc, &name) < 0) {
|
|
||||||
code = TSDB_CODE_INVALID_MSG;
|
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
|
||||||
}
|
|
||||||
*(int64_t *)(dc.data + dc.pos) = uid;
|
|
||||||
*(int64_t *)(dc.data + dc.pos + 8) = ctime;
|
|
||||||
|
|
||||||
vTrace("vgId:%d table:%s uid:%" PRId64 " is generated", pVnode->config.vgId, name, uid);
|
|
||||||
tEndDecode(&dc);
|
|
||||||
}
|
|
||||||
|
|
||||||
tEndDecode(&dc);
|
tEndDecode(&dc);
|
||||||
|
|
||||||
_exit:
|
_exit:
|
||||||
tDecoderClear(&dc);
|
tDecoderClear(&dc);
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
extern int64_t tsMaxKeyByPrecision[];
|
||||||
|
static int32_t vnodePreProcessSubmitTbData(SVnode *pVnode, SDecoder *pCoder, int64_t ctime) {
|
||||||
|
int32_t code = 0;
|
||||||
|
int32_t lino = 0;
|
||||||
|
|
||||||
|
if (tStartDecode(pCoder) < 0) {
|
||||||
|
code = TSDB_CODE_INVALID_MSG;
|
||||||
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
|
}
|
||||||
|
|
||||||
|
SSubmitTbData submitTbData;
|
||||||
|
if (tDecodeI32v(pCoder, &submitTbData.flags) < 0) {
|
||||||
|
code = TSDB_CODE_INVALID_MSG;
|
||||||
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (submitTbData.flags & SUBMIT_REQ_AUTO_CREATE_TABLE) {
|
||||||
|
code = vnodePreprocessCreateTableReq(pVnode, pCoder, ctime);
|
||||||
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
|
}
|
||||||
|
|
||||||
|
// submit data
|
||||||
|
if (tDecodeI64(pCoder, &submitTbData.suid) < 0) {
|
||||||
|
code = TSDB_CODE_INVALID_MSG;
|
||||||
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
|
}
|
||||||
|
if (tDecodeI64(pCoder, &submitTbData.uid) < 0) {
|
||||||
|
code = TSDB_CODE_INVALID_MSG;
|
||||||
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
|
}
|
||||||
|
if (tDecodeI32v(pCoder, &submitTbData.sver) < 0) {
|
||||||
|
code = TSDB_CODE_INVALID_MSG;
|
||||||
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
|
}
|
||||||
|
|
||||||
|
// scan and check
|
||||||
|
TSKEY now = ctime;
|
||||||
|
if (pVnode->config.tsdbCfg.precision == TSDB_TIME_PRECISION_MICRO) {
|
||||||
|
now *= 1000;
|
||||||
|
} else if (pVnode->config.tsdbCfg.precision == TSDB_TIME_PRECISION_NANO) {
|
||||||
|
now *= 1000000;
|
||||||
|
}
|
||||||
|
TSKEY minKey = now - tsTickPerMin[pVnode->config.tsdbCfg.precision] * pVnode->config.tsdbCfg.keep2;
|
||||||
|
TSKEY maxKey = tsMaxKeyByPrecision[pVnode->config.tsdbCfg.precision];
|
||||||
|
if (submitTbData.flags & SUBMIT_REQ_COLUMN_DATA_FORMAT) {
|
||||||
|
uint64_t nColData;
|
||||||
|
if (tDecodeU64v(pCoder, &nColData) < 0) {
|
||||||
|
code = TSDB_CODE_INVALID_MSG;
|
||||||
|
goto _exit;
|
||||||
|
}
|
||||||
|
|
||||||
|
SColData colData = {0};
|
||||||
|
pCoder->pos += tGetColData(pCoder->data + pCoder->pos, &colData);
|
||||||
|
|
||||||
|
for (int32_t iRow = 0; iRow < colData.nVal; iRow++) {
|
||||||
|
if (((TSKEY *)colData.pData)[iRow] < minKey || ((TSKEY *)colData.pData)[iRow] > maxKey) {
|
||||||
|
code = TSDB_CODE_TDB_TIMESTAMP_OUT_OF_RANGE;
|
||||||
|
goto _exit;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
uint64_t nRow;
|
||||||
|
if (tDecodeU64v(pCoder, &nRow) < 0) {
|
||||||
|
code = TSDB_CODE_INVALID_MSG;
|
||||||
|
goto _exit;
|
||||||
|
}
|
||||||
|
|
||||||
|
for (int32_t iRow = 0; iRow < nRow; ++iRow) {
|
||||||
|
SRow *pRow = (SRow *)(pCoder->data + pCoder->pos);
|
||||||
|
pCoder->pos += pRow->len;
|
||||||
|
|
||||||
|
if (pRow->ts < minKey || pRow->ts > maxKey) {
|
||||||
|
code = TSDB_CODE_TDB_TIMESTAMP_OUT_OF_RANGE;
|
||||||
|
goto _exit;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
tEndDecode(pCoder);
|
||||||
|
|
||||||
|
_exit:
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
static int32_t vnodePreProcessSubmitMsg(SVnode *pVnode, SRpcMsg *pMsg) {
|
static int32_t vnodePreProcessSubmitMsg(SVnode *pVnode, SRpcMsg *pMsg) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
int32_t lino = 0;
|
int32_t lino = 0;
|
||||||
|
|
||||||
int64_t ctime = taosGetTimestampMs();
|
SDecoder *pCoder = &(SDecoder){0};
|
||||||
SDecoder dc = {0};
|
|
||||||
|
|
||||||
tDecoderInit(&dc, (uint8_t *)pMsg->pCont + sizeof(SMsgHead), pMsg->contLen - sizeof(SMsgHead));
|
tDecoderInit(pCoder, (uint8_t *)pMsg->pCont + sizeof(SMsgHead), pMsg->contLen - sizeof(SMsgHead));
|
||||||
tStartDecode(&dc);
|
|
||||||
|
if (tStartDecode(pCoder) < 0) {
|
||||||
|
code = TSDB_CODE_INVALID_MSG;
|
||||||
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
|
}
|
||||||
|
|
||||||
uint64_t nSubmitTbData;
|
uint64_t nSubmitTbData;
|
||||||
if (tDecodeU64v(&dc, &nSubmitTbData) < 0) {
|
if (tDecodeU64v(pCoder, &nSubmitTbData) < 0) {
|
||||||
code = TSDB_CODE_INVALID_MSG;
|
code = TSDB_CODE_INVALID_MSG;
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int64_t ctime = taosGetTimestampMs();
|
||||||
for (int32_t i = 0; i < nSubmitTbData; i++) {
|
for (int32_t i = 0; i < nSubmitTbData; i++) {
|
||||||
if (tStartDecode(&dc) < 0) {
|
code = vnodePreProcessSubmitTbData(pVnode, pCoder, ctime);
|
||||||
code = TSDB_CODE_INVALID_MSG;
|
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t flags;
|
tEndDecode(pCoder);
|
||||||
if (tDecodeI32v(&dc, &flags) < 0) {
|
|
||||||
code = TSDB_CODE_INVALID_MSG;
|
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (flags & SUBMIT_REQ_AUTO_CREATE_TABLE) {
|
|
||||||
// SVCreateTbReq
|
|
||||||
if (tStartDecode(&dc) < 0) {
|
|
||||||
code = TSDB_CODE_INVALID_MSG;
|
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (tDecodeI32v(&dc, NULL) < 0) {
|
|
||||||
code = TSDB_CODE_INVALID_MSG;
|
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
|
||||||
}
|
|
||||||
|
|
||||||
char *name = NULL;
|
|
||||||
if (tDecodeCStr(&dc, &name) < 0) {
|
|
||||||
code = TSDB_CODE_INVALID_MSG;
|
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
|
||||||
}
|
|
||||||
|
|
||||||
int64_t uid = metaGetTableEntryUidByName(pVnode->pMeta, name);
|
|
||||||
if (uid == 0) {
|
|
||||||
uid = tGenIdPI64();
|
|
||||||
}
|
|
||||||
|
|
||||||
*(int64_t *)(dc.data + dc.pos) = uid;
|
|
||||||
*(int64_t *)(dc.data + dc.pos + 8) = ctime;
|
|
||||||
|
|
||||||
tEndDecode(&dc);
|
|
||||||
|
|
||||||
// SSubmitTbData
|
|
||||||
int64_t suid;
|
|
||||||
if (tDecodeI64(&dc, &suid) < 0) {
|
|
||||||
code = TSDB_CODE_INVALID_MSG;
|
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
|
||||||
}
|
|
||||||
|
|
||||||
*(int64_t *)(dc.data + dc.pos) = uid;
|
|
||||||
}
|
|
||||||
|
|
||||||
tEndDecode(&dc);
|
|
||||||
}
|
|
||||||
|
|
||||||
tEndDecode(&dc);
|
|
||||||
tDecoderClear(&dc);
|
|
||||||
|
|
||||||
_exit:
|
_exit:
|
||||||
|
tDecoderClear(pCoder);
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -923,11 +990,11 @@ static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq
|
||||||
}
|
}
|
||||||
tDecoderClear(&dc);
|
tDecoderClear(&dc);
|
||||||
|
|
||||||
// check
|
// // check
|
||||||
code = tsdbScanAndConvertSubmitMsg(pVnode->pTsdb, pSubmitReq);
|
// code = tsdbScanAndConvertSubmitMsg(pVnode->pTsdb, pSubmitReq);
|
||||||
if (code) {
|
// if (code) {
|
||||||
goto _exit;
|
// goto _exit;
|
||||||
}
|
// }
|
||||||
|
|
||||||
for (int32_t i = 0; i < TARRAY_SIZE(pSubmitReq->aSubmitTbData); ++i) {
|
for (int32_t i = 0; i < TARRAY_SIZE(pSubmitReq->aSubmitTbData); ++i) {
|
||||||
SSubmitTbData *pSubmitTbData = taosArrayGet(pSubmitReq->aSubmitTbData, i);
|
SSubmitTbData *pSubmitTbData = taosArrayGet(pSubmitReq->aSubmitTbData, i);
|
||||||
|
|
Loading…
Reference in New Issue