diff --git a/source/dnode/vnode/src/sma/smaOpen.c b/source/dnode/vnode/src/sma/smaOpen.c index 29854b4441..19f0c4a964 100644 --- a/source/dnode/vnode/src/sma/smaOpen.c +++ b/source/dnode/vnode/src/sma/smaOpen.c @@ -16,38 +16,34 @@ #include "sma.h" #include "tsdb.h" -static int32_t smaEvalDays(SVnode *pVnode, SRetention *r, int8_t level, int8_t precision, int32_t duration); +static int32_t smaEvalDays(SVnode *pVnode, SRetention *r, int8_t level, int8_t precision, int32_t duration, + int32_t *days); static int32_t smaSetKeepCfg(SVnode *pVnode, STsdbKeepCfg *pKeepCfg, STsdbCfg *pCfg, int type); static int32_t rsmaRestore(SSma *pSma); -#define SMA_SET_KEEP_CFG(v, l) \ - do { \ - SRetention *r = &pCfg->retentions[l]; \ - int64_t keep = -1; \ - convertTimeFromPrecisionToUnit(r->keep, pCfg->precision, TIME_UNIT_MINUTE, &keep); \ - pKeepCfg->keep2 = (int32_t)keep; \ - pKeepCfg->keep0 = pKeepCfg->keep2; \ - pKeepCfg->keep1 = pKeepCfg->keep2; \ - pKeepCfg->days = smaEvalDays(v, pCfg->retentions, l, pCfg->precision, pCfg->days); \ - pKeepCfg->keepTimeOffset = 0; \ +#define SMA_SET_KEEP_CFG(v, l) \ + do { \ + SRetention *r = &pCfg->retentions[l]; \ + int64_t keep = -1; \ + TAOS_CHECK_EXIT(convertTimeFromPrecisionToUnit(r->keep, pCfg->precision, TIME_UNIT_MINUTE, &keep)); \ + pKeepCfg->keep2 = (int32_t)keep; \ + pKeepCfg->keep0 = pKeepCfg->keep2; \ + pKeepCfg->keep1 = pKeepCfg->keep2; \ + TAOS_CHECK_EXIT(smaEvalDays(v, pCfg->retentions, l, pCfg->precision, pCfg->days, &pKeepCfg->days)); \ + pKeepCfg->keepTimeOffset = 0; \ } while (0) -#define SMA_OPEN_RSMA_IMPL(v, l, force) \ - do { \ - SRetention *r = (SRetention *)VND_RETENTIONS(v) + l; \ - if (!RETENTION_VALID(l, r)) { \ - if (l == 0) { \ - code = TSDB_CODE_INVALID_PARA; \ - TSDB_CHECK_CODE(code, lino, _exit); \ - } \ - break; \ - } \ - code = smaSetKeepCfg(v, &keepCfg, pCfg, TSDB_TYPE_RSMA_L##l); \ - TSDB_CHECK_CODE(code, lino, _exit); \ - if (tsdbOpen(v, &SMA_RSMA_TSDB##l(pSma), VNODE_RSMA##l##_DIR, &keepCfg, rollback, force) < 0) { \ - code = terrno; \ - TSDB_CHECK_CODE(code, lino, _exit); \ - } \ +#define SMA_OPEN_RSMA_IMPL(v, l, force) \ + do { \ + SRetention *r = (SRetention *)VND_RETENTIONS(v) + l; \ + if (!RETENTION_VALID(l, r)) { \ + if (l == 0) { \ + TAOS_CHECK_EXIT(TSDB_CODE_INVALID_PARA); \ + } \ + break; \ + } \ + TAOS_CHECK_EXIT(smaSetKeepCfg(v, &keepCfg, pCfg, TSDB_TYPE_RSMA_L##l)); \ + TAOS_CHECK_EXIT(tsdbOpen(v, &SMA_RSMA_TSDB##l(pSma), VNODE_RSMA##l##_DIR, &keepCfg, rollback, force)); \ } while (0) /** @@ -59,51 +55,61 @@ static int32_t rsmaRestore(SSma *pSma); * @param level * @param precision * @param duration + * @param days * @return int32_t */ -static int32_t smaEvalDays(SVnode *pVnode, SRetention *r, int8_t level, int8_t precision, int32_t duration) { - int32_t code = TSDB_CODE_SUCCESS; +static int32_t smaEvalDays(SVnode *pVnode, SRetention *r, int8_t level, int8_t precision, int32_t duration, + int32_t *days) { + int32_t code = 0; + int32_t lino = 0; int64_t freqDuration = -1; int64_t keepDuration = -1; - code = convertTimeFromPrecisionToUnit((r + TSDB_RETENTION_L0)->freq, precision, TIME_UNIT_MINUTE, &freqDuration); - code = convertTimeFromPrecisionToUnit((r + TSDB_RETENTION_L0)->keep, precision, TIME_UNIT_MINUTE, &keepDuration); - int32_t days = duration; // min + TAOS_CHECK_EXIT( + convertTimeFromPrecisionToUnit((r + TSDB_RETENTION_L0)->freq, precision, TIME_UNIT_MINUTE, &freqDuration)); + TAOS_CHECK_EXIT( + convertTimeFromPrecisionToUnit((r + TSDB_RETENTION_L0)->keep, precision, TIME_UNIT_MINUTE, &keepDuration)); + *days = duration; // min - if (days < freqDuration) { - days = freqDuration; + if (*days < freqDuration) { + *days = freqDuration; } - if (days > keepDuration) { - days = keepDuration; + if (*days > keepDuration) { + *days = keepDuration; } if (level < TSDB_RETENTION_L1 || level > TSDB_RETENTION_L2) { goto _exit; } - code = convertTimeFromPrecisionToUnit((r + level)->freq, precision, TIME_UNIT_MINUTE, &freqDuration); - code = convertTimeFromPrecisionToUnit((r + level)->keep, precision, TIME_UNIT_MINUTE, &keepDuration); + TAOS_CHECK_EXIT(convertTimeFromPrecisionToUnit((r + level)->freq, precision, TIME_UNIT_MINUTE, &freqDuration)); + TAOS_CHECK_EXIT(convertTimeFromPrecisionToUnit((r + level)->keep, precision, TIME_UNIT_MINUTE, &keepDuration)); int32_t nFreqTimes = (r + level)->freq / (60 * 1000); // use 60s for freq of 1st level - days *= (nFreqTimes > 1 ? nFreqTimes : 1); + *days *= (nFreqTimes > 1 ? nFreqTimes : 1); - if (days < freqDuration) { - days = freqDuration; + if (*days < freqDuration) { + *days = freqDuration; } int32_t maxKeepDuration = TMIN(keepDuration, TSDB_MAX_DURATION_PER_FILE); - if (days > maxKeepDuration) { - days = maxKeepDuration; + if (*days > maxKeepDuration) { + *days = maxKeepDuration; } _exit: - smaInfo("vgId:%d, evaluated duration for level %d is %d, raw val:%d", TD_VID(pVnode), level + 1, days, duration); - return days; + if (code) { + smaError("vgId:%d, %s failed at line %d since %s", TD_VID(pVnode), __func__, lino, tstrerror(code)); + } else { + smaInfo("vgId:%d, evaluated duration for level %d is %d, raw val:%d", TD_VID(pVnode), level + 1, *days, duration); + } + TAOS_RETURN(code); } int smaSetKeepCfg(SVnode *pVnode, STsdbKeepCfg *pKeepCfg, STsdbCfg *pCfg, int type) { - terrno = 0; + int32_t code = 0; + int32_t lino = 0; pKeepCfg->precision = pCfg->precision; switch (type) { case TSDB_TYPE_RSMA_L0: @@ -116,10 +122,14 @@ int smaSetKeepCfg(SVnode *pVnode, STsdbKeepCfg *pKeepCfg, STsdbCfg *pCfg, int ty SMA_SET_KEEP_CFG(pVnode, 2); break; default: - terrno = TSDB_CODE_APP_ERROR; + code = TSDB_CODE_APP_ERROR; break; } - return terrno; +_exit: + if (code) { + smaError("vgId:%d, %s failed at line %d since %s", TD_VID(pVnode), __func__, lino, tstrerror(code)); + } + TAOS_RETURN(code); } int32_t smaOpen(SVnode *pVnode, int8_t rollback, bool force) { @@ -129,8 +139,7 @@ int32_t smaOpen(SVnode *pVnode, int8_t rollback, bool force) { SSma *pSma = taosMemoryCalloc(1, sizeof(SSma)); if (!pSma) { - code = TSDB_CODE_OUT_OF_MEMORY; - TSDB_CHECK_CODE(code, lino, _exit); + TAOS_CHECK_EXIT(TSDB_CODE_OUT_OF_MEMORY); } pVnode->pSma = pSma; @@ -152,18 +161,14 @@ int32_t smaOpen(SVnode *pVnode, int8_t rollback, bool force) { } // restore the rsma - if (tdRSmaRestore(pSma, RSMA_RESTORE_REBOOT, pVnode->state.committed, rollback) < 0) { - code = terrno; - TSDB_CHECK_CODE(code, lino, _exit); - } + TAOS_CHECK_EXIT(tdRSmaRestore(pSma, RSMA_RESTORE_REBOOT, pVnode->state.committed, rollback)); } _exit: if (code) { smaError("vgId:%d, %s failed at line %d since %s", TD_VID(pVnode), __func__, lino, tstrerror(code)); - terrno = code; } - return code; + TAOS_RETURN(code); } int32_t smaClose(SSma *pSma) { @@ -190,7 +195,7 @@ int32_t smaClose(SSma *pSma) { */ int32_t tdRSmaRestore(SSma *pSma, int8_t type, int64_t committedVer, int8_t rollback) { if (!VND_IS_RSMA(pSma->pVnode)) { - return TSDB_CODE_RSMA_INVALID_ENV; + TAOS_RETURN(TSDB_CODE_RSMA_INVALID_ENV); } return tdRSmaProcessRestoreImpl(pSma, type, committedVer, rollback); diff --git a/source/dnode/vnode/src/sma/smaRollup.c b/source/dnode/vnode/src/sma/smaRollup.c index 823f65a9fd..ad7eb24f28 100644 --- a/source/dnode/vnode/src/sma/smaRollup.c +++ b/source/dnode/vnode/src/sma/smaRollup.c @@ -123,8 +123,7 @@ void *tdFreeRSmaInfo(SSma *pSma, SRSmaInfo *pInfo) { static FORCE_INLINE int32_t tdUidStoreInit(STbUidStore **pStore) { *pStore = taosMemoryCalloc(1, sizeof(STbUidStore)); if (*pStore == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return TSDB_CODE_FAILED; + TAOS_RETURN(TSDB_CODE_OUT_OF_MEMORY); } return TSDB_CODE_SUCCESS; @@ -132,12 +131,13 @@ static FORCE_INLINE int32_t tdUidStoreInit(STbUidStore **pStore) { static int32_t tdUpdateTbUidListImpl(SSma *pSma, tb_uid_t *suid, SArray *tbUids, bool isAdd) { SRSmaInfo *pRSmaInfo = NULL; + int32_t code = 0; if (!suid || !tbUids) { - terrno = TSDB_CODE_INVALID_PTR; + code = TSDB_CODE_INVALID_PTR; smaError("vgId:%d, failed to get rsma info for uid:%" PRIi64 " since %s", SMA_VID(pSma), suid ? *suid : -1, - terrstr()); - return TSDB_CODE_FAILED; + tstrerror(code)); + TAOS_RETURN(code); } int32_t nTables = taosArrayGetSize(tbUids); @@ -151,17 +151,17 @@ static int32_t tdUpdateTbUidListImpl(SSma *pSma, tb_uid_t *suid, SArray *tbUids, if (!pRSmaInfo) { smaError("vgId:%d, failed to get rsma info for uid:%" PRIi64, SMA_VID(pSma), *suid); - terrno = TSDB_CODE_RSMA_INVALID_STAT; - return TSDB_CODE_FAILED; + code = TSDB_CODE_RSMA_INVALID_STAT; + TAOS_RETURN(code); } for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) { if (pRSmaInfo->taskInfo[i]) { - if ((terrno = qUpdateTableListForStreamScanner(pRSmaInfo->taskInfo[i], tbUids, isAdd)) < 0) { + if ((code = qUpdateTableListForStreamScanner(pRSmaInfo->taskInfo[i], tbUids, isAdd)) < 0) { tdReleaseRSmaInfo(pSma, pRSmaInfo); smaError("vgId:%d, update tbUidList failed for uid:%" PRIi64 " level %d since %s", SMA_VID(pSma), *suid, i, - terrstr()); - return TSDB_CODE_FAILED; + tstrerror(code)); + TAOS_RETURN(code); } smaDebug("vgId:%d, update tbUidList succeed for qTaskInfo:%p. suid:%" PRIi64 " uid:%" PRIi64 "nTables:%d level %d", @@ -170,26 +170,25 @@ static int32_t tdUpdateTbUidListImpl(SSma *pSma, tb_uid_t *suid, SArray *tbUids, } tdReleaseRSmaInfo(pSma, pRSmaInfo); - return TSDB_CODE_SUCCESS; + TAOS_RETURN(code); } int32_t tdUpdateTbUidList(SSma *pSma, STbUidStore *pStore, bool isAdd) { + int32_t code = 0; if (!pStore || (taosArrayGetSize(pStore->tbUids) == 0)) { return TSDB_CODE_SUCCESS; } - if (tdUpdateTbUidListImpl(pSma, &pStore->suid, pStore->tbUids, isAdd) != TSDB_CODE_SUCCESS) { - return TSDB_CODE_FAILED; - } + TAOS_CHECK_RETURN(tdUpdateTbUidListImpl(pSma, &pStore->suid, pStore->tbUids, isAdd)); void *pIter = NULL; while ((pIter = taosHashIterate(pStore->uidHash, pIter))) { tb_uid_t *pTbSuid = (tb_uid_t *)taosHashGetKey(pIter, NULL); SArray *pTbUids = *(SArray **)pIter; - if (tdUpdateTbUidListImpl(pSma, pTbSuid, pTbUids, isAdd) != TSDB_CODE_SUCCESS) { + if ((code = tdUpdateTbUidListImpl(pSma, pTbSuid, pTbUids, isAdd)) != TSDB_CODE_SUCCESS) { taosHashCancelIterate(pStore->uidHash, pIter); - return TSDB_CODE_FAILED; + TAOS_RETURN(code); } } return TSDB_CODE_SUCCESS; @@ -206,6 +205,7 @@ int32_t tdUpdateTbUidList(SSma *pSma, STbUidStore *pStore, bool isAdd) { */ int32_t tdFetchTbUidList(SSma *pSma, STbUidStore **ppStore, tb_uid_t suid, tb_uid_t uid) { SSmaEnv *pEnv = SMA_RSMA_ENV(pSma); + int32_t code = 0; // only applicable to rollup SMA ctables if (!pEnv) { diff --git a/source/dnode/vnode/src/sma/smaTimeRange.c b/source/dnode/vnode/src/sma/smaTimeRange.c index 201e496140..da99a2c9a2 100644 --- a/source/dnode/vnode/src/sma/smaTimeRange.c +++ b/source/dnode/vnode/src/sma/smaTimeRange.c @@ -32,19 +32,19 @@ int32_t tdProcessTSmaInsert(SSma *pSma, int64_t indexUid, const char *msg) { smaError("vgId:%d, insert tsma data failed since %s", SMA_VID(pSma), tstrerror(code)); } - return code; + TAOS_RETURN(code); } int32_t tdProcessTSmaCreate(SSma *pSma, int64_t ver, const char *msg) { int32_t code = tdProcessTSmaCreateImpl(pSma, ver, msg); - return code; + TAOS_RETURN(code); } int32_t smaGetTSmaDays(SVnodeCfg *pCfg, void *pCont, uint32_t contLen, int32_t *days) { int32_t code = tdProcessTSmaGetDaysImpl(pCfg, pCont, contLen, days); - return code; + TAOS_RETURN(code); } /** @@ -70,8 +70,8 @@ static int32_t tdProcessTSmaGetDaysImpl(SVnodeCfg *pCfg, void *pCont, uint32_t c STsdbCfg *pTsdbCfg = &pCfg->tsdbCfg; int64_t sInterval = -1; - code = convertTimeFromPrecisionToUnit(tsma.interval, pTsdbCfg->precision, TIME_UNIT_SECOND, &sInterval); - if (TSDB_CODE_SUCCESS != code || 0 == sInterval) { + TAOS_CHECK_EXIT(convertTimeFromPrecisionToUnit(tsma.interval, pTsdbCfg->precision, TIME_UNIT_SECOND, &sInterval)); + if (0 == sInterval) { *days = pTsdbCfg->days; goto _exit; } @@ -80,10 +80,7 @@ static int32_t tdProcessTSmaGetDaysImpl(SVnodeCfg *pCfg, void *pCont, uint32_t c *days = pTsdbCfg->days; } else { int64_t mInterval = -1; - code = convertTimeFromPrecisionToUnit(tsma.interval, pTsdbCfg->precision, TIME_UNIT_MINUTE, &mInterval); - if (TSDB_CODE_SUCCESS != code) { - goto _exit; - } + TAOS_CHECK_EXIT(convertTimeFromPrecisionToUnit(tsma.interval, pTsdbCfg->precision, TIME_UNIT_MINUTE, &mInterval)); int64_t daysPerFile = mInterval * SMA_STORAGE_MINUTES_DAY * 2; if (daysPerFile > SMA_STORAGE_MINUTES_MAX) { @@ -103,7 +100,7 @@ _exit: smaDebug("vgId:%d, succeed to get tsma days %d", pCfg->vgId, *days); } tDecoderClear(&coder); - return code; + TAOS_RETURN(code); } /** @@ -123,10 +120,7 @@ static int32_t tdProcessTSmaCreateImpl(SSma *pSma, int64_t ver, const char *pMsg if (TD_VID(pSma->pVnode) == pCfg->dstVgId) { // create tsma meta in dstVgId - if (metaCreateTSma(SMA_META(pSma), ver, pCfg) < 0) { - code = terrno; - TSDB_CHECK_CODE(code, lino, _exit); - } + TAOS_CHECK_EXIT(metaCreateTSma(SMA_META(pSma), ver, pCfg)); // create stable to save tsma result in dstVgId tNameFromString(&stbFullName, pCfg->dstTbName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE); @@ -135,28 +129,24 @@ static int32_t tdProcessTSmaCreateImpl(SSma *pSma, int64_t ver, const char *pMsg pReq.schemaRow = pCfg->schemaRow; pReq.schemaTag = pCfg->schemaTag; - if (metaCreateSTable(SMA_META(pSma), ver, &pReq) < 0) { - code = terrno; - TSDB_CHECK_CODE(code, lino, _exit); - } + TAOS_CHECK_EXIT(metaCreateSTable(SMA_META(pSma), ver, &pReq)); } else { - code = terrno = TSDB_CODE_TSMA_INVALID_STAT; - TSDB_CHECK_CODE(code, lino, _exit); + TAOS_CHECK_EXIT(TSDB_CODE_TSMA_INVALID_STAT); } _exit: if (code) { smaError("vgId:%d, failed at line %d to create sma index %s %" PRIi64 " on stb:%" PRIi64 ", dstSuid:%" PRIi64 - " dstTb:%s dstVg:%d", + " dstTb:%s dstVg:%d since %s", SMA_VID(pSma), lino, pCfg->indexName, pCfg->indexUid, pCfg->tableUid, pCfg->dstTbUid, pReq.name, - pCfg->dstVgId); + pCfg->dstVgId, tstrerror(code)); } else { smaDebug("vgId:%d, success to create sma index %s %" PRIi64 " on stb:%" PRIi64 ", dstSuid:%" PRIi64 " dstTb:%s dstVg:%d", SMA_VID(pSma), pCfg->indexName, pCfg->indexUid, pCfg->tableUid, pCfg->dstTbUid, pReq.name, pCfg->dstVgId); } - return code; + TAOS_RETURN(code); } int32_t smaBlockToSubmit(SVnode *pVnode, const SArray *pBlocks, const STSchema *pTSchema, int64_t suid, @@ -167,6 +157,7 @@ int32_t smaBlockToSubmit(SVnode *pVnode, const SArray *pBlocks, const STSchema * int32_t len = 0; SSubmitReq2 *pReq = NULL; SArray *tagArray = NULL; + SHashObj *pTableIndexMap = NULL; int32_t numOfBlocks = taosArrayGetSize(pBlocks); @@ -174,18 +165,18 @@ int32_t smaBlockToSubmit(SVnode *pVnode, const SArray *pBlocks, const STSchema * pReq = taosMemoryCalloc(1, sizeof(SSubmitReq2)); if (!tagArray || !pReq) { - code = terrno == TSDB_CODE_SUCCESS ? TSDB_CODE_OUT_OF_MEMORY : terrno; - TSDB_CHECK_CODE(code, lino, _exit); + TAOS_CHECK_EXIT(TSDB_CODE_OUT_OF_MEMORY); } pReq->aSubmitTbData = taosArrayInit(1, sizeof(SSubmitTbData)); if (pReq->aSubmitTbData == NULL) { - code = terrno == TSDB_CODE_SUCCESS ? TSDB_CODE_OUT_OF_MEMORY : terrno; - TSDB_CHECK_CODE(code, lino, _exit); + TAOS_CHECK_EXIT(TSDB_CODE_OUT_OF_MEMORY); } - SHashObj *pTableIndexMap = - taosHashInit(numOfBlocks, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK); + pTableIndexMap = taosHashInit(numOfBlocks, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK); + if (pTableIndexMap == NULL) { + TAOS_CHECK_EXIT(TSDB_CODE_OUT_OF_MEMORY); + } // SSubmitTbData req for (int32_t i = 0; i < numOfBlocks; ++i) { @@ -193,8 +184,7 @@ int32_t smaBlockToSubmit(SVnode *pVnode, const SArray *pBlocks, const STSchema * if (pDataBlock->info.type == STREAM_DELETE_RESULT) { pDeleteReq->suid = suid; pDeleteReq->deleteReqs = taosArrayInit(0, sizeof(SSingleDeleteReq)); - code = tqBuildDeleteReq(pVnode->pTq, stbFullName, pDataBlock, pDeleteReq, "", true); - TSDB_CHECK_CODE(code, lino, _exit); + TAOS_CHECK_EXIT(tqBuildDeleteReq(pVnode->pTq, stbFullName, pDataBlock, pDeleteReq, "", true)); continue; } @@ -202,11 +192,7 @@ int32_t smaBlockToSubmit(SVnode *pVnode, const SArray *pBlocks, const STSchema * int32_t cid = taosArrayGetSize(pDataBlock->pDataBlock) + 1; - code = buildAutoCreateTableReq(stbFullName, suid, cid, pDataBlock, tagArray, true, &tbData.pCreateTbReq); - if (code) { - smaError("failed to build create-table req, code:%d", code); - continue; - } + TAOS_CHECK_EXIT(buildAutoCreateTableReq(stbFullName, suid, cid, pDataBlock, tagArray, true, &tbData.pCreateTbReq)); { uint64_t groupId = pDataBlock->info.id.groupId; @@ -221,7 +207,7 @@ int32_t smaBlockToSubmit(SVnode *pVnode, const SArray *pBlocks, const STSchema * taosArrayPush(pReq->aSubmitTbData, &tbData); int32_t size = (int32_t)taosArrayGetSize(pReq->aSubmitTbData) - 1; - taosHashPut(pTableIndexMap, &groupId, sizeof(groupId), &size, sizeof(size)); + TAOS_CHECK_EXIT(taosHashPut(pTableIndexMap, &groupId, sizeof(groupId), &size, sizeof(size))); } else { code = tqSetDstTableDataPayload(suid, pTSchema, i, pDataBlock, &tbData, INT64_MIN, ""); if (code != TSDB_CODE_SUCCESS) { @@ -237,15 +223,13 @@ int32_t smaBlockToSubmit(SVnode *pVnode, const SArray *pBlocks, const STSchema * } } - taosHashCleanup(pTableIndexMap); - // encode tEncodeSize(tEncodeSubmitReq, pReq, len, code); if (TSDB_CODE_SUCCESS == code) { SEncoder encoder; len += sizeof(SSubmitReq2Msg); if (!(pBuf = rpcMallocCont(len))) { - code = terrno; + code = TSDB_CODE_OUT_OF_MEMORY; TSDB_CHECK_CODE(code, lino, _exit); } @@ -253,9 +237,8 @@ int32_t smaBlockToSubmit(SVnode *pVnode, const SArray *pBlocks, const STSchema * ((SSubmitReq2Msg *)pBuf)->header.contLen = htonl(len); ((SSubmitReq2Msg *)pBuf)->version = htobe64(1); tEncoderInit(&encoder, POINTER_SHIFT(pBuf, sizeof(SSubmitReq2Msg)), len - sizeof(SSubmitReq2Msg)); - if (tEncodeSubmitReq(&encoder, pReq) < 0) { + if ((code = tEncodeSubmitReq(&encoder, pReq)) < 0) { tEncoderClear(&encoder); - code = TSDB_CODE_OUT_OF_MEMORY; TSDB_CHECK_CODE(code, lino, _exit); } tEncoderClear(&encoder); @@ -263,6 +246,7 @@ int32_t smaBlockToSubmit(SVnode *pVnode, const SArray *pBlocks, const STSchema * _exit: taosArrayDestroy(tagArray); + taosHashCleanup(pTableIndexMap); if (pReq != NULL) { tDestroySubmitReq(pReq, TSDB_MSG_FLG_ENCODE); taosMemoryFree(pReq); @@ -276,7 +260,7 @@ _exit: if (ppData) *ppData = pBuf; if (pLen) *pLen = len; } - return code; + TAOS_RETURN(code); } static int32_t tsmaProcessDelReq(SSma *pSma, int64_t indexUid, SBatchDeleteReq *pDelReq) { @@ -290,7 +274,7 @@ static int32_t tsmaProcessDelReq(SSma *pSma, int64_t indexUid, SBatchDeleteReq * void *pBuf = rpcMallocCont(len + sizeof(SMsgHead)); if (!pBuf) { - code = terrno; + code = TSDB_CODE_OUT_OF_MEMORY; TSDB_CHECK_CODE(code, lino, _exit); } @@ -313,7 +297,7 @@ _exit: indexUid, tstrerror(code)); } - return code; + TAOS_RETURN(code); } /** @@ -350,16 +334,15 @@ static int32_t tdProcessTSmaInsertImpl(SSma *pSma, int64_t indexUid, const char pTsmaStat = SMA_STAT_TSMA(pStat); if (!pTsmaStat->pTSma) { - terrno = 0; STSma *pTSma = metaGetSmaInfoByIndex(SMA_META(pSma), indexUid); if (!pTSma) { - code = terrno ? terrno : TSDB_CODE_TSMA_INVALID_PTR; + code = TSDB_CODE_TSMA_INVALID_PTR; TSDB_CHECK_CODE(code, lino, _exit); } pTsmaStat->pTSma = pTSma; pTsmaStat->pTSchema = metaGetTbTSchema(SMA_META(pSma), pTSma->dstTbUid, -1, 1); if (!pTsmaStat->pTSchema) { - code = terrno ? terrno : TSDB_CODE_TSMA_INVALID_PTR; + code = TSDB_CODE_TSMA_INVALID_PTR; TSDB_CHECK_CODE(code, lino, _exit); } } @@ -378,15 +361,13 @@ static int32_t tdProcessTSmaInsertImpl(SSma *pSma, int64_t indexUid, const char pTsmaStat->pTSma->dstTbName, &deleteReq, &pSubmitReq, &contLen); TSDB_CHECK_CODE(code, lino, _exit); - if ((terrno = tsmaProcessDelReq(pSma, indexUid, &deleteReq)) != 0) { - goto _exit; - } + TAOS_CHECK_EXIT(tsmaProcessDelReq(pSma, indexUid, &deleteReq)); #if 0 if (!strncasecmp("td.tsma.rst.tb", pTsmaStat->pTSma->dstTbName, 14)) { - terrno = TSDB_CODE_APP_ERROR; + code = TSDB_CODE_APP_ERROR; smaError("vgId:%d, tsma insert for smaIndex %" PRIi64 " failed since %s, %s", SMA_VID(pSma), indexUid, - pTsmaStat->pTSma->indexUid, tstrerror(terrno), pTsmaStat->pTSma->dstTbName); + pTsmaStat->pTSma->indexUid, tstrerror(code), pTsmaStat->pTSma->dstTbName); goto _err; } #endif @@ -405,5 +386,5 @@ _exit: smaError("vgId:%d, %s failed at line %d since %s, smaIndex:%" PRIi64, SMA_VID(pSma), __func__, lino, tstrerror(code), indexUid); } - return code; + TAOS_RETURN(code); }