mroe code
This commit is contained in:
parent
1711275400
commit
f982428e46
|
@ -30,158 +30,186 @@ static int32_t vnodeProcessTrimReq(SVnode *pVnode, int64_t version, void *pReq,
|
|||
static int32_t vnodeProcessDeleteReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
|
||||
static int32_t vnodeProcessBatchDeleteReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
|
||||
|
||||
int32_t vnodePreProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg) {
|
||||
int32_t code = 0;
|
||||
static int32_t vnodePreProcessCreateTableMsg(SVnode *pVnode, SRpcMsg *pMsg) {
|
||||
int32_t code = 0;
|
||||
int32_t lino = 0;
|
||||
|
||||
int64_t ctime = taosGetTimestampMs();
|
||||
SDecoder dc = {0};
|
||||
int32_t nReqs;
|
||||
|
||||
tDecoderInit(&dc, (uint8_t *)pMsg->pCont + sizeof(SMsgHead), pMsg->contLen - sizeof(SMsgHead));
|
||||
if (tStartDecode(&dc) < 0) {
|
||||
code = TSDB_CODE_INVALID_MSG;
|
||||
return code;
|
||||
}
|
||||
|
||||
if (tDecodeI32v(&dc, &nReqs) < 0) {
|
||||
code = TSDB_CODE_INVALID_MSG;
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
}
|
||||
for (int32_t iReq = 0; iReq < nReqs; iReq++) {
|
||||
tb_uid_t uid = tGenIdPI64();
|
||||
char *name = NULL;
|
||||
if (tStartDecode(&dc) < 0) {
|
||||
code = TSDB_CODE_INVALID_MSG;
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
}
|
||||
|
||||
if (tDecodeI32v(&dc, NULL) < 0) {
|
||||
code = TSDB_CODE_INVALID_MSG;
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
}
|
||||
if (tDecodeCStr(&dc, &name) < 0) {
|
||||
code = TSDB_CODE_INVALID_MSG;
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
}
|
||||
*(int64_t *)(dc.data + dc.pos) = uid;
|
||||
*(int64_t *)(dc.data + dc.pos + 8) = ctime;
|
||||
|
||||
vTrace("vgId:%d table:%s uid:%" PRId64 " is generated", pVnode->config.vgId, name, uid);
|
||||
tEndDecode(&dc);
|
||||
}
|
||||
|
||||
tEndDecode(&dc);
|
||||
|
||||
_exit:
|
||||
tDecoderClear(&dc);
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t vnodePreProcessSubmitMsg(SVnode *pVnode, SRpcMsg *pMsg) {
|
||||
int32_t code = 0;
|
||||
int32_t lino = 0;
|
||||
|
||||
int64_t ctime = taosGetTimestampMs();
|
||||
SDecoder dc = {0};
|
||||
|
||||
tDecoderInit(&dc, (uint8_t *)pMsg->pCont + sizeof(SMsgHead), pMsg->contLen - sizeof(SMsgHead));
|
||||
tStartDecode(&dc);
|
||||
|
||||
uint64_t nSubmitTbData;
|
||||
if (tDecodeU64v(&dc, &nSubmitTbData) < 0) {
|
||||
code = TSDB_CODE_INVALID_MSG;
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
}
|
||||
|
||||
for (int32_t i = 0; i < nSubmitTbData; i++) {
|
||||
if (tStartDecode(&dc) < 0) {
|
||||
code = TSDB_CODE_INVALID_MSG;
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
}
|
||||
|
||||
int32_t flags;
|
||||
if (tDecodeI32v(&dc, &flags) < 0) {
|
||||
code = TSDB_CODE_INVALID_MSG;
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
}
|
||||
|
||||
if (flags & SUBMIT_REQ_AUTO_CREATE_TABLE) {
|
||||
// SVCreateTbReq
|
||||
if (tStartDecode(&dc) < 0) {
|
||||
code = TSDB_CODE_INVALID_MSG;
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
}
|
||||
|
||||
if (tDecodeI32v(&dc, NULL) < 0) {
|
||||
code = TSDB_CODE_INVALID_MSG;
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
}
|
||||
|
||||
char *name = NULL;
|
||||
if (tDecodeCStr(&dc, &name) < 0) {
|
||||
code = TSDB_CODE_INVALID_MSG;
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
}
|
||||
|
||||
int64_t uid = metaGetTableEntryUidByName(pVnode->pMeta, name);
|
||||
if (uid == 0) {
|
||||
uid = tGenIdPI64();
|
||||
}
|
||||
|
||||
*(int64_t *)(dc.data + dc.pos) = uid;
|
||||
*(int64_t *)(dc.data + dc.pos + 8) = ctime;
|
||||
|
||||
tEndDecode(&dc);
|
||||
|
||||
// SSubmitTbData
|
||||
int64_t suid;
|
||||
if (tDecodeI64(&dc, &suid) < 0) {
|
||||
code = TSDB_CODE_INVALID_MSG;
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
}
|
||||
|
||||
*(int64_t *)(dc.data + dc.pos) = uid;
|
||||
}
|
||||
|
||||
tEndDecode(&dc);
|
||||
}
|
||||
|
||||
tEndDecode(&dc);
|
||||
tDecoderClear(&dc);
|
||||
|
||||
_exit:
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t vnodePreProcessDeleteMsg(SVnode *pVnode, SRpcMsg *pMsg) {
|
||||
int32_t code = 0;
|
||||
|
||||
int32_t size;
|
||||
int32_t ret;
|
||||
uint8_t *pCont;
|
||||
SEncoder *pCoder = &(SEncoder){0};
|
||||
SDeleteRes res = {0};
|
||||
SReadHandle handle = {.meta = pVnode->pMeta, .config = &pVnode->config, .vnode = pVnode, .pMsgCb = &pVnode->msgCb};
|
||||
|
||||
code = qWorkerProcessDeleteMsg(&handle, pVnode->pQuery, pMsg, &res);
|
||||
if (code) goto _exit;
|
||||
|
||||
// malloc and encode
|
||||
tEncodeSize(tEncodeDeleteRes, &res, size, ret);
|
||||
pCont = rpcMallocCont(size + sizeof(SMsgHead));
|
||||
|
||||
((SMsgHead *)pCont)->contLen = size + sizeof(SMsgHead);
|
||||
((SMsgHead *)pCont)->vgId = TD_VID(pVnode);
|
||||
|
||||
tEncoderInit(pCoder, pCont + sizeof(SMsgHead), size);
|
||||
tEncodeDeleteRes(pCoder, &res);
|
||||
tEncoderClear(pCoder);
|
||||
|
||||
rpcFreeCont(pMsg->pCont);
|
||||
pMsg->pCont = pCont;
|
||||
pMsg->contLen = size + sizeof(SMsgHead);
|
||||
|
||||
taosArrayDestroy(res.uidList);
|
||||
|
||||
_exit:
|
||||
return code;
|
||||
}
|
||||
|
||||
int32_t vnodePreProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg) {
|
||||
int32_t code = 0;
|
||||
|
||||
switch (pMsg->msgType) {
|
||||
case TDMT_VND_CREATE_TABLE: {
|
||||
int64_t ctime = taosGetTimestampMs();
|
||||
int32_t nReqs;
|
||||
|
||||
tDecoderInit(&dc, (uint8_t *)pMsg->pCont + sizeof(SMsgHead), pMsg->contLen - sizeof(SMsgHead));
|
||||
if (tStartDecode(&dc) < 0) {
|
||||
code = TSDB_CODE_INVALID_MSG;
|
||||
return code;
|
||||
}
|
||||
|
||||
if (tDecodeI32v(&dc, &nReqs) < 0) {
|
||||
code = TSDB_CODE_INVALID_MSG;
|
||||
goto _err;
|
||||
}
|
||||
for (int32_t iReq = 0; iReq < nReqs; iReq++) {
|
||||
tb_uid_t uid = tGenIdPI64();
|
||||
char *name = NULL;
|
||||
if (tStartDecode(&dc) < 0) {
|
||||
code = TSDB_CODE_INVALID_MSG;
|
||||
goto _err;
|
||||
}
|
||||
|
||||
if (tDecodeI32v(&dc, NULL) < 0) {
|
||||
code = TSDB_CODE_INVALID_MSG;
|
||||
return code;
|
||||
}
|
||||
if (tDecodeCStr(&dc, &name) < 0) {
|
||||
code = TSDB_CODE_INVALID_MSG;
|
||||
return code;
|
||||
}
|
||||
*(int64_t *)(dc.data + dc.pos) = uid;
|
||||
*(int64_t *)(dc.data + dc.pos + 8) = ctime;
|
||||
|
||||
vTrace("vgId:%d, table:%s uid:%" PRId64 " is generated", pVnode->config.vgId, name, uid);
|
||||
tEndDecode(&dc);
|
||||
}
|
||||
|
||||
tEndDecode(&dc);
|
||||
tDecoderClear(&dc);
|
||||
code = vnodePreProcessCreateTableMsg(pVnode, pMsg);
|
||||
} break;
|
||||
case TDMT_VND_SUBMIT: {
|
||||
int64_t ctime = taosGetTimestampMs();
|
||||
|
||||
tDecoderInit(&dc, (uint8_t *)pMsg->pCont + sizeof(SMsgHead), pMsg->contLen - sizeof(SMsgHead));
|
||||
tStartDecode(&dc);
|
||||
|
||||
uint64_t nSubmitTbData;
|
||||
if (tDecodeU64v(&dc, &nSubmitTbData) < 0) {
|
||||
code = TSDB_CODE_INVALID_MSG;
|
||||
goto _err;
|
||||
}
|
||||
|
||||
for (int32_t i = 0; i < nSubmitTbData; i++) {
|
||||
if (tStartDecode(&dc) < 0) {
|
||||
code = TSDB_CODE_INVALID_MSG;
|
||||
goto _err;
|
||||
}
|
||||
|
||||
int32_t flags;
|
||||
if (tDecodeI32v(&dc, &flags) < 0) {
|
||||
code = TSDB_CODE_INVALID_MSG;
|
||||
goto _err;
|
||||
}
|
||||
|
||||
if (flags & SUBMIT_REQ_AUTO_CREATE_TABLE) {
|
||||
// SVCreateTbReq
|
||||
if (tStartDecode(&dc) < 0) {
|
||||
code = TSDB_CODE_INVALID_MSG;
|
||||
goto _err;
|
||||
}
|
||||
|
||||
if (tDecodeI32v(&dc, NULL) < 0) {
|
||||
code = TSDB_CODE_INVALID_MSG;
|
||||
goto _err;
|
||||
}
|
||||
|
||||
char *name = NULL;
|
||||
if (tDecodeCStr(&dc, &name) < 0) {
|
||||
code = TSDB_CODE_INVALID_MSG;
|
||||
goto _err;
|
||||
}
|
||||
|
||||
int64_t uid = metaGetTableEntryUidByName(pVnode->pMeta, name);
|
||||
if (uid == 0) {
|
||||
uid = tGenIdPI64();
|
||||
}
|
||||
|
||||
*(int64_t *)(dc.data + dc.pos) = uid;
|
||||
*(int64_t *)(dc.data + dc.pos + 8) = ctime;
|
||||
|
||||
tEndDecode(&dc);
|
||||
|
||||
// SSubmitTbData
|
||||
int64_t suid;
|
||||
if (tDecodeI64(&dc, &suid) < 0) {
|
||||
code = TSDB_CODE_INVALID_MSG;
|
||||
goto _err;
|
||||
}
|
||||
|
||||
*(int64_t *)(dc.data + dc.pos) = uid;
|
||||
}
|
||||
|
||||
tEndDecode(&dc);
|
||||
}
|
||||
|
||||
tEndDecode(&dc);
|
||||
tDecoderClear(&dc);
|
||||
code = vnodePreProcessSubmitMsg(pVnode, pMsg);
|
||||
} break;
|
||||
case TDMT_VND_DELETE: {
|
||||
int32_t size;
|
||||
int32_t ret;
|
||||
uint8_t *pCont;
|
||||
SEncoder *pCoder = &(SEncoder){0};
|
||||
SDeleteRes res = {0};
|
||||
SReadHandle handle = {
|
||||
.meta = pVnode->pMeta, .config = &pVnode->config, .vnode = pVnode, .pMsgCb = &pVnode->msgCb};
|
||||
|
||||
code = qWorkerProcessDeleteMsg(&handle, pVnode->pQuery, pMsg, &res);
|
||||
if (code) {
|
||||
goto _err;
|
||||
}
|
||||
|
||||
// malloc and encode
|
||||
tEncodeSize(tEncodeDeleteRes, &res, size, ret);
|
||||
pCont = rpcMallocCont(size + sizeof(SMsgHead));
|
||||
|
||||
((SMsgHead *)pCont)->contLen = size + sizeof(SMsgHead);
|
||||
((SMsgHead *)pCont)->vgId = TD_VID(pVnode);
|
||||
|
||||
tEncoderInit(pCoder, pCont + sizeof(SMsgHead), size);
|
||||
tEncodeDeleteRes(pCoder, &res);
|
||||
tEncoderClear(pCoder);
|
||||
|
||||
rpcFreeCont(pMsg->pCont);
|
||||
pMsg->pCont = pCont;
|
||||
pMsg->contLen = size + sizeof(SMsgHead);
|
||||
|
||||
taosArrayDestroy(res.uidList);
|
||||
code = vnodePreProcessDeleteMsg(pVnode, pMsg);
|
||||
} break;
|
||||
default:
|
||||
break;
|
||||
}
|
||||
|
||||
return code;
|
||||
|
||||
_err:
|
||||
vError("vgId%d, preprocess request failed since %s", TD_VID(pVnode), tstrerror(code));
|
||||
_exit:
|
||||
if (code) {
|
||||
vError("vgId%d failed to preprocess write request since %s, msg type:%d", TD_VID(pVnode), tstrerror(code),
|
||||
pMsg->msgType);
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
|
@ -871,7 +899,6 @@ static int32_t vnodeDebugPrintSingleSubmitMsg(SMeta *pMeta, SSubmitBlk *pBlock,
|
|||
}
|
||||
|
||||
static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
|
||||
#if 1
|
||||
int32_t code = 0;
|
||||
terrno = 0;
|
||||
|
||||
|
@ -1042,145 +1069,6 @@ _exit:
|
|||
if (code) terrno = code;
|
||||
|
||||
return code;
|
||||
|
||||
#else
|
||||
SSubmitReq *pSubmitReq = (SSubmitReq *)pReq;
|
||||
SSubmitRsp submitRsp = {0};
|
||||
int32_t nRows = 0;
|
||||
int32_t tsize, ret;
|
||||
SEncoder encoder = {0};
|
||||
SArray *newTbUids = NULL;
|
||||
SVStatis statis = {0};
|
||||
bool tbCreated = false;
|
||||
terrno = TSDB_CODE_SUCCESS;
|
||||
|
||||
pRsp->code = 0;
|
||||
pSubmitReq->version = version;
|
||||
statis.nBatchInsert = 1;
|
||||
|
||||
if (tsdbScanAndConvertSubmitMsg(pVnode->pTsdb, pSubmitReq) < 0) {
|
||||
pRsp->code = terrno;
|
||||
goto _exit;
|
||||
}
|
||||
|
||||
submitRsp.pArray = taosArrayInit(msgIter.numOfBlocks, sizeof(SSubmitBlkRsp));
|
||||
newTbUids = taosArrayInit(msgIter.numOfBlocks, sizeof(int64_t));
|
||||
if (!submitRsp.pArray || !newTbUids) {
|
||||
pRsp->code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _exit;
|
||||
}
|
||||
|
||||
for (;;) {
|
||||
tGetSubmitMsgNext(&msgIter, &pBlock);
|
||||
if (pBlock == NULL) break;
|
||||
|
||||
SSubmitBlkRsp submitBlkRsp = {0};
|
||||
tbCreated = false;
|
||||
|
||||
// create table for auto create table mode
|
||||
if (msgIter.schemaLen > 0) {
|
||||
// tDecoderInit(&decoder, pBlock->data, msgIter.schemaLen);
|
||||
// if (tDecodeSVCreateTbReq(&decoder, &createTbReq) < 0) {
|
||||
// pRsp->code = TSDB_CODE_INVALID_MSG;
|
||||
// tDecoderClear(&decoder);
|
||||
// taosArrayDestroy(createTbReq.ctb.tagName);
|
||||
// goto _exit;
|
||||
// }
|
||||
|
||||
// if ((terrno = grantCheck(TSDB_GRANT_TIMESERIES)) < 0) {
|
||||
// pRsp->code = terrno;
|
||||
// tDecoderClear(&decoder);
|
||||
// taosArrayDestroy(createTbReq.ctb.tagName);
|
||||
// goto _exit;
|
||||
// }
|
||||
|
||||
// if ((terrno = grantCheck(TSDB_GRANT_TABLE)) < 0) {
|
||||
// pRsp->code = terrno;
|
||||
// tDecoderClear(&decoder);
|
||||
// taosArrayDestroy(createTbReq.ctb.tagName);
|
||||
// goto _exit;
|
||||
// }
|
||||
|
||||
if (metaCreateTable(pVnode->pMeta, version, &createTbReq, &submitBlkRsp.pMeta) < 0) {
|
||||
// if (terrno != TSDB_CODE_TDB_TABLE_ALREADY_EXIST) {
|
||||
// submitBlkRsp.code = terrno;
|
||||
// pRsp->code = terrno;
|
||||
// tDecoderClear(&decoder);
|
||||
// taosArrayDestroy(createTbReq.ctb.tagName);
|
||||
// goto _exit;
|
||||
// }
|
||||
} else {
|
||||
if (NULL != submitBlkRsp.pMeta) {
|
||||
vnodeUpdateMetaRsp(pVnode, submitBlkRsp.pMeta);
|
||||
}
|
||||
|
||||
// taosArrayPush(newTbUids, &createTbReq.uid);
|
||||
|
||||
submitBlkRsp.uid = createTbReq.uid;
|
||||
submitBlkRsp.tblFName = taosMemoryMalloc(strlen(pVnode->config.dbname) + strlen(createTbReq.name) + 2);
|
||||
sprintf(submitBlkRsp.tblFName, "%s.%s", pVnode->config.dbname, createTbReq.name);
|
||||
tbCreated = true;
|
||||
}
|
||||
|
||||
// msgIter.uid = createTbReq.uid;
|
||||
// if (createTbReq.type == TSDB_CHILD_TABLE) {
|
||||
// msgIter.suid = createTbReq.ctb.suid;
|
||||
// } else {
|
||||
// msgIter.suid = 0;
|
||||
// }
|
||||
|
||||
// tDecoderClear(&decoder);
|
||||
// taosArrayDestroy(createTbReq.ctb.tagName);
|
||||
}
|
||||
|
||||
if (tsdbInsertTableData(pVnode->pTsdb, version, &msgIter, pBlock, &submitBlkRsp) < 0) {
|
||||
submitBlkRsp.code = terrno;
|
||||
}
|
||||
|
||||
submitRsp.numOfRows += submitBlkRsp.numOfRows;
|
||||
submitRsp.affectedRows += submitBlkRsp.affectedRows;
|
||||
if (tbCreated || submitBlkRsp.code) {
|
||||
taosArrayPush(submitRsp.pArray, &submitBlkRsp);
|
||||
}
|
||||
}
|
||||
|
||||
// if (taosArrayGetSize(newTbUids) > 0) {
|
||||
// vDebug("vgId:%d, add %d table into query table list in handling submit", TD_VID(pVnode),
|
||||
// (int32_t)taosArrayGetSize(newTbUids));
|
||||
// }
|
||||
|
||||
// tqUpdateTbUidList(pVnode->pTq, newTbUids, true);
|
||||
|
||||
_exit:
|
||||
taosArrayDestroy(newTbUids);
|
||||
// tEncodeSize(tEncodeSSubmitRsp, &submitRsp, tsize, ret);
|
||||
// pRsp->pCont = rpcMallocCont(tsize);
|
||||
// pRsp->contLen = tsize;
|
||||
// tEncoderInit(&encoder, pRsp->pCont, tsize);
|
||||
// tEncodeSSubmitRsp(&encoder, &submitRsp);
|
||||
// tEncoderClear(&encoder);
|
||||
|
||||
taosArrayDestroyEx(submitRsp.pArray, tFreeSSubmitBlkRsp);
|
||||
|
||||
// TODO: the partial success scenario and the error case
|
||||
// => If partial success, extract the success submitted rows and reconstruct a new submit msg, and push to level
|
||||
// 1/level 2.
|
||||
// TODO: refactor
|
||||
if ((terrno == TSDB_CODE_SUCCESS) && (pRsp->code == TSDB_CODE_SUCCESS)) {
|
||||
statis.nBatchInsertSuccess = 1;
|
||||
tdProcessRSmaSubmit(pVnode->pSma, pReq, STREAM_INPUT__DATA_SUBMIT);
|
||||
}
|
||||
|
||||
// N.B. not strict as the following procedure is not atomic
|
||||
atomic_add_fetch_64(&pVnode->statis.nInsert, submitRsp.numOfRows);
|
||||
atomic_add_fetch_64(&pVnode->statis.nInsertSuccess, submitRsp.affectedRows);
|
||||
atomic_add_fetch_64(&pVnode->statis.nBatchInsert, statis.nBatchInsert);
|
||||
atomic_add_fetch_64(&pVnode->statis.nBatchInsertSuccess, statis.nBatchInsertSuccess);
|
||||
|
||||
vDebug("vgId:%d, submit success, index:%" PRId64, pVnode->config.vgId, version);
|
||||
return 0;
|
||||
#endif
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int32_t vnodeProcessCreateTSmaReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
|
||||
|
|
Loading…
Reference in New Issue