From 15b25eeec5f1211e53faf3d9393d303e2abbcde0 Mon Sep 17 00:00:00 2001 From: Cary Xu Date: Wed, 1 Jun 2022 15:06:12 +0800 Subject: [PATCH] feat: tsma/rsma refactor --- include/common/tmsg.h | 1 + include/util/taoserror.h | 1 + source/common/src/tmsg.c | 2 ++ source/dnode/mgmt/mgmt_vnode/src/vmHandle.c | 23 +++++++++++-- source/dnode/vnode/inc/vnode.h | 2 ++ source/dnode/vnode/src/inc/vnodeInt.h | 7 ++-- source/dnode/vnode/src/sma/smaOpen.c | 2 +- source/dnode/vnode/src/tsdb/tsdbRead.c | 2 +- source/dnode/vnode/src/vnd/vnodeCfg.c | 3 ++ source/dnode/vnode/src/vnd/vnodeCommit.c | 3 +- source/dnode/vnode/src/vnd/vnodeOpen.c | 2 +- source/dnode/vnode/src/vnd/vnodeSvr.c | 38 ++++++++++++++------- source/util/src/terror.c | 1 + 13 files changed, 63 insertions(+), 24 deletions(-) diff --git a/include/common/tmsg.h b/include/common/tmsg.h index e926e2fb31..1eae1835b4 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -2296,6 +2296,7 @@ typedef struct { int8_t intervalUnit; // MACRO: TIME_UNIT_XXX int8_t slidingUnit; // MACRO: TIME_UNIT_XXX int8_t timezoneInt; // sma data expired if timezone changes. + int32_t dstVgId; char indexName[TSDB_INDEX_NAME_LEN]; int32_t exprLen; int32_t tagsFilterLen; diff --git a/include/util/taoserror.h b/include/util/taoserror.h index 65cfe8de0b..412192e8a8 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -70,6 +70,7 @@ int32_t* taosGetErrno(); #define TSDB_CODE_NEED_RETRY TAOS_DEF_ERROR_CODE(0, 0x0028) #define TSDB_CODE_OUT_OF_RPC_MEMORY_QUEUE TAOS_DEF_ERROR_CODE(0, 0x0029) #define TSDB_CODE_INVALID_TIMESTAMP TAOS_DEF_ERROR_CODE(0, 0x0030) +#define TSDB_CODE_MSG_DECODE_ERROR TAOS_DEF_ERROR_CODE(0, 0x0031) #define TSDB_CODE_REF_NO_MEMORY TAOS_DEF_ERROR_CODE(0, 0x0040) #define TSDB_CODE_REF_FULL TAOS_DEF_ERROR_CODE(0, 0x0041) diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index 233623c616..2fc60320dd 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -3654,6 +3654,7 @@ int32_t tEncodeTSma(SEncoder *pCoder, const STSma *pSma) { if (tEncodeI8(pCoder, pSma->intervalUnit) < 0) return -1; if (tEncodeI8(pCoder, pSma->slidingUnit) < 0) return -1; if (tEncodeI8(pCoder, pSma->timezoneInt) < 0) return -1; + if (tEncodeI32(pCoder, pSma->dstVgId) < 0) return -1; if (tEncodeCStr(pCoder, pSma->indexName) < 0) return -1; if (tEncodeI32(pCoder, pSma->exprLen) < 0) return -1; if (tEncodeI32(pCoder, pSma->tagsFilterLen) < 0) return -1; @@ -3676,6 +3677,7 @@ int32_t tDecodeTSma(SDecoder *pCoder, STSma *pSma) { if (tDecodeI8(pCoder, &pSma->version) < 0) return -1; if (tDecodeI8(pCoder, &pSma->intervalUnit) < 0) return -1; if (tDecodeI8(pCoder, &pSma->slidingUnit) < 0) return -1; + if (tDecodeI32(pCoder, &pSma->dstVgId) < 0) return -1; if (tDecodeI8(pCoder, &pSma->timezoneInt) < 0) return -1; if (tDecodeCStrTo(pCoder, pSma->indexName) < 0) return -1; if (tDecodeI32(pCoder, &pSma->exprLen) < 0) return -1; diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c index 27ea19a5dc..dda59b2396 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c @@ -150,8 +150,13 @@ static void vmGenerateVnodeCfg(SCreateVnodeReq *pCreate, SVnodeCfg *pCfg) { pCfg->tsdbCfg.minRows = pCreate->minRows; pCfg->tsdbCfg.maxRows = pCreate->maxRows; for (size_t i = 0; i < taosArrayGetSize(pCreate->pRetensions); ++i) { - memcpy(&pCfg->tsdbCfg.retentions[i], taosArrayGet(pCreate->pRetensions, i), sizeof(SRetention)); + SRetention *pRetention = &pCfg->tsdbCfg.retentions[i]; + memcpy(pRetention, taosArrayGet(pCreate->pRetensions, i), sizeof(SRetention)); + if (i == 0) { + if ((pRetention->freq > 0 && pRetention->keep > 0)) pCfg->isRsma = 1; + } } + pCfg->walCfg.vgId = pCreate->vgId; pCfg->hashBegin = pCreate->hashBegin; pCfg->hashEnd = pCreate->hashEnd; @@ -218,9 +223,20 @@ int32_t vmProcessCreateVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { code = vmOpenVnode(pMgmt, &wrapperCfg, pImpl); if (code != 0) { dError("vgId:%d, failed to open vnode since %s", createReq.vgId, terrstr()); + code = terrno; goto _OVER; } + if (createReq.isTsma) { + SMsgHead *smaMsg = createReq.pTsma; + uint32_t contLen = (uint32_t)(htonl(smaMsg->contLen) - sizeof(SMsgHead)); + if (vnodeProcessCreateTSma(pImpl, POINTER_SHIFT(smaMsg, sizeof(SMsgHead)), contLen) < 0) { + dError("vgId:%d, failed to create tsma since %s", createReq.vgId, terrstr()); + code = terrno; + goto _OVER; + }; + } + code = vnodeStart(pImpl); if (code != 0) { dError("vgId:%d, failed to start sync since %s", createReq.vgId, terrstr()); @@ -228,7 +244,10 @@ int32_t vmProcessCreateVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { } code = vmWriteVnodeListToFile(pMgmt); - if (code != 0) goto _OVER; + if (code != 0) { + code = terrno; + goto _OVER; + } _OVER: if (code != 0) { diff --git a/source/dnode/vnode/inc/vnode.h b/source/dnode/vnode/inc/vnode.h index 6be770cfa7..a274de24af 100644 --- a/source/dnode/vnode/inc/vnode.h +++ b/source/dnode/vnode/inc/vnode.h @@ -68,6 +68,7 @@ void vnodeGetInfo(SVnode *pVnode, const char **dbname, int32_t *vgId); int32_t vnodeSnapshotReaderOpen(SVnode *pVnode, SVSnapshotReader **ppReader, int64_t sver, int64_t ever); int32_t vnodeSnapshotReaderClose(SVSnapshotReader *pReader); int32_t vnodeSnapshotRead(SVSnapshotReader *pReader, const void **ppData, uint32_t *nData); +int32_t vnodeProcessCreateTSma(SVnode *pVnode, void *pCont, uint32_t contLen); // meta typedef struct SMeta SMeta; // todo: remove @@ -172,6 +173,7 @@ struct SVnodeCfg { bool isHeap; bool isWeak; int8_t isTsma; + int8_t isRsma; int8_t hashMethod; STsdbCfg tsdbCfg; SWalCfg walCfg; diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index e3a0c94ccc..6c066acff2 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -239,6 +239,8 @@ struct SVnode { #define VND_RSMA1(vnd) ((vnd)->pSma->pRSmaTsdb1) #define VND_RSMA2(vnd) ((vnd)->pSma->pRSmaTsdb2) #define VND_RETENTIONS(vnd) (&(vnd)->config.tsdbCfg.retentions) +#define VND_IS_RSMA(v) ((v)->config.isRsma == 1) +#define VND_IS_TSMA(v) ((v)->config.isTsma == 1) struct STbUidStore { tb_uid_t suid; @@ -271,11 +273,6 @@ struct SSma { #define SMA_RSMA_TSDB1(s) ((s)->pRSmaTsdb1) #define SMA_RSMA_TSDB2(s) ((s)->pRSmaTsdb2) -static FORCE_INLINE bool vnodeIsRollup(SVnode* pVnode) { - SRetention* pRetention = &(pVnode->config.tsdbCfg.retentions[0]); - return (pRetention->freq > 0 && pRetention->keep > 0); -} - // sma void smaHandleRes(void* pVnode, int64_t smaId, const SArray* data); diff --git a/source/dnode/vnode/src/sma/smaOpen.c b/source/dnode/vnode/src/sma/smaOpen.c index 2a74fe78cb..dde6578054 100644 --- a/source/dnode/vnode/src/sma/smaOpen.c +++ b/source/dnode/vnode/src/sma/smaOpen.c @@ -104,7 +104,7 @@ int32_t smaOpen(SVnode *pVnode) { taosThreadMutexInit(&pSma->mutex, NULL); pSma->locked = false; - if (vnodeIsRollup(pVnode)) { + if (VND_IS_RSMA(pVnode)) { STsdbKeepCfg keepCfg = {0}; for (int i = 0; i < TSDB_RETENTION_MAX; ++i) { if (i == TSDB_RETENTION_L0) { diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index 62125b6dc7..f9c5fac536 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -333,7 +333,7 @@ static void setQueryTimewindow(STsdbReadHandle* pTsdbReadHandle, SQueryTableData } static STsdb* getTsdbByRetentions(SVnode* pVnode, STsdbReadHandle* pReadHandle, TSKEY winSKey, SRetention* retentions) { - if (vnodeIsRollup(pVnode)) { + if (VND_IS_RSMA(pVnode)) { int level = 0; int64_t now = taosGetTimestamp(pVnode->config.tsdbCfg.precision); diff --git a/source/dnode/vnode/src/vnd/vnodeCfg.c b/source/dnode/vnode/src/vnd/vnodeCfg.c index 56cc3d6374..e8fa2ed3c1 100644 --- a/source/dnode/vnode/src/vnd/vnodeCfg.c +++ b/source/dnode/vnode/src/vnd/vnodeCfg.c @@ -57,6 +57,7 @@ int vnodeEncodeConfig(const void *pObj, SJson *pJson) { if (tjsonAddIntegerToObject(pJson, "isHeap", pCfg->isHeap) < 0) return -1; if (tjsonAddIntegerToObject(pJson, "isWeak", pCfg->isWeak) < 0) return -1; if (tjsonAddIntegerToObject(pJson, "isTsma", pCfg->isTsma) < 0) return -1; + if (tjsonAddIntegerToObject(pJson, "isRsma", pCfg->isRsma) < 0) return -1; if (tjsonAddIntegerToObject(pJson, "precision", pCfg->tsdbCfg.precision) < 0) return -1; if (tjsonAddIntegerToObject(pJson, "update", pCfg->tsdbCfg.update) < 0) return -1; if (tjsonAddIntegerToObject(pJson, "compression", pCfg->tsdbCfg.compression) < 0) return -1; @@ -133,6 +134,8 @@ int vnodeDecodeConfig(const SJson *pJson, void *pObj) { if(code < 0) return -1; tjsonGetNumberValue(pJson, "isTsma", pCfg->isTsma, code); if(code < 0) return -1; + tjsonGetNumberValue(pJson, "isRsma", pCfg->isRsma, code); + if(code < 0) return -1; tjsonGetNumberValue(pJson, "precision", pCfg->tsdbCfg.precision, code); if(code < 0) return -1; tjsonGetNumberValue(pJson, "update", pCfg->tsdbCfg.update, code); diff --git a/source/dnode/vnode/src/vnd/vnodeCommit.c b/source/dnode/vnode/src/vnd/vnodeCommit.c index b4fbd01c63..a0db3cfe2d 100644 --- a/source/dnode/vnode/src/vnd/vnodeCommit.c +++ b/source/dnode/vnode/src/vnd/vnodeCommit.c @@ -230,7 +230,7 @@ int vnodeCommit(SVnode *pVnode) { return -1; } - if(vnodeIsRollup(pVnode)) { + if (VND_IS_RSMA(pVnode)) { if (tsdbCommit(VND_RSMA0(pVnode)) < 0) { ASSERT(0); return -1; @@ -250,7 +250,6 @@ int vnodeCommit(SVnode *pVnode) { } } - if (tqCommit(pVnode->pTq) < 0) { ASSERT(0); return -1; diff --git a/source/dnode/vnode/src/vnd/vnodeOpen.c b/source/dnode/vnode/src/vnd/vnodeOpen.c index 178ef28e5d..43b4d6c77d 100644 --- a/source/dnode/vnode/src/vnd/vnodeOpen.c +++ b/source/dnode/vnode/src/vnd/vnodeOpen.c @@ -97,7 +97,7 @@ SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb) { } // open tsdb - if (!vnodeIsRollup(pVnode) && tsdbOpen(pVnode, &VND_TSDB(pVnode), VNODE_TSDB_DIR, NULL) < 0) { + if (!VND_IS_RSMA(pVnode) && tsdbOpen(pVnode, &VND_TSDB(pVnode), VNODE_TSDB_DIR, NULL) < 0) { vError("vgId:%d failed to open vnode tsdb since %s", TD_VID(pVnode), tstrerror(terrno)); goto _err; } diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index 4b237bc703..149c45ba6f 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -776,8 +776,7 @@ _exit: // TODO: the partial success scenario and the error case // TODO: refactor - if ((terrno == TSDB_CODE_SUCCESS || terrno == TSDB_CODE_TDB_TABLE_ALREADY_EXIST) && - (pRsp->code == TSDB_CODE_SUCCESS)) { + if ((terrno == TSDB_CODE_SUCCESS) && (pRsp->code == TSDB_CODE_SUCCESS)) { tdProcessRSmaSubmit(pVnode->pSma, pReq, STREAM_DATA_TYPE_SUBMIT_BLOCK); } @@ -788,16 +787,19 @@ static int vnodeProcessCreateTSmaReq(SVnode *pVnode, int64_t version, void *pReq SVCreateTSmaReq req = {0}; SDecoder coder; - pRsp->msgType = TDMT_VND_CREATE_SMA_RSP; - pRsp->code = TSDB_CODE_SUCCESS; - pRsp->pCont = NULL; - pRsp->contLen = 0; + if (pRsp) { + pRsp->msgType = TDMT_VND_CREATE_SMA_RSP; + pRsp->code = TSDB_CODE_SUCCESS; + pRsp->pCont = NULL; + pRsp->contLen = 0; + } // decode and process req tDecoderInit(&coder, pReq, len); if (tDecodeSVCreateTSmaReq(&coder, &req) < 0) { - pRsp->code = terrno; + terrno = TSDB_CODE_MSG_DECODE_ERROR; + if (pRsp) pRsp->code = terrno; goto _err; } @@ -805,18 +807,30 @@ static int vnodeProcessCreateTSmaReq(SVnode *pVnode, int64_t version, void *pReq req.timezoneInt = tsTimezone; if (tdProcessTSmaCreate(pVnode->pSma, version, (const char *)&req) < 0) { - pRsp->code = terrno; + if (pRsp) pRsp->code = terrno; goto _err; } tDecoderClear(&coder); - vDebug("vgId:%d success to create tsma %s:%" PRIi64 " for table %" PRIi64, TD_VID(pVnode), req.indexName, - req.indexUid, req.tableUid); + vDebug("vgId:%d success to create tsma %s:%" PRIi64 " version %" PRIi64 " for table %" PRIi64, TD_VID(pVnode), + req.indexName, req.indexUid, version, req.tableUid); return 0; _err: tDecoderClear(&coder); - vError("vgId:%d failed to create tsma %s:%" PRIi64 " for table %" PRIi64 " since %s", TD_VID(pVnode), req.indexName, - req.indexUid, req.tableUid, terrstr(terrno)); + vError("vgId:%d failed to create tsma %s:%" PRIi64 " version %" PRIi64 "for table %" PRIi64 " since %s", + TD_VID(pVnode), req.indexName, req.indexUid, version, req.tableUid, terrstr(terrno)); return -1; } + +/** + * @brief specific for smaDstVnode + * + * @param pVnode + * @param pCont + * @param contLen + * @return int32_t + */ +int32_t vnodeProcessCreateTSma(SVnode *pVnode, void *pCont, uint32_t contLen) { + return vnodeProcessCreateTSmaReq(pVnode, 1, pCont, contLen, NULL); +} diff --git a/source/util/src/terror.c b/source/util/src/terror.c index 575ae10bbb..a57f942f74 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -75,6 +75,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_DUP_KEY, "Cannot add duplicate TAOS_DEFINE_ERROR(TSDB_CODE_NEED_RETRY, "Retry needed") TAOS_DEFINE_ERROR(TSDB_CODE_OUT_OF_RPC_MEMORY_QUEUE, "Out of memory in rpc queue") TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_TIMESTAMP, "Invalid timestamp format") +TAOS_DEFINE_ERROR(TSDB_CODE_MSG_DECODE_ERROR, "Msg decode error") TAOS_DEFINE_ERROR(TSDB_CODE_REF_NO_MEMORY, "Ref out of memory") TAOS_DEFINE_ERROR(TSDB_CODE_REF_FULL, "too many Ref Objs")