commit
d7d32559c8
|
@ -205,7 +205,7 @@ struct SColData {
|
||||||
int32_t numOfNull; // # of null
|
int32_t numOfNull; // # of null
|
||||||
int32_t numOfValue; // # of vale
|
int32_t numOfValue; // # of vale
|
||||||
int32_t nVal;
|
int32_t nVal;
|
||||||
uint8_t flag;
|
int8_t flag;
|
||||||
uint8_t *pBitMap;
|
uint8_t *pBitMap;
|
||||||
int32_t *aOffset;
|
int32_t *aOffset;
|
||||||
int32_t nData;
|
int32_t nData;
|
||||||
|
|
|
@ -22,7 +22,7 @@
|
||||||
* us: 3600*1000000*8765*1000 // 1970 + 1000 years
|
* us: 3600*1000000*8765*1000 // 1970 + 1000 years
|
||||||
* ns: 3600*1000000000*8765*292 // 1970 + 292 years
|
* ns: 3600*1000000000*8765*292 // 1970 + 292 years
|
||||||
*/
|
*/
|
||||||
static int64_t tsMaxKeyByPrecision[] = {31556995200000L, 31556995200000000L, 9214646400000000000L};
|
int64_t tsMaxKeyByPrecision[] = {31556995200000L, 31556995200000000L, 9214646400000000000L};
|
||||||
|
|
||||||
// static int tsdbScanAndConvertSubmitMsg(STsdb *pTsdb, SSubmitReq *pMsg);
|
// static int tsdbScanAndConvertSubmitMsg(STsdb *pTsdb, SSubmitReq *pMsg);
|
||||||
|
|
||||||
|
@ -60,23 +60,6 @@ int tsdbInsertData(STsdb *pTsdb, int64_t version, SSubmitReq2 *pMsg, SSubmitRsp2
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
#if 0
|
|
||||||
static FORCE_INLINE int tsdbCheckRowRange(STsdb *pTsdb, STable *pTable, STSRow *row, TSKEY minKey, TSKEY maxKey,
|
|
||||||
TSKEY now) {
|
|
||||||
TSKEY rowKey = TD_ROW_KEY(row);
|
|
||||||
if (rowKey < minKey || rowKey > maxKey) {
|
|
||||||
tsdbError("vgId:%d, table %s tid %d uid %" PRIu64 " timestamp is out of range! now %" PRId64 " minKey %" PRId64
|
|
||||||
" maxKey %" PRId64 " row key %" PRId64,
|
|
||||||
REPO_ID(pTsdb), TABLE_CHAR_NAME(pTable), TABLE_TID(pTable), TABLE_UID(pTable), now, minKey, maxKey,
|
|
||||||
rowKey);
|
|
||||||
terrno = TSDB_CODE_TDB_TIMESTAMP_OUT_OF_RANGE;
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
#endif
|
|
||||||
|
|
||||||
static FORCE_INLINE int tsdbCheckRowRange(STsdb *pTsdb, tb_uid_t uid, TSKEY rowKey, TSKEY minKey, TSKEY maxKey,
|
static FORCE_INLINE int tsdbCheckRowRange(STsdb *pTsdb, tb_uid_t uid, TSKEY rowKey, TSKEY minKey, TSKEY maxKey,
|
||||||
TSKEY now) {
|
TSKEY now) {
|
||||||
if (rowKey < minKey || rowKey > maxKey) {
|
if (rowKey < minKey || rowKey > maxKey) {
|
||||||
|
@ -89,79 +72,6 @@ static FORCE_INLINE int tsdbCheckRowRange(STsdb *pTsdb, tb_uid_t uid, TSKEY rowK
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
#if 0
|
|
||||||
int tsdbScanAndConvertSubmitMsg(STsdb *pTsdb, SSubmitReq *pMsg) {
|
|
||||||
ASSERT(pMsg != NULL);
|
|
||||||
// STsdbMeta * pMeta = pTsdb->tsdbMeta;
|
|
||||||
SSubmitMsgIter msgIter = {0};
|
|
||||||
SSubmitBlk *pBlock = NULL;
|
|
||||||
SSubmitBlkIter blkIter = {0};
|
|
||||||
STSRow *row = NULL;
|
|
||||||
STsdbKeepCfg *pCfg = &pTsdb->keepCfg;
|
|
||||||
TSKEY now = taosGetTimestamp(pCfg->precision);
|
|
||||||
TSKEY minKey = now - tsTickPerMin[pCfg->precision] * pCfg->keep2;
|
|
||||||
TSKEY maxKey = tsMaxKeyByPrecision[pCfg->precision];
|
|
||||||
|
|
||||||
terrno = TSDB_CODE_SUCCESS;
|
|
||||||
// pMsg->length = htonl(pMsg->length);
|
|
||||||
// pMsg->numOfBlocks = htonl(pMsg->numOfBlocks);
|
|
||||||
|
|
||||||
if (tInitSubmitMsgIter(pMsg, &msgIter) < 0) return -1;
|
|
||||||
while (true) {
|
|
||||||
if (tGetSubmitMsgNext(&msgIter, &pBlock) < 0) return -1;
|
|
||||||
if (pBlock == NULL) break;
|
|
||||||
|
|
||||||
// pBlock->uid = htobe64(pBlock->uid);
|
|
||||||
// pBlock->suid = htobe64(pBlock->suid);
|
|
||||||
// pBlock->sversion = htonl(pBlock->sversion);
|
|
||||||
// pBlock->dataLen = htonl(pBlock->dataLen);
|
|
||||||
// pBlock->schemaLen = htonl(pBlock->schemaLen);
|
|
||||||
// pBlock->numOfRows = htonl(pBlock->numOfRows);
|
|
||||||
|
|
||||||
#if 0
|
|
||||||
if (pBlock->tid <= 0 || pBlock->tid >= pMeta->maxTables) {
|
|
||||||
tsdbError("vgId:%d, failed to get table to insert data, uid %" PRIu64 " tid %d", REPO_ID(pTsdb), pBlock->uid,
|
|
||||||
pBlock->tid);
|
|
||||||
terrno = TSDB_CODE_TDB_INVALID_TABLE_ID;
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
STable *pTable = pMeta->tables[pBlock->tid];
|
|
||||||
if (pTable == NULL || TABLE_UID(pTable) != pBlock->uid) {
|
|
||||||
tsdbError("vgId:%d, failed to get table to insert data, uid %" PRIu64 " tid %d", REPO_ID(pTsdb), pBlock->uid,
|
|
||||||
pBlock->tid);
|
|
||||||
terrno = TSDB_CODE_TDB_INVALID_TABLE_ID;
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (TABLE_TYPE(pTable) == TSDB_SUPER_TABLE) {
|
|
||||||
tsdbError("vgId:%d, invalid action trying to insert a super table %s", REPO_ID(pTsdb), TABLE_CHAR_NAME(pTable));
|
|
||||||
terrno = TSDB_CODE_TDB_INVALID_ACTION;
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
// Check schema version and update schema if needed
|
|
||||||
if (tsdbCheckTableSchema(pTsdb, pBlock, pTable) < 0) {
|
|
||||||
if (terrno == TSDB_CODE_TDB_TABLE_RECONFIGURE) {
|
|
||||||
continue;
|
|
||||||
} else {
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
#endif
|
|
||||||
tInitSubmitBlkIter(&msgIter, pBlock, &blkIter);
|
|
||||||
while ((row = tGetSubmitBlkNext(&blkIter)) != NULL) {
|
|
||||||
if (tsdbCheckRowRange(pTsdb, msgIter.uid, row, minKey, maxKey, now) < 0) {
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if (terrno != TSDB_CODE_SUCCESS) return -1;
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
#endif
|
|
||||||
|
|
||||||
int tsdbScanAndConvertSubmitMsg(STsdb *pTsdb, SSubmitReq2 *pMsg) {
|
int tsdbScanAndConvertSubmitMsg(STsdb *pTsdb, SSubmitReq2 *pMsg) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
STsdbKeepCfg *pCfg = &pTsdb->keepCfg;
|
STsdbKeepCfg *pCfg = &pTsdb->keepCfg;
|
||||||
|
|
|
@ -13,7 +13,11 @@
|
||||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
|
#include "tencode.h"
|
||||||
|
#include "tmsg.h"
|
||||||
#include "vnd.h"
|
#include "vnd.h"
|
||||||
|
#include "vnode.h"
|
||||||
|
#include "vnodeInt.h"
|
||||||
|
|
||||||
static int32_t vnodeProcessCreateStbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
|
static int32_t vnodeProcessCreateStbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
|
||||||
static int32_t vnodeProcessAlterStbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
|
static int32_t vnodeProcessAlterStbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
|
||||||
|
@ -31,158 +35,254 @@ static int32_t vnodeProcessDeleteReq(SVnode *pVnode, int64_t version, void *pReq
|
||||||
static int32_t vnodeProcessBatchDeleteReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
|
static int32_t vnodeProcessBatchDeleteReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
|
||||||
static int32_t vnodeProcessCompactVnodeReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
|
static int32_t vnodeProcessCompactVnodeReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
|
||||||
|
|
||||||
int32_t vnodePreProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg) {
|
static int32_t vnodePreprocessCreateTableReq(SVnode *pVnode, SDecoder *pCoder, int64_t ctime, int64_t *pUid) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
|
int32_t lino = 0;
|
||||||
|
|
||||||
|
if (tStartDecode(pCoder) < 0) {
|
||||||
|
code = TSDB_CODE_INVALID_MSG;
|
||||||
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
|
}
|
||||||
|
|
||||||
|
// flags
|
||||||
|
if (tDecodeI32v(pCoder, NULL) < 0) {
|
||||||
|
code = TSDB_CODE_INVALID_MSG;
|
||||||
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
|
}
|
||||||
|
|
||||||
|
// name
|
||||||
|
char *name = NULL;
|
||||||
|
if (tDecodeCStr(pCoder, &name) < 0) {
|
||||||
|
code = TSDB_CODE_INVALID_MSG;
|
||||||
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
|
}
|
||||||
|
|
||||||
|
// uid
|
||||||
|
int64_t uid = metaGetTableEntryUidByName(pVnode->pMeta, name);
|
||||||
|
if (uid == 0) {
|
||||||
|
uid = tGenIdPI64();
|
||||||
|
}
|
||||||
|
*(int64_t *)(pCoder->data + pCoder->pos) = uid;
|
||||||
|
|
||||||
|
// ctime
|
||||||
|
*(int64_t *)(pCoder->data + pCoder->pos + 8) = ctime;
|
||||||
|
|
||||||
|
tEndDecode(pCoder);
|
||||||
|
|
||||||
|
_exit:
|
||||||
|
if (code) {
|
||||||
|
vError("vgId:%d %s failed at line %d since %s", TD_VID(pVnode), __func__, lino, tstrerror(code));
|
||||||
|
} else {
|
||||||
|
vTrace("vgId:%d %s done, table:%s uid generated:%" PRId64, TD_VID(pVnode), __func__, name, uid);
|
||||||
|
if (pUid) *pUid = uid;
|
||||||
|
}
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
static int32_t vnodePreProcessCreateTableMsg(SVnode *pVnode, SRpcMsg *pMsg) {
|
||||||
|
int32_t code = 0;
|
||||||
|
int32_t lino = 0;
|
||||||
|
|
||||||
|
int64_t ctime = taosGetTimestampMs();
|
||||||
SDecoder dc = {0};
|
SDecoder dc = {0};
|
||||||
|
int32_t nReqs;
|
||||||
|
|
||||||
|
tDecoderInit(&dc, (uint8_t *)pMsg->pCont + sizeof(SMsgHead), pMsg->contLen - sizeof(SMsgHead));
|
||||||
|
if (tStartDecode(&dc) < 0) {
|
||||||
|
code = TSDB_CODE_INVALID_MSG;
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (tDecodeI32v(&dc, &nReqs) < 0) {
|
||||||
|
code = TSDB_CODE_INVALID_MSG;
|
||||||
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
|
}
|
||||||
|
for (int32_t iReq = 0; iReq < nReqs; iReq++) {
|
||||||
|
code = vnodePreprocessCreateTableReq(pVnode, &dc, ctime, NULL);
|
||||||
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
|
}
|
||||||
|
|
||||||
|
tEndDecode(&dc);
|
||||||
|
|
||||||
|
_exit:
|
||||||
|
tDecoderClear(&dc);
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
extern int64_t tsMaxKeyByPrecision[];
|
||||||
|
static int32_t vnodePreProcessSubmitTbData(SVnode *pVnode, SDecoder *pCoder, int64_t ctime) {
|
||||||
|
int32_t code = 0;
|
||||||
|
int32_t lino = 0;
|
||||||
|
|
||||||
|
if (tStartDecode(pCoder) < 0) {
|
||||||
|
code = TSDB_CODE_INVALID_MSG;
|
||||||
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
|
}
|
||||||
|
|
||||||
|
SSubmitTbData submitTbData;
|
||||||
|
if (tDecodeI32v(pCoder, &submitTbData.flags) < 0) {
|
||||||
|
code = TSDB_CODE_INVALID_MSG;
|
||||||
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
|
}
|
||||||
|
|
||||||
|
int64_t uid;
|
||||||
|
if (submitTbData.flags & SUBMIT_REQ_AUTO_CREATE_TABLE) {
|
||||||
|
code = vnodePreprocessCreateTableReq(pVnode, pCoder, ctime, &uid);
|
||||||
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
|
}
|
||||||
|
|
||||||
|
// submit data
|
||||||
|
if (tDecodeI64(pCoder, &submitTbData.suid) < 0) {
|
||||||
|
code = TSDB_CODE_INVALID_MSG;
|
||||||
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (submitTbData.flags & SUBMIT_REQ_AUTO_CREATE_TABLE) {
|
||||||
|
*(int64_t *)(pCoder->data + pCoder->pos) = uid;
|
||||||
|
pCoder->pos += sizeof(int64_t);
|
||||||
|
} else {
|
||||||
|
tDecodeI64(pCoder, &submitTbData.uid);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (tDecodeI32v(pCoder, &submitTbData.sver) < 0) {
|
||||||
|
code = TSDB_CODE_INVALID_MSG;
|
||||||
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
|
}
|
||||||
|
|
||||||
|
// scan and check
|
||||||
|
TSKEY now = ctime;
|
||||||
|
if (pVnode->config.tsdbCfg.precision == TSDB_TIME_PRECISION_MICRO) {
|
||||||
|
now *= 1000;
|
||||||
|
} else if (pVnode->config.tsdbCfg.precision == TSDB_TIME_PRECISION_NANO) {
|
||||||
|
now *= 1000000;
|
||||||
|
}
|
||||||
|
TSKEY minKey = now - tsTickPerMin[pVnode->config.tsdbCfg.precision] * pVnode->config.tsdbCfg.keep2;
|
||||||
|
TSKEY maxKey = tsMaxKeyByPrecision[pVnode->config.tsdbCfg.precision];
|
||||||
|
if (submitTbData.flags & SUBMIT_REQ_COLUMN_DATA_FORMAT) {
|
||||||
|
uint64_t nColData;
|
||||||
|
if (tDecodeU64v(pCoder, &nColData) < 0) {
|
||||||
|
code = TSDB_CODE_INVALID_MSG;
|
||||||
|
goto _exit;
|
||||||
|
}
|
||||||
|
|
||||||
|
SColData colData = {0};
|
||||||
|
pCoder->pos += tGetColData(pCoder->data + pCoder->pos, &colData);
|
||||||
|
|
||||||
|
for (int32_t iRow = 0; iRow < colData.nVal; iRow++) {
|
||||||
|
if (((TSKEY *)colData.pData)[iRow] < minKey || ((TSKEY *)colData.pData)[iRow] > maxKey) {
|
||||||
|
code = TSDB_CODE_TDB_TIMESTAMP_OUT_OF_RANGE;
|
||||||
|
goto _exit;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
uint64_t nRow;
|
||||||
|
if (tDecodeU64v(pCoder, &nRow) < 0) {
|
||||||
|
code = TSDB_CODE_INVALID_MSG;
|
||||||
|
goto _exit;
|
||||||
|
}
|
||||||
|
|
||||||
|
for (int32_t iRow = 0; iRow < nRow; ++iRow) {
|
||||||
|
SRow *pRow = (SRow *)(pCoder->data + pCoder->pos);
|
||||||
|
pCoder->pos += pRow->len;
|
||||||
|
|
||||||
|
if (pRow->ts < minKey || pRow->ts > maxKey) {
|
||||||
|
code = TSDB_CODE_TDB_TIMESTAMP_OUT_OF_RANGE;
|
||||||
|
goto _exit;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
tEndDecode(pCoder);
|
||||||
|
|
||||||
|
_exit:
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
static int32_t vnodePreProcessSubmitMsg(SVnode *pVnode, SRpcMsg *pMsg) {
|
||||||
|
int32_t code = 0;
|
||||||
|
int32_t lino = 0;
|
||||||
|
|
||||||
|
SDecoder *pCoder = &(SDecoder){0};
|
||||||
|
|
||||||
|
tDecoderInit(pCoder, (uint8_t *)pMsg->pCont + sizeof(SMsgHead), pMsg->contLen - sizeof(SMsgHead));
|
||||||
|
|
||||||
|
if (tStartDecode(pCoder) < 0) {
|
||||||
|
code = TSDB_CODE_INVALID_MSG;
|
||||||
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
|
}
|
||||||
|
|
||||||
|
uint64_t nSubmitTbData;
|
||||||
|
if (tDecodeU64v(pCoder, &nSubmitTbData) < 0) {
|
||||||
|
code = TSDB_CODE_INVALID_MSG;
|
||||||
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
|
}
|
||||||
|
|
||||||
|
int64_t ctime = taosGetTimestampMs();
|
||||||
|
for (int32_t i = 0; i < nSubmitTbData; i++) {
|
||||||
|
code = vnodePreProcessSubmitTbData(pVnode, pCoder, ctime);
|
||||||
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
|
}
|
||||||
|
|
||||||
|
tEndDecode(pCoder);
|
||||||
|
|
||||||
|
_exit:
|
||||||
|
tDecoderClear(pCoder);
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t vnodePreProcessDeleteMsg(SVnode *pVnode, SRpcMsg *pMsg) {
|
||||||
|
int32_t code = 0;
|
||||||
|
|
||||||
|
int32_t size;
|
||||||
|
int32_t ret;
|
||||||
|
uint8_t *pCont;
|
||||||
|
SEncoder *pCoder = &(SEncoder){0};
|
||||||
|
SDeleteRes res = {0};
|
||||||
|
SReadHandle handle = {.meta = pVnode->pMeta, .config = &pVnode->config, .vnode = pVnode, .pMsgCb = &pVnode->msgCb};
|
||||||
|
|
||||||
|
code = qWorkerProcessDeleteMsg(&handle, pVnode->pQuery, pMsg, &res);
|
||||||
|
if (code) goto _exit;
|
||||||
|
|
||||||
|
// malloc and encode
|
||||||
|
tEncodeSize(tEncodeDeleteRes, &res, size, ret);
|
||||||
|
pCont = rpcMallocCont(size + sizeof(SMsgHead));
|
||||||
|
|
||||||
|
((SMsgHead *)pCont)->contLen = size + sizeof(SMsgHead);
|
||||||
|
((SMsgHead *)pCont)->vgId = TD_VID(pVnode);
|
||||||
|
|
||||||
|
tEncoderInit(pCoder, pCont + sizeof(SMsgHead), size);
|
||||||
|
tEncodeDeleteRes(pCoder, &res);
|
||||||
|
tEncoderClear(pCoder);
|
||||||
|
|
||||||
|
rpcFreeCont(pMsg->pCont);
|
||||||
|
pMsg->pCont = pCont;
|
||||||
|
pMsg->contLen = size + sizeof(SMsgHead);
|
||||||
|
|
||||||
|
taosArrayDestroy(res.uidList);
|
||||||
|
|
||||||
|
_exit:
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t vnodePreProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg) {
|
||||||
|
int32_t code = 0;
|
||||||
|
|
||||||
switch (pMsg->msgType) {
|
switch (pMsg->msgType) {
|
||||||
case TDMT_VND_CREATE_TABLE: {
|
case TDMT_VND_CREATE_TABLE: {
|
||||||
int64_t ctime = taosGetTimestampMs();
|
code = vnodePreProcessCreateTableMsg(pVnode, pMsg);
|
||||||
int32_t nReqs;
|
|
||||||
|
|
||||||
tDecoderInit(&dc, (uint8_t *)pMsg->pCont + sizeof(SMsgHead), pMsg->contLen - sizeof(SMsgHead));
|
|
||||||
if (tStartDecode(&dc) < 0) {
|
|
||||||
code = TSDB_CODE_INVALID_MSG;
|
|
||||||
return code;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (tDecodeI32v(&dc, &nReqs) < 0) {
|
|
||||||
code = TSDB_CODE_INVALID_MSG;
|
|
||||||
goto _err;
|
|
||||||
}
|
|
||||||
for (int32_t iReq = 0; iReq < nReqs; iReq++) {
|
|
||||||
tb_uid_t uid = tGenIdPI64();
|
|
||||||
char *name = NULL;
|
|
||||||
if (tStartDecode(&dc) < 0) {
|
|
||||||
code = TSDB_CODE_INVALID_MSG;
|
|
||||||
goto _err;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (tDecodeI32v(&dc, NULL) < 0) {
|
|
||||||
code = TSDB_CODE_INVALID_MSG;
|
|
||||||
return code;
|
|
||||||
}
|
|
||||||
if (tDecodeCStr(&dc, &name) < 0) {
|
|
||||||
code = TSDB_CODE_INVALID_MSG;
|
|
||||||
return code;
|
|
||||||
}
|
|
||||||
*(int64_t *)(dc.data + dc.pos) = uid;
|
|
||||||
*(int64_t *)(dc.data + dc.pos + 8) = ctime;
|
|
||||||
|
|
||||||
vTrace("vgId:%d, table:%s uid:%" PRId64 " is generated", pVnode->config.vgId, name, uid);
|
|
||||||
tEndDecode(&dc);
|
|
||||||
}
|
|
||||||
|
|
||||||
tEndDecode(&dc);
|
|
||||||
tDecoderClear(&dc);
|
|
||||||
} break;
|
} break;
|
||||||
case TDMT_VND_SUBMIT: {
|
case TDMT_VND_SUBMIT: {
|
||||||
int64_t ctime = taosGetTimestampMs();
|
code = vnodePreProcessSubmitMsg(pVnode, pMsg);
|
||||||
|
|
||||||
tDecoderInit(&dc, (uint8_t *)pMsg->pCont + sizeof(SMsgHead), pMsg->contLen - sizeof(SMsgHead));
|
|
||||||
tStartDecode(&dc);
|
|
||||||
|
|
||||||
uint64_t nSubmitTbData;
|
|
||||||
if (tDecodeU64v(&dc, &nSubmitTbData) < 0) {
|
|
||||||
code = TSDB_CODE_INVALID_MSG;
|
|
||||||
goto _err;
|
|
||||||
}
|
|
||||||
|
|
||||||
for (int32_t i = 0; i < nSubmitTbData; i++) {
|
|
||||||
if (tStartDecode(&dc) < 0) {
|
|
||||||
code = TSDB_CODE_INVALID_MSG;
|
|
||||||
goto _err;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t flags;
|
|
||||||
if (tDecodeI32v(&dc, &flags) < 0) {
|
|
||||||
code = TSDB_CODE_INVALID_MSG;
|
|
||||||
goto _err;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (flags & SUBMIT_REQ_AUTO_CREATE_TABLE) {
|
|
||||||
// SVCreateTbReq
|
|
||||||
if (tStartDecode(&dc) < 0) {
|
|
||||||
code = TSDB_CODE_INVALID_MSG;
|
|
||||||
goto _err;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (tDecodeI32v(&dc, NULL) < 0) {
|
|
||||||
code = TSDB_CODE_INVALID_MSG;
|
|
||||||
goto _err;
|
|
||||||
}
|
|
||||||
|
|
||||||
char *name = NULL;
|
|
||||||
if (tDecodeCStr(&dc, &name) < 0) {
|
|
||||||
code = TSDB_CODE_INVALID_MSG;
|
|
||||||
goto _err;
|
|
||||||
}
|
|
||||||
|
|
||||||
int64_t uid = metaGetTableEntryUidByName(pVnode->pMeta, name);
|
|
||||||
if (uid == 0) {
|
|
||||||
uid = tGenIdPI64();
|
|
||||||
}
|
|
||||||
|
|
||||||
*(int64_t *)(dc.data + dc.pos) = uid;
|
|
||||||
*(int64_t *)(dc.data + dc.pos + 8) = ctime;
|
|
||||||
|
|
||||||
tEndDecode(&dc);
|
|
||||||
|
|
||||||
// SSubmitTbData
|
|
||||||
int64_t suid;
|
|
||||||
if (tDecodeI64(&dc, &suid) < 0) {
|
|
||||||
code = TSDB_CODE_INVALID_MSG;
|
|
||||||
goto _err;
|
|
||||||
}
|
|
||||||
|
|
||||||
*(int64_t *)(dc.data + dc.pos) = uid;
|
|
||||||
}
|
|
||||||
|
|
||||||
tEndDecode(&dc);
|
|
||||||
}
|
|
||||||
|
|
||||||
tEndDecode(&dc);
|
|
||||||
tDecoderClear(&dc);
|
|
||||||
} break;
|
} break;
|
||||||
case TDMT_VND_DELETE: {
|
case TDMT_VND_DELETE: {
|
||||||
int32_t size;
|
code = vnodePreProcessDeleteMsg(pVnode, pMsg);
|
||||||
int32_t ret;
|
|
||||||
uint8_t *pCont;
|
|
||||||
SEncoder *pCoder = &(SEncoder){0};
|
|
||||||
SDeleteRes res = {0};
|
|
||||||
SReadHandle handle = {
|
|
||||||
.meta = pVnode->pMeta, .config = &pVnode->config, .vnode = pVnode, .pMsgCb = &pVnode->msgCb};
|
|
||||||
|
|
||||||
code = qWorkerProcessDeleteMsg(&handle, pVnode->pQuery, pMsg, &res);
|
|
||||||
if (code) {
|
|
||||||
goto _err;
|
|
||||||
}
|
|
||||||
|
|
||||||
// malloc and encode
|
|
||||||
tEncodeSize(tEncodeDeleteRes, &res, size, ret);
|
|
||||||
pCont = rpcMallocCont(size + sizeof(SMsgHead));
|
|
||||||
|
|
||||||
((SMsgHead *)pCont)->contLen = size + sizeof(SMsgHead);
|
|
||||||
((SMsgHead *)pCont)->vgId = TD_VID(pVnode);
|
|
||||||
|
|
||||||
tEncoderInit(pCoder, pCont + sizeof(SMsgHead), size);
|
|
||||||
tEncodeDeleteRes(pCoder, &res);
|
|
||||||
tEncoderClear(pCoder);
|
|
||||||
|
|
||||||
rpcFreeCont(pMsg->pCont);
|
|
||||||
pMsg->pCont = pCont;
|
|
||||||
pMsg->contLen = size + sizeof(SMsgHead);
|
|
||||||
|
|
||||||
taosArrayDestroy(res.uidList);
|
|
||||||
} break;
|
} break;
|
||||||
default:
|
default:
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
return code;
|
_exit:
|
||||||
|
if (code) {
|
||||||
_err:
|
vError("vgId%d failed to preprocess write request since %s, msg type:%d", TD_VID(pVnode), tstrerror(code),
|
||||||
vError("vgId%d, preprocess request failed since %s", TD_VID(pVnode), tstrerror(code));
|
pMsg->msgType);
|
||||||
|
}
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -875,7 +975,6 @@ static int32_t vnodeDebugPrintSingleSubmitMsg(SMeta *pMeta, SSubmitBlk *pBlock,
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
|
static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
|
||||||
#if 1
|
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
terrno = 0;
|
terrno = 0;
|
||||||
|
|
||||||
|
@ -896,12 +995,6 @@ static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq
|
||||||
}
|
}
|
||||||
tDecoderClear(&dc);
|
tDecoderClear(&dc);
|
||||||
|
|
||||||
// check
|
|
||||||
code = tsdbScanAndConvertSubmitMsg(pVnode->pTsdb, pSubmitReq);
|
|
||||||
if (code) {
|
|
||||||
goto _exit;
|
|
||||||
}
|
|
||||||
|
|
||||||
for (int32_t i = 0; i < TARRAY_SIZE(pSubmitReq->aSubmitTbData); ++i) {
|
for (int32_t i = 0; i < TARRAY_SIZE(pSubmitReq->aSubmitTbData); ++i) {
|
||||||
SSubmitTbData *pSubmitTbData = taosArrayGet(pSubmitReq->aSubmitTbData, i);
|
SSubmitTbData *pSubmitTbData = taosArrayGet(pSubmitReq->aSubmitTbData, i);
|
||||||
|
|
||||||
|
@ -1046,145 +1139,6 @@ _exit:
|
||||||
if (code) terrno = code;
|
if (code) terrno = code;
|
||||||
|
|
||||||
return code;
|
return code;
|
||||||
|
|
||||||
#else
|
|
||||||
SSubmitReq *pSubmitReq = (SSubmitReq *)pReq;
|
|
||||||
SSubmitRsp submitRsp = {0};
|
|
||||||
int32_t nRows = 0;
|
|
||||||
int32_t tsize, ret;
|
|
||||||
SEncoder encoder = {0};
|
|
||||||
SArray *newTbUids = NULL;
|
|
||||||
SVStatis statis = {0};
|
|
||||||
bool tbCreated = false;
|
|
||||||
terrno = TSDB_CODE_SUCCESS;
|
|
||||||
|
|
||||||
pRsp->code = 0;
|
|
||||||
pSubmitReq->version = version;
|
|
||||||
statis.nBatchInsert = 1;
|
|
||||||
|
|
||||||
if (tsdbScanAndConvertSubmitMsg(pVnode->pTsdb, pSubmitReq) < 0) {
|
|
||||||
pRsp->code = terrno;
|
|
||||||
goto _exit;
|
|
||||||
}
|
|
||||||
|
|
||||||
submitRsp.pArray = taosArrayInit(msgIter.numOfBlocks, sizeof(SSubmitBlkRsp));
|
|
||||||
newTbUids = taosArrayInit(msgIter.numOfBlocks, sizeof(int64_t));
|
|
||||||
if (!submitRsp.pArray || !newTbUids) {
|
|
||||||
pRsp->code = TSDB_CODE_OUT_OF_MEMORY;
|
|
||||||
goto _exit;
|
|
||||||
}
|
|
||||||
|
|
||||||
for (;;) {
|
|
||||||
tGetSubmitMsgNext(&msgIter, &pBlock);
|
|
||||||
if (pBlock == NULL) break;
|
|
||||||
|
|
||||||
SSubmitBlkRsp submitBlkRsp = {0};
|
|
||||||
tbCreated = false;
|
|
||||||
|
|
||||||
// create table for auto create table mode
|
|
||||||
if (msgIter.schemaLen > 0) {
|
|
||||||
// tDecoderInit(&decoder, pBlock->data, msgIter.schemaLen);
|
|
||||||
// if (tDecodeSVCreateTbReq(&decoder, &createTbReq) < 0) {
|
|
||||||
// pRsp->code = TSDB_CODE_INVALID_MSG;
|
|
||||||
// tDecoderClear(&decoder);
|
|
||||||
// taosArrayDestroy(createTbReq.ctb.tagName);
|
|
||||||
// goto _exit;
|
|
||||||
// }
|
|
||||||
|
|
||||||
// if ((terrno = grantCheck(TSDB_GRANT_TIMESERIES)) < 0) {
|
|
||||||
// pRsp->code = terrno;
|
|
||||||
// tDecoderClear(&decoder);
|
|
||||||
// taosArrayDestroy(createTbReq.ctb.tagName);
|
|
||||||
// goto _exit;
|
|
||||||
// }
|
|
||||||
|
|
||||||
// if ((terrno = grantCheck(TSDB_GRANT_TABLE)) < 0) {
|
|
||||||
// pRsp->code = terrno;
|
|
||||||
// tDecoderClear(&decoder);
|
|
||||||
// taosArrayDestroy(createTbReq.ctb.tagName);
|
|
||||||
// goto _exit;
|
|
||||||
// }
|
|
||||||
|
|
||||||
if (metaCreateTable(pVnode->pMeta, version, &createTbReq, &submitBlkRsp.pMeta) < 0) {
|
|
||||||
// if (terrno != TSDB_CODE_TDB_TABLE_ALREADY_EXIST) {
|
|
||||||
// submitBlkRsp.code = terrno;
|
|
||||||
// pRsp->code = terrno;
|
|
||||||
// tDecoderClear(&decoder);
|
|
||||||
// taosArrayDestroy(createTbReq.ctb.tagName);
|
|
||||||
// goto _exit;
|
|
||||||
// }
|
|
||||||
} else {
|
|
||||||
if (NULL != submitBlkRsp.pMeta) {
|
|
||||||
vnodeUpdateMetaRsp(pVnode, submitBlkRsp.pMeta);
|
|
||||||
}
|
|
||||||
|
|
||||||
// taosArrayPush(newTbUids, &createTbReq.uid);
|
|
||||||
|
|
||||||
submitBlkRsp.uid = createTbReq.uid;
|
|
||||||
submitBlkRsp.tblFName = taosMemoryMalloc(strlen(pVnode->config.dbname) + strlen(createTbReq.name) + 2);
|
|
||||||
sprintf(submitBlkRsp.tblFName, "%s.%s", pVnode->config.dbname, createTbReq.name);
|
|
||||||
tbCreated = true;
|
|
||||||
}
|
|
||||||
|
|
||||||
// msgIter.uid = createTbReq.uid;
|
|
||||||
// if (createTbReq.type == TSDB_CHILD_TABLE) {
|
|
||||||
// msgIter.suid = createTbReq.ctb.suid;
|
|
||||||
// } else {
|
|
||||||
// msgIter.suid = 0;
|
|
||||||
// }
|
|
||||||
|
|
||||||
// tDecoderClear(&decoder);
|
|
||||||
// taosArrayDestroy(createTbReq.ctb.tagName);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (tsdbInsertTableData(pVnode->pTsdb, version, &msgIter, pBlock, &submitBlkRsp) < 0) {
|
|
||||||
submitBlkRsp.code = terrno;
|
|
||||||
}
|
|
||||||
|
|
||||||
submitRsp.numOfRows += submitBlkRsp.numOfRows;
|
|
||||||
submitRsp.affectedRows += submitBlkRsp.affectedRows;
|
|
||||||
if (tbCreated || submitBlkRsp.code) {
|
|
||||||
taosArrayPush(submitRsp.pArray, &submitBlkRsp);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// if (taosArrayGetSize(newTbUids) > 0) {
|
|
||||||
// vDebug("vgId:%d, add %d table into query table list in handling submit", TD_VID(pVnode),
|
|
||||||
// (int32_t)taosArrayGetSize(newTbUids));
|
|
||||||
// }
|
|
||||||
|
|
||||||
// tqUpdateTbUidList(pVnode->pTq, newTbUids, true);
|
|
||||||
|
|
||||||
_exit:
|
|
||||||
taosArrayDestroy(newTbUids);
|
|
||||||
// tEncodeSize(tEncodeSSubmitRsp, &submitRsp, tsize, ret);
|
|
||||||
// pRsp->pCont = rpcMallocCont(tsize);
|
|
||||||
// pRsp->contLen = tsize;
|
|
||||||
// tEncoderInit(&encoder, pRsp->pCont, tsize);
|
|
||||||
// tEncodeSSubmitRsp(&encoder, &submitRsp);
|
|
||||||
// tEncoderClear(&encoder);
|
|
||||||
|
|
||||||
taosArrayDestroyEx(submitRsp.pArray, tFreeSSubmitBlkRsp);
|
|
||||||
|
|
||||||
// TODO: the partial success scenario and the error case
|
|
||||||
// => If partial success, extract the success submitted rows and reconstruct a new submit msg, and push to level
|
|
||||||
// 1/level 2.
|
|
||||||
// TODO: refactor
|
|
||||||
if ((terrno == TSDB_CODE_SUCCESS) && (pRsp->code == TSDB_CODE_SUCCESS)) {
|
|
||||||
statis.nBatchInsertSuccess = 1;
|
|
||||||
tdProcessRSmaSubmit(pVnode->pSma, pReq, STREAM_INPUT__DATA_SUBMIT);
|
|
||||||
}
|
|
||||||
|
|
||||||
// N.B. not strict as the following procedure is not atomic
|
|
||||||
atomic_add_fetch_64(&pVnode->statis.nInsert, submitRsp.numOfRows);
|
|
||||||
atomic_add_fetch_64(&pVnode->statis.nInsertSuccess, submitRsp.affectedRows);
|
|
||||||
atomic_add_fetch_64(&pVnode->statis.nBatchInsert, statis.nBatchInsert);
|
|
||||||
atomic_add_fetch_64(&pVnode->statis.nBatchInsertSuccess, statis.nBatchInsertSuccess);
|
|
||||||
|
|
||||||
vDebug("vgId:%d, submit success, index:%" PRId64, pVnode->config.vgId, version);
|
|
||||||
return 0;
|
|
||||||
#endif
|
|
||||||
return 0;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t vnodeProcessCreateTSmaReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
|
static int32_t vnodeProcessCreateTSmaReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
|
||||||
|
|
Loading…
Reference in New Issue