From 528f10c3fed21b4ce63a0bce5a8f09a8cba6bc11 Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Mon, 23 May 2022 08:38:05 +0000 Subject: [PATCH] feat: vnode pre-process msg --- include/common/tmsg.h | 4 +- include/util/tencode.h | 6 +- source/common/src/tmsg.c | 4 +- source/dnode/mgmt/mgmt_vnode/src/vmWorker.c | 2 + source/dnode/vnode/inc/vnode.h | 2 +- source/dnode/vnode/src/inc/vnodeInt.h | 2 +- source/dnode/vnode/src/meta/metaTable.c | 3 - source/dnode/vnode/src/tsdb/tsdbWrite.c | 3 +- source/dnode/vnode/src/vnd/vnodeSvr.c | 70 ++++++++++++++++----- 9 files changed, 65 insertions(+), 31 deletions(-) diff --git a/include/common/tmsg.h b/include/common/tmsg.h index f27dea27ad..acf2587d9d 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -244,12 +244,12 @@ typedef struct { const void* pMsg; } SSubmitMsgIter; -int32_t tInitSubmitMsgIter(const SSubmitReq* pMsg, SSubmitMsgIter* pIter); +int32_t tInitSubmitMsgIter(SSubmitReq* pMsg, SSubmitMsgIter* pIter); int32_t tGetSubmitMsgNext(SSubmitMsgIter* pIter, SSubmitBlk** pPBlock); int32_t tInitSubmitBlkIter(SSubmitMsgIter* pMsgIter, SSubmitBlk* pBlock, SSubmitBlkIter* pIter); STSRow* tGetSubmitBlkNext(SSubmitBlkIter* pIter); // for debug -int32_t tPrintFixedSchemaSubmitReq(const SSubmitReq* pReq, STSchema* pSchema); +int32_t tPrintFixedSchemaSubmitReq(SSubmitReq* pReq, STSchema* pSchema); typedef struct { int32_t code; diff --git a/include/util/tencode.h b/include/util/tencode.h index 90d7247ea9..af38d694e2 100644 --- a/include/util/tencode.h +++ b/include/util/tencode.h @@ -317,7 +317,7 @@ static FORCE_INLINE int32_t tDecodeI16v(SDecoder* pCoder, int16_t* val) { if (tDecodeU16v(pCoder, &tval) < 0) { return -1; } - *val = ZIGZAGD(int16_t, tval); + if (val) *val = ZIGZAGD(int16_t, tval); return 0; } @@ -331,7 +331,7 @@ static FORCE_INLINE int32_t tDecodeI32v(SDecoder* pCoder, int32_t* val) { if (tDecodeU32v(pCoder, &tval) < 0) { return -1; } - *val = ZIGZAGD(int32_t, tval); + if (val) *val = ZIGZAGD(int32_t, tval); return 0; } @@ -345,7 +345,7 @@ static FORCE_INLINE int32_t tDecodeI64v(SDecoder* pCoder, int64_t* val) { if (tDecodeU64v(pCoder, &tval) < 0) { return -1; } - *val = ZIGZAGD(int64_t, tval); + if (val) *val = ZIGZAGD(int64_t, tval); return 0; } diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index cc333ae5c8..b381299d27 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -28,7 +28,7 @@ #undef TD_MSG_SEG_CODE_ #include "tmsgdef.h" -int32_t tInitSubmitMsgIter(const SSubmitReq *pMsg, SSubmitMsgIter *pIter) { +int32_t tInitSubmitMsgIter(SSubmitReq *pMsg, SSubmitMsgIter *pIter) { if (pMsg == NULL) { terrno = TSDB_CODE_TDB_SUBMIT_MSG_MSSED_UP; return -1; @@ -102,7 +102,7 @@ STSRow *tGetSubmitBlkNext(SSubmitBlkIter *pIter) { } } -int32_t tPrintFixedSchemaSubmitReq(const SSubmitReq *pReq, STSchema *pTschema) { +int32_t tPrintFixedSchemaSubmitReq(SSubmitReq *pReq, STSchema *pTschema) { SSubmitMsgIter msgIter = {0}; if (tInitSubmitMsgIter(pReq, &msgIter) < 0) return -1; while (true) { diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c b/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c index e7bfbdae58..6183794bdd 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c @@ -113,6 +113,8 @@ static void vmProcessWriteQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numO SRpcMsg *pMsg = *(SRpcMsg **)taosArrayGet(pArray, i); SRpcMsg rsp = {.info = pMsg->info}; + vnodePreprocessReq(pVnode->pImpl, pMsg); + int32_t ret = syncPropose(vnodeGetSyncHandle(pVnode->pImpl), pMsg, false); if (ret == TAOS_SYNC_PROPOSE_NOT_LEADER) { dTrace("msg:%p, is redirect since not leader, vgId:%d ", pMsg, pVnode->vgId); diff --git a/source/dnode/vnode/inc/vnode.h b/source/dnode/vnode/inc/vnode.h index e34ce1150d..9e33973c05 100644 --- a/source/dnode/vnode/inc/vnode.h +++ b/source/dnode/vnode/inc/vnode.h @@ -51,7 +51,7 @@ int32_t vnodeCreate(const char *path, SVnodeCfg *pCfg, STfs *pTfs); void vnodeDestroy(const char *path, STfs *pTfs); SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb); void vnodeClose(SVnode *pVnode); -int32_t vnodePreprocessWriteReqs(SVnode *pVnode, SArray *pMsgs, int64_t *version); +int32_t vnodePreprocessReq(SVnode *pVnode, SRpcMsg *pMsg); int32_t vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRpcMsg *pRsp); int32_t vnodeProcessCMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp); int32_t vnodeProcessSyncReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp); diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index 23825e6f4a..1c3819c3d3 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -104,7 +104,7 @@ int tsdbOpen(SVnode* pVnode, STsdb** ppTsdb, const char* dir, STsdbKeep int tsdbClose(STsdb** pTsdb); int tsdbBegin(STsdb* pTsdb); int tsdbCommit(STsdb* pTsdb); -int tsdbScanAndConvertSubmitMsg(STsdb* pTsdb, const SSubmitReq* pMsg); +int tsdbScanAndConvertSubmitMsg(STsdb* pTsdb, SSubmitReq* pMsg); int tsdbInsertData(STsdb* pTsdb, int64_t version, SSubmitReq* pMsg, SSubmitRsp* pRsp); int tsdbInsertTableData(STsdb* pTsdb, SSubmitMsgIter* pMsgIter, SSubmitBlk* pBlock, SSubmitBlkRsp* pRsp); tsdbReaderT* tsdbQueryTables(SVnode* pVnode, SQueryTableDataCond* pCond, STableGroupInfo* groupList, uint64_t qId, diff --git a/source/dnode/vnode/src/meta/metaTable.c b/source/dnode/vnode/src/meta/metaTable.c index 4ec54d695f..8afd20e0bc 100644 --- a/source/dnode/vnode/src/meta/metaTable.c +++ b/source/dnode/vnode/src/meta/metaTable.c @@ -223,9 +223,6 @@ int metaCreateTable(SMeta *pMeta, int64_t version, SVCreateTbReq *pReq) { terrno = TSDB_CODE_TDB_TABLE_ALREADY_EXIST; metaReaderClear(&mr); return -1; - } else { - pReq->uid = tGenIdPI64(); - pReq->ctime = taosGetTimestampMs(); } metaReaderClear(&mr); diff --git a/source/dnode/vnode/src/tsdb/tsdbWrite.c b/source/dnode/vnode/src/tsdb/tsdbWrite.c index a67f413ba7..aab4da26a3 100644 --- a/source/dnode/vnode/src/tsdb/tsdbWrite.c +++ b/source/dnode/vnode/src/tsdb/tsdbWrite.c @@ -85,7 +85,7 @@ static FORCE_INLINE int tsdbCheckRowRange(STsdb *pTsdb, tb_uid_t uid, STSRow *ro return 0; } -int tsdbScanAndConvertSubmitMsg(STsdb *pTsdb, const SSubmitReq *pMsg) { +int tsdbScanAndConvertSubmitMsg(STsdb *pTsdb, SSubmitReq *pMsg) { ASSERT(pMsg != NULL); // STsdbMeta * pMeta = pTsdb->tsdbMeta; SSubmitMsgIter msgIter = {0}; @@ -150,7 +150,6 @@ int tsdbScanAndConvertSubmitMsg(STsdb *pTsdb, const SSubmitReq *pMsg) { return -1; } } - } if (terrno != TSDB_CODE_SUCCESS) return -1; diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index 88cc97ddd5..390073025b 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -24,26 +24,62 @@ static int vnodeProcessDropTbReq(SVnode *pVnode, int64_t version, void *pReq, in static int vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp); static int vnodeProcessCreateTSmaReq(SVnode *pVnode, int64_t version, void *pReq, int len, SRpcMsg *pRsp); -int vnodePreprocessWriteReqs(SVnode *pVnode, SArray *pMsgs, int64_t *version) { -#if 0 - SRpcMsg *pMsg; - SRpcMsg *pRpc; +int32_t vnodePreprocessReq(SVnode *pVnode, SRpcMsg *pMsg) { + SDecoder dc = {0}; - *version = pVnode->state.processed; - for (int i = 0; i < taosArrayGetSize(pMsgs); i++) { - pMsg = *(SRpcMsg **)taosArrayGet(pMsgs, i); - pRpc = pMsg; + switch (pMsg->msgType) { + case TDMT_VND_CREATE_TABLE: { + int64_t ctime = taosGetTimestampMs(); + int32_t nReqs; - // set request version - if (walWrite(pVnode->pWal, pVnode->state.processed++, pRpc->msgType, pRpc->pCont, pRpc->contLen) < 0) { - vError("vnode:%d write wal error since %s", TD_VID(pVnode), terrstr()); - return -1; - } + tDecoderInit(&dc, (uint8_t *)pMsg->pCont + sizeof(SMsgHead), pMsg->contLen - sizeof(SMsgHead)); + tStartDecode(&dc); + + tDecodeI32v(&dc, &nReqs); + for (int32_t iReq = 0; iReq < nReqs; iReq++) { + tb_uid_t uid = tGenIdPI64(); + tStartDecode(&dc); + + tDecodeI32v(&dc, NULL); + *(int64_t *)(dc.data + dc.pos) = uid; + *(int64_t *)(dc.data + dc.pos + 8) = ctime; + + tEndDecode(&dc); + } + + tEndDecode(&dc); + tDecoderClear(&dc); + } break; + case TDMT_VND_SUBMIT: { + SSubmitMsgIter msgIter = {0}; + SSubmitReq *pSubmitReq = (SSubmitReq *)pMsg->pCont; + SSubmitBlk *pBlock = NULL; + int64_t ctime = taosGetTimestampMs(); + + tInitSubmitMsgIter(pSubmitReq, &msgIter); + + for (;;) { + tGetSubmitMsgNext(&msgIter, &pBlock); + if (pBlock == NULL) break; + + if (msgIter.schemaLen > 0) { + tDecoderInit(&dc, pBlock->data, msgIter.schemaLen); + tStartDecode(&dc); + + tDecodeI32v(&dc, NULL); + *(int64_t *)(dc.data + dc.pos) = tGenIdPI64(); + *(int64_t *)(dc.data + dc.pos + 8) = ctime; + + tEndDecode(&dc); + tDecoderClear(&dc); + } + } + + } break; + default: + break; } - walFsync(pVnode->pWal, false); - -#endif return 0; } @@ -675,7 +711,7 @@ static int vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq, in goto _exit; } - for (int i = 0;;) { + for (;;) { tGetSubmitMsgNext(&msgIter, &pBlock); if (pBlock == NULL) break;