feat: vnode pre-process msg
This commit is contained in:
parent
d784da0da0
commit
528f10c3fe
|
@ -244,12 +244,12 @@ typedef struct {
|
||||||
const void* pMsg;
|
const void* pMsg;
|
||||||
} SSubmitMsgIter;
|
} SSubmitMsgIter;
|
||||||
|
|
||||||
int32_t tInitSubmitMsgIter(const SSubmitReq* pMsg, SSubmitMsgIter* pIter);
|
int32_t tInitSubmitMsgIter(SSubmitReq* pMsg, SSubmitMsgIter* pIter);
|
||||||
int32_t tGetSubmitMsgNext(SSubmitMsgIter* pIter, SSubmitBlk** pPBlock);
|
int32_t tGetSubmitMsgNext(SSubmitMsgIter* pIter, SSubmitBlk** pPBlock);
|
||||||
int32_t tInitSubmitBlkIter(SSubmitMsgIter* pMsgIter, SSubmitBlk* pBlock, SSubmitBlkIter* pIter);
|
int32_t tInitSubmitBlkIter(SSubmitMsgIter* pMsgIter, SSubmitBlk* pBlock, SSubmitBlkIter* pIter);
|
||||||
STSRow* tGetSubmitBlkNext(SSubmitBlkIter* pIter);
|
STSRow* tGetSubmitBlkNext(SSubmitBlkIter* pIter);
|
||||||
// for debug
|
// for debug
|
||||||
int32_t tPrintFixedSchemaSubmitReq(const SSubmitReq* pReq, STSchema* pSchema);
|
int32_t tPrintFixedSchemaSubmitReq(SSubmitReq* pReq, STSchema* pSchema);
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
int32_t code;
|
int32_t code;
|
||||||
|
|
|
@ -317,7 +317,7 @@ static FORCE_INLINE int32_t tDecodeI16v(SDecoder* pCoder, int16_t* val) {
|
||||||
if (tDecodeU16v(pCoder, &tval) < 0) {
|
if (tDecodeU16v(pCoder, &tval) < 0) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
*val = ZIGZAGD(int16_t, tval);
|
if (val) *val = ZIGZAGD(int16_t, tval);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -331,7 +331,7 @@ static FORCE_INLINE int32_t tDecodeI32v(SDecoder* pCoder, int32_t* val) {
|
||||||
if (tDecodeU32v(pCoder, &tval) < 0) {
|
if (tDecodeU32v(pCoder, &tval) < 0) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
*val = ZIGZAGD(int32_t, tval);
|
if (val) *val = ZIGZAGD(int32_t, tval);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -345,7 +345,7 @@ static FORCE_INLINE int32_t tDecodeI64v(SDecoder* pCoder, int64_t* val) {
|
||||||
if (tDecodeU64v(pCoder, &tval) < 0) {
|
if (tDecodeU64v(pCoder, &tval) < 0) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
*val = ZIGZAGD(int64_t, tval);
|
if (val) *val = ZIGZAGD(int64_t, tval);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -28,7 +28,7 @@
|
||||||
#undef TD_MSG_SEG_CODE_
|
#undef TD_MSG_SEG_CODE_
|
||||||
#include "tmsgdef.h"
|
#include "tmsgdef.h"
|
||||||
|
|
||||||
int32_t tInitSubmitMsgIter(const SSubmitReq *pMsg, SSubmitMsgIter *pIter) {
|
int32_t tInitSubmitMsgIter(SSubmitReq *pMsg, SSubmitMsgIter *pIter) {
|
||||||
if (pMsg == NULL) {
|
if (pMsg == NULL) {
|
||||||
terrno = TSDB_CODE_TDB_SUBMIT_MSG_MSSED_UP;
|
terrno = TSDB_CODE_TDB_SUBMIT_MSG_MSSED_UP;
|
||||||
return -1;
|
return -1;
|
||||||
|
@ -102,7 +102,7 @@ STSRow *tGetSubmitBlkNext(SSubmitBlkIter *pIter) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tPrintFixedSchemaSubmitReq(const SSubmitReq *pReq, STSchema *pTschema) {
|
int32_t tPrintFixedSchemaSubmitReq(SSubmitReq *pReq, STSchema *pTschema) {
|
||||||
SSubmitMsgIter msgIter = {0};
|
SSubmitMsgIter msgIter = {0};
|
||||||
if (tInitSubmitMsgIter(pReq, &msgIter) < 0) return -1;
|
if (tInitSubmitMsgIter(pReq, &msgIter) < 0) return -1;
|
||||||
while (true) {
|
while (true) {
|
||||||
|
|
|
@ -113,6 +113,8 @@ static void vmProcessWriteQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numO
|
||||||
SRpcMsg *pMsg = *(SRpcMsg **)taosArrayGet(pArray, i);
|
SRpcMsg *pMsg = *(SRpcMsg **)taosArrayGet(pArray, i);
|
||||||
SRpcMsg rsp = {.info = pMsg->info};
|
SRpcMsg rsp = {.info = pMsg->info};
|
||||||
|
|
||||||
|
vnodePreprocessReq(pVnode->pImpl, pMsg);
|
||||||
|
|
||||||
int32_t ret = syncPropose(vnodeGetSyncHandle(pVnode->pImpl), pMsg, false);
|
int32_t ret = syncPropose(vnodeGetSyncHandle(pVnode->pImpl), pMsg, false);
|
||||||
if (ret == TAOS_SYNC_PROPOSE_NOT_LEADER) {
|
if (ret == TAOS_SYNC_PROPOSE_NOT_LEADER) {
|
||||||
dTrace("msg:%p, is redirect since not leader, vgId:%d ", pMsg, pVnode->vgId);
|
dTrace("msg:%p, is redirect since not leader, vgId:%d ", pMsg, pVnode->vgId);
|
||||||
|
|
|
@ -51,7 +51,7 @@ int32_t vnodeCreate(const char *path, SVnodeCfg *pCfg, STfs *pTfs);
|
||||||
void vnodeDestroy(const char *path, STfs *pTfs);
|
void vnodeDestroy(const char *path, STfs *pTfs);
|
||||||
SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb);
|
SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb);
|
||||||
void vnodeClose(SVnode *pVnode);
|
void vnodeClose(SVnode *pVnode);
|
||||||
int32_t vnodePreprocessWriteReqs(SVnode *pVnode, SArray *pMsgs, int64_t *version);
|
int32_t vnodePreprocessReq(SVnode *pVnode, SRpcMsg *pMsg);
|
||||||
int32_t vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRpcMsg *pRsp);
|
int32_t vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRpcMsg *pRsp);
|
||||||
int32_t vnodeProcessCMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp);
|
int32_t vnodeProcessCMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp);
|
||||||
int32_t vnodeProcessSyncReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp);
|
int32_t vnodeProcessSyncReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp);
|
||||||
|
|
|
@ -104,7 +104,7 @@ int tsdbOpen(SVnode* pVnode, STsdb** ppTsdb, const char* dir, STsdbKeep
|
||||||
int tsdbClose(STsdb** pTsdb);
|
int tsdbClose(STsdb** pTsdb);
|
||||||
int tsdbBegin(STsdb* pTsdb);
|
int tsdbBegin(STsdb* pTsdb);
|
||||||
int tsdbCommit(STsdb* pTsdb);
|
int tsdbCommit(STsdb* pTsdb);
|
||||||
int tsdbScanAndConvertSubmitMsg(STsdb* pTsdb, const SSubmitReq* pMsg);
|
int tsdbScanAndConvertSubmitMsg(STsdb* pTsdb, SSubmitReq* pMsg);
|
||||||
int tsdbInsertData(STsdb* pTsdb, int64_t version, SSubmitReq* pMsg, SSubmitRsp* pRsp);
|
int tsdbInsertData(STsdb* pTsdb, int64_t version, SSubmitReq* pMsg, SSubmitRsp* pRsp);
|
||||||
int tsdbInsertTableData(STsdb* pTsdb, SSubmitMsgIter* pMsgIter, SSubmitBlk* pBlock, SSubmitBlkRsp* pRsp);
|
int tsdbInsertTableData(STsdb* pTsdb, SSubmitMsgIter* pMsgIter, SSubmitBlk* pBlock, SSubmitBlkRsp* pRsp);
|
||||||
tsdbReaderT* tsdbQueryTables(SVnode* pVnode, SQueryTableDataCond* pCond, STableGroupInfo* groupList, uint64_t qId,
|
tsdbReaderT* tsdbQueryTables(SVnode* pVnode, SQueryTableDataCond* pCond, STableGroupInfo* groupList, uint64_t qId,
|
||||||
|
|
|
@ -223,9 +223,6 @@ int metaCreateTable(SMeta *pMeta, int64_t version, SVCreateTbReq *pReq) {
|
||||||
terrno = TSDB_CODE_TDB_TABLE_ALREADY_EXIST;
|
terrno = TSDB_CODE_TDB_TABLE_ALREADY_EXIST;
|
||||||
metaReaderClear(&mr);
|
metaReaderClear(&mr);
|
||||||
return -1;
|
return -1;
|
||||||
} else {
|
|
||||||
pReq->uid = tGenIdPI64();
|
|
||||||
pReq->ctime = taosGetTimestampMs();
|
|
||||||
}
|
}
|
||||||
metaReaderClear(&mr);
|
metaReaderClear(&mr);
|
||||||
|
|
||||||
|
|
|
@ -85,7 +85,7 @@ static FORCE_INLINE int tsdbCheckRowRange(STsdb *pTsdb, tb_uid_t uid, STSRow *ro
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int tsdbScanAndConvertSubmitMsg(STsdb *pTsdb, const SSubmitReq *pMsg) {
|
int tsdbScanAndConvertSubmitMsg(STsdb *pTsdb, SSubmitReq *pMsg) {
|
||||||
ASSERT(pMsg != NULL);
|
ASSERT(pMsg != NULL);
|
||||||
// STsdbMeta * pMeta = pTsdb->tsdbMeta;
|
// STsdbMeta * pMeta = pTsdb->tsdbMeta;
|
||||||
SSubmitMsgIter msgIter = {0};
|
SSubmitMsgIter msgIter = {0};
|
||||||
|
@ -150,7 +150,6 @@ int tsdbScanAndConvertSubmitMsg(STsdb *pTsdb, const SSubmitReq *pMsg) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if (terrno != TSDB_CODE_SUCCESS) return -1;
|
if (terrno != TSDB_CODE_SUCCESS) return -1;
|
||||||
|
|
|
@ -24,26 +24,62 @@ static int vnodeProcessDropTbReq(SVnode *pVnode, int64_t version, void *pReq, in
|
||||||
static int vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
|
static int vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
|
||||||
static int vnodeProcessCreateTSmaReq(SVnode *pVnode, int64_t version, void *pReq, int len, SRpcMsg *pRsp);
|
static int vnodeProcessCreateTSmaReq(SVnode *pVnode, int64_t version, void *pReq, int len, SRpcMsg *pRsp);
|
||||||
|
|
||||||
int vnodePreprocessWriteReqs(SVnode *pVnode, SArray *pMsgs, int64_t *version) {
|
int32_t vnodePreprocessReq(SVnode *pVnode, SRpcMsg *pMsg) {
|
||||||
#if 0
|
SDecoder dc = {0};
|
||||||
SRpcMsg *pMsg;
|
|
||||||
SRpcMsg *pRpc;
|
|
||||||
|
|
||||||
*version = pVnode->state.processed;
|
switch (pMsg->msgType) {
|
||||||
for (int i = 0; i < taosArrayGetSize(pMsgs); i++) {
|
case TDMT_VND_CREATE_TABLE: {
|
||||||
pMsg = *(SRpcMsg **)taosArrayGet(pMsgs, i);
|
int64_t ctime = taosGetTimestampMs();
|
||||||
pRpc = pMsg;
|
int32_t nReqs;
|
||||||
|
|
||||||
// set request version
|
tDecoderInit(&dc, (uint8_t *)pMsg->pCont + sizeof(SMsgHead), pMsg->contLen - sizeof(SMsgHead));
|
||||||
if (walWrite(pVnode->pWal, pVnode->state.processed++, pRpc->msgType, pRpc->pCont, pRpc->contLen) < 0) {
|
tStartDecode(&dc);
|
||||||
vError("vnode:%d write wal error since %s", TD_VID(pVnode), terrstr());
|
|
||||||
return -1;
|
tDecodeI32v(&dc, &nReqs);
|
||||||
|
for (int32_t iReq = 0; iReq < nReqs; iReq++) {
|
||||||
|
tb_uid_t uid = tGenIdPI64();
|
||||||
|
tStartDecode(&dc);
|
||||||
|
|
||||||
|
tDecodeI32v(&dc, NULL);
|
||||||
|
*(int64_t *)(dc.data + dc.pos) = uid;
|
||||||
|
*(int64_t *)(dc.data + dc.pos + 8) = ctime;
|
||||||
|
|
||||||
|
tEndDecode(&dc);
|
||||||
|
}
|
||||||
|
|
||||||
|
tEndDecode(&dc);
|
||||||
|
tDecoderClear(&dc);
|
||||||
|
} break;
|
||||||
|
case TDMT_VND_SUBMIT: {
|
||||||
|
SSubmitMsgIter msgIter = {0};
|
||||||
|
SSubmitReq *pSubmitReq = (SSubmitReq *)pMsg->pCont;
|
||||||
|
SSubmitBlk *pBlock = NULL;
|
||||||
|
int64_t ctime = taosGetTimestampMs();
|
||||||
|
|
||||||
|
tInitSubmitMsgIter(pSubmitReq, &msgIter);
|
||||||
|
|
||||||
|
for (;;) {
|
||||||
|
tGetSubmitMsgNext(&msgIter, &pBlock);
|
||||||
|
if (pBlock == NULL) break;
|
||||||
|
|
||||||
|
if (msgIter.schemaLen > 0) {
|
||||||
|
tDecoderInit(&dc, pBlock->data, msgIter.schemaLen);
|
||||||
|
tStartDecode(&dc);
|
||||||
|
|
||||||
|
tDecodeI32v(&dc, NULL);
|
||||||
|
*(int64_t *)(dc.data + dc.pos) = tGenIdPI64();
|
||||||
|
*(int64_t *)(dc.data + dc.pos + 8) = ctime;
|
||||||
|
|
||||||
|
tEndDecode(&dc);
|
||||||
|
tDecoderClear(&dc);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
walFsync(pVnode->pWal, false);
|
} break;
|
||||||
|
default:
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
#endif
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -675,7 +711,7 @@ static int vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq, in
|
||||||
goto _exit;
|
goto _exit;
|
||||||
}
|
}
|
||||||
|
|
||||||
for (int i = 0;;) {
|
for (;;) {
|
||||||
tGetSubmitMsgNext(&msgIter, &pBlock);
|
tGetSubmitMsgNext(&msgIter, &pBlock);
|
||||||
if (pBlock == NULL) break;
|
if (pBlock == NULL) break;
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue