From 7b49327463fd0373104c0bc175f47af07e0242db Mon Sep 17 00:00:00 2001 From: kailixu Date: Wed, 24 Jul 2024 10:09:24 +0800 Subject: [PATCH] enh: refactor return code --- source/dnode/vnode/src/sma/smaRollup.c | 416 +++++++++++------------- source/dnode/vnode/src/tsdb/tsdbWrite.c | 4 +- source/dnode/vnode/src/vnd/vnodeQuery.c | 2 +- 3 files changed, 191 insertions(+), 231 deletions(-) diff --git a/source/dnode/vnode/src/sma/smaRollup.c b/source/dnode/vnode/src/sma/smaRollup.c index ad7eb24f28..ddf7abf0a5 100644 --- a/source/dnode/vnode/src/sma/smaRollup.c +++ b/source/dnode/vnode/src/sma/smaRollup.c @@ -38,24 +38,24 @@ SSmaMgmt smaMgmt = { typedef struct SRSmaQTaskInfoItem SRSmaQTaskInfoItem; -static int32_t tdUidStorePut(STbUidStore *pStore, tb_uid_t suid, tb_uid_t *uid); -static void tdUidStoreDestory(STbUidStore *pStore); -static int32_t tdUpdateTbUidListImpl(SSma *pSma, tb_uid_t *suid, SArray *tbUids, bool isAdd); -static int32_t tdSetRSmaInfoItemParams(SSma *pSma, SRSmaParam *param, SRSmaStat *pStat, SRSmaInfo *pRSmaInfo, - int8_t idx); -static int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int32_t msgSize, int64_t version, int32_t inputType, - SRSmaInfo *pInfo, ERsmaExecType type, int8_t level); -static SRSmaInfo *tdAcquireRSmaInfoBySuid(SSma *pSma, int64_t suid); -static void tdReleaseRSmaInfo(SSma *pSma, SRSmaInfo *pInfo); -static void tdFreeRSmaSubmitItems(SArray *pItems, int32_t type); -static int32_t tdRSmaFetchAllResult(SSma *pSma, SRSmaInfo *pInfo); -static int32_t tdRSmaExecAndSubmitResult(SSma *pSma, qTaskInfo_t taskInfo, SRSmaInfoItem *pItem, SRSmaInfo *pInfo, - int32_t execType, int8_t *streamFlushed); -static void tdRSmaFetchTrigger(void *param, void *tmrId); -static void tdRSmaQTaskInfoFree(qTaskInfo_t *taskHandle, int32_t vgId, int32_t level); -static int32_t tdRSmaRestoreQTaskInfoInit(SSma *pSma, int64_t *nTables); -static int32_t tdRSmaRestoreQTaskInfoReload(SSma *pSma, int8_t type, int64_t qTaskFileVer); -static int32_t tdRSmaRestoreTSDataReload(SSma *pSma); +static int32_t tdUidStorePut(STbUidStore *pStore, tb_uid_t suid, tb_uid_t *uid); +static void tdUidStoreDestory(STbUidStore *pStore); +static int32_t tdUpdateTbUidListImpl(SSma *pSma, tb_uid_t *suid, SArray *tbUids, bool isAdd); +static int32_t tdSetRSmaInfoItemParams(SSma *pSma, SRSmaParam *param, SRSmaStat *pStat, SRSmaInfo *pRSmaInfo, + int8_t idx); +static int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int32_t msgSize, int64_t version, int32_t inputType, + SRSmaInfo *pInfo, ERsmaExecType type, int8_t level); +static int32_t tdAcquireRSmaInfoBySuid(SSma *pSma, int64_t suid, SRSmaInfo **ppRSmaInfo); +static void tdReleaseRSmaInfo(SSma *pSma, SRSmaInfo *pInfo); +static void tdFreeRSmaSubmitItems(SArray *pItems, int32_t type); +static int32_t tdRSmaFetchAllResult(SSma *pSma, SRSmaInfo *pInfo); +static int32_t tdRSmaExecAndSubmitResult(SSma *pSma, qTaskInfo_t taskInfo, SRSmaInfoItem *pItem, SRSmaInfo *pInfo, + int32_t execType, int8_t *streamFlushed); +static void tdRSmaFetchTrigger(void *param, void *tmrId); +static void tdRSmaQTaskInfoFree(qTaskInfo_t *taskHandle, int32_t vgId, int32_t level); +static int32_t tdRSmaRestoreQTaskInfoInit(SSma *pSma, int64_t *nTables); +static int32_t tdRSmaRestoreQTaskInfoReload(SSma *pSma, int8_t type, int64_t qTaskFileVer); +static int32_t tdRSmaRestoreTSDataReload(SSma *pSma); struct SRSmaQTaskInfoItem { int32_t len; @@ -147,11 +147,10 @@ static int32_t tdUpdateTbUidListImpl(SSma *pSma, tb_uid_t *suid, SArray *tbUids, return TSDB_CODE_SUCCESS; } - pRSmaInfo = tdAcquireRSmaInfoBySuid(pSma, *suid); + code = tdAcquireRSmaInfoBySuid(pSma, *suid, &pRSmaInfo); - if (!pRSmaInfo) { + if (code != 0) { smaError("vgId:%d, failed to get rsma info for uid:%" PRIi64, SMA_VID(pSma), *suid); - code = TSDB_CODE_RSMA_INVALID_STAT; TAOS_RETURN(code); } @@ -215,8 +214,7 @@ int32_t tdFetchTbUidList(SSma *pSma, STbUidStore **ppStore, tb_uid_t suid, tb_ui SRSmaStat *pStat = (SRSmaStat *)SMA_ENV_STAT(pEnv); SHashObj *infoHash = NULL; if (!pStat || !(infoHash = RSMA_INFO_HASH(pStat))) { - terrno = TSDB_CODE_RSMA_INVALID_STAT; - return TSDB_CODE_FAILED; + TAOS_RETURN(TSDB_CODE_RSMA_INVALID_STAT); } // info cached when create rsma stable and return directly for non-rsma ctables @@ -225,14 +223,12 @@ int32_t tdFetchTbUidList(SSma *pSma, STbUidStore **ppStore, tb_uid_t suid, tb_ui } if (!(*ppStore)) { - if (tdUidStoreInit(ppStore) < 0) { - return TSDB_CODE_FAILED; - } + TAOS_CHECK_RETURN(tdUidStoreInit(ppStore)); } - if (tdUidStorePut(*ppStore, suid, &uid) < 0) { + if ((code = tdUidStorePut(*ppStore, suid, &uid)) < 0) { *ppStore = tdUidStoreFree(*ppStore); - return TSDB_CODE_FAILED; + TAOS_RETURN(code); } return TSDB_CODE_SUCCESS; @@ -262,6 +258,7 @@ static void tdRSmaTaskRemove(SStreamMeta *pMeta, int64_t streamId, int32_t taskI static int32_t tdSetRSmaInfoItemParams(SSma *pSma, SRSmaParam *param, SRSmaStat *pStat, SRSmaInfo *pRSmaInfo, int8_t idx) { + int32_t code = 0; if ((param->qmsgLen > 0) && param->qmsg[idx]) { SRSmaInfoItem *pItem = &(pRSmaInfo->items[idx]); SRetention *pRetention = SMA_RETENTION(pSma); @@ -275,18 +272,20 @@ static int32_t tdSetRSmaInfoItemParams(SSma *pSma, SRSmaParam *param, SRSmaStat if (!taosCheckExistFile(taskInfDir)) { char *s = taosStrdup(taskInfDir); + if (!s) { + TAOS_RETURN(TSDB_CODE_OUT_OF_MEMORY); + } if (taosMulMkDir(s) != 0) { - terrno = TAOS_SYSTEM_ERROR(errno); + code = TAOS_SYSTEM_ERROR(errno); taosMemoryFree(s); - return TSDB_CODE_FAILED; + TAOS_RETURN(code); } taosMemoryFree(s); } SStreamTask *pStreamTask = taosMemoryCalloc(1, sizeof(*pStreamTask)); if (!pStreamTask) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return TSDB_CODE_FAILED; + TAOS_RETURN(TSDB_CODE_OUT_OF_MEMORY); } pItem->pStreamTask = pStreamTask; pStreamTask->id.taskId = 0; @@ -294,24 +293,20 @@ static int32_t tdSetRSmaInfoItemParams(SSma *pSma, SRSmaParam *param, SRSmaStat pStreamTask->chkInfo.startTs = taosGetTimestampMs(); pStreamTask->pMeta = pVnode->pTq->pStreamMeta; pStreamTask->exec.qmsg = taosMemoryMalloc(strlen(RSMA_EXEC_TASK_FLAG) + 1); + if (!pStreamTask->exec.qmsg) { + TAOS_RETURN(TSDB_CODE_OUT_OF_MEMORY); + } sprintf(pStreamTask->exec.qmsg, "%s", RSMA_EXEC_TASK_FLAG); pStreamTask->chkInfo.checkpointId = streamMetaGetLatestCheckpointId(pStreamTask->pMeta); tdRSmaTaskInit(pStreamTask->pMeta, pItem, &pStreamTask->id); - int32_t code = streamCreateStateMachine(pStreamTask); - if (code != TSDB_CODE_SUCCESS) { - return code; - } + TAOS_CHECK_RETURN(streamCreateStateMachine(pStreamTask)); - code = streamTaskCreateActiveChkptInfo(&pStreamTask->chkInfo.pActiveInfo); - if (code != TSDB_CODE_SUCCESS) { - return code; - } + TAOS_CHECK_RETURN(streamTaskCreateActiveChkptInfo(&pStreamTask->chkInfo.pActiveInfo)); pStreamState = streamStateOpen(taskInfDir, pStreamTask, pStreamTask->id.streamId, pStreamTask->id.taskId); if (!pStreamState) { - terrno = TSDB_CODE_RSMA_STREAM_STATE_OPEN; - return TSDB_CODE_FAILED; + TAOS_RETURN(TSDB_CODE_RSMA_STREAM_STATE_OPEN); } pItem->pStreamState = pStreamState; @@ -321,12 +316,11 @@ static int32_t tdSetRSmaInfoItemParams(SSma *pSma, SRSmaParam *param, SRSmaStat initStorageAPI(&handle.api); pRSmaInfo->taskInfo[idx] = qCreateStreamExecTaskInfo(param->qmsg[idx], &handle, TD_VID(pVnode), 0); if (!pRSmaInfo->taskInfo[idx]) { - terrno = TSDB_CODE_RSMA_QTASKINFO_CREATE; - return TSDB_CODE_FAILED; + TAOS_RETURN(TSDB_CODE_RSMA_QTASKINFO_CREATE); } if (!(pItem->pResList = taosArrayInit(1, POINTER_BYTES))) { - return TSDB_CODE_FAILED; + TAOS_RETURN(TSDB_CODE_OUT_OF_MEMORY); } if (pItem->fetchResultVer < pItem->submitReqVer) { @@ -349,7 +343,9 @@ static int32_t tdSetRSmaInfoItemParams(SSma *pSma, SRSmaParam *param, SRSmaStat pItem->level = idx == 0 ? TSDB_RETENTION_L1 : TSDB_RETENTION_L2; SRSmaRef rsmaRef = {.refId = pStat->refId, .suid = pRSmaInfo->suid}; - taosHashPut(smaMgmt.refHash, &pItem, POINTER_BYTES, &rsmaRef, sizeof(rsmaRef)); + if (taosHashPut(smaMgmt.refHash, &pItem, POINTER_BYTES, &rsmaRef, sizeof(rsmaRef)) != 0) { + TAOS_RETURN(TSDB_CODE_OUT_OF_MEMORY); + } taosTmrReset(tdRSmaFetchTrigger, RSMA_FETCH_INTERVAL, pItem, smaMgmt.tmrHandle, &pItem->tmrId); @@ -359,7 +355,7 @@ static int32_t tdSetRSmaInfoItemParams(SSma *pSma, SRSmaParam *param, SRSmaStat TD_VID(pVnode), pItem->pStreamTask, pRSmaInfo->suid, (int8_t)(idx + 1), pStreamTask->chkInfo.checkpointId, pItem->submitReqVer, pItem->fetchResultVer, param->maxdelay[idx], param->watermark[idx], pItem->maxDelay); } - return TSDB_CODE_SUCCESS; + TAOS_RETURN(TSDB_CODE_SUCCESS); } /** @@ -372,20 +368,14 @@ static int32_t tdSetRSmaInfoItemParams(SSma *pSma, SRSmaParam *param, SRSmaStat * @return int32_t */ int32_t tdRSmaProcessCreateImpl(SSma *pSma, SRSmaParam *param, int64_t suid, const char *tbName) { - int32_t code; + int32_t code = 0; + int32_t lino = 0; if ((param->qmsgLen[0] == 0) && (param->qmsgLen[1] == 0)) { smaDebug("vgId:%d, no qmsg1/qmsg2 for rollup table %s %" PRIi64, SMA_VID(pSma), tbName, suid); return TSDB_CODE_SUCCESS; } -#if 0 - if (tdCheckAndInitSmaEnv(pSma, TSDB_SMA_TYPE_ROLLUP) != TSDB_CODE_SUCCESS) { - terrno = TSDB_CODE_TDB_INIT_FAILED; - return TSDB_CODE_FAILED; - } -#endif - SSmaEnv *pEnv = SMA_RSMA_ENV(pSma); SRSmaStat *pStat = (SRSmaStat *)SMA_ENV_STAT(pEnv); SRSmaInfo *pRSmaInfo = NULL; @@ -399,41 +389,34 @@ int32_t tdRSmaProcessCreateImpl(SSma *pSma, SRSmaParam *param, int64_t suid, con // from write queue: single thead pRSmaInfo = (SRSmaInfo *)taosMemoryCalloc(1, sizeof(SRSmaInfo)); if (!pRSmaInfo) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return TSDB_CODE_FAILED; + TAOS_RETURN(TSDB_CODE_OUT_OF_MEMORY); } STSchema *pTSchema = metaGetTbTSchema(SMA_META(pSma), suid, -1, 1); if (!pTSchema) { - terrno = TSDB_CODE_TDB_IVD_TB_SCHEMA_VERSION; - goto _err; + TAOS_CHECK_EXIT(TSDB_CODE_TDB_IVD_TB_SCHEMA_VERSION); } pRSmaInfo->pSma = pSma; pRSmaInfo->pTSchema = pTSchema; pRSmaInfo->suid = suid; T_REF_INIT_VAL(pRSmaInfo, 1); - code = taosOpenQueue(&pRSmaInfo->queue); - if (code) goto _err; + TAOS_CHECK_EXIT(taosOpenQueue(&pRSmaInfo->queue)); - code = taosAllocateQall(&pRSmaInfo->qall); - if (code) goto _err; + TAOS_CHECK_EXIT(taosAllocateQall(&pRSmaInfo->qall)); - if (tdSetRSmaInfoItemParams(pSma, param, pStat, pRSmaInfo, 0) < 0 || - tdSetRSmaInfoItemParams(pSma, param, pStat, pRSmaInfo, 1) < 0) { - goto _err; + TAOS_CHECK_EXIT(tdSetRSmaInfoItemParams(pSma, param, pStat, pRSmaInfo, 0)); + TAOS_CHECK_EXIT(tdSetRSmaInfoItemParams(pSma, param, pStat, pRSmaInfo, 1)); + + TAOS_CHECK_EXIT(taosHashPut(RSMA_INFO_HASH(pStat), &suid, sizeof(tb_uid_t), &pRSmaInfo, sizeof(pRSmaInfo))); + +_exit: + if (code != 0) { + tdFreeRSmaInfo(pSma, pRSmaInfo); + } else { + smaDebug("vgId:%d, register rsma info succeed for table %" PRIi64, SMA_VID(pSma), suid); } - - if (taosHashPut(RSMA_INFO_HASH(pStat), &suid, sizeof(tb_uid_t), &pRSmaInfo, sizeof(pRSmaInfo)) != 0) { - goto _err; - } - - smaDebug("vgId:%d, register rsma info succeed for table %" PRIi64, SMA_VID(pSma), suid); - - return TSDB_CODE_SUCCESS; -_err: - tdFreeRSmaInfo(pSma, pRSmaInfo); - return TSDB_CODE_FAILED; + TAOS_RETURN(code); } /** @@ -480,11 +463,13 @@ int32_t tdProcessRSmaDrop(SSma *pSma, SVDropStbReq *pReq) { return TSDB_CODE_SUCCESS; } + int32_t code = 0; SRSmaStat *pRSmaStat = (SRSmaStat *)SMA_ENV_STAT(pSmaEnv); + SRSmaInfo *pRSmaInfo = NULL; - SRSmaInfo *pRSmaInfo = tdAcquireRSmaInfoBySuid(pSma, pReq->suid); + code = tdAcquireRSmaInfoBySuid(pSma, pReq->suid, &pRSmaInfo); - if (!pRSmaInfo) { + if (code != 0) { smaWarn("vgId:%d, drop rsma for stable %s %" PRIi64 " failed no rsma in hash", TD_VID(pVnode), pReq->name, pReq->suid); return TSDB_CODE_SUCCESS; @@ -519,12 +504,11 @@ static int32_t tdUidStorePut(STbUidStore *pStore, tb_uid_t suid, tb_uid_t *uid) if (uid) { if (!pStore->tbUids) { if (!(pStore->tbUids = taosArrayInit(1, sizeof(tb_uid_t)))) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return TSDB_CODE_FAILED; + TAOS_RETURN(TSDB_CODE_OUT_OF_MEMORY); } } if (!taosArrayPush(pStore->tbUids, uid)) { - return TSDB_CODE_FAILED; + TAOS_RETURN(TSDB_CODE_OUT_OF_MEMORY); } } } else { @@ -532,32 +516,29 @@ static int32_t tdUidStorePut(STbUidStore *pStore, tb_uid_t suid, tb_uid_t *uid) if (!pStore->uidHash) { pStore->uidHash = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK); if (!pStore->uidHash) { - return TSDB_CODE_FAILED; + TAOS_RETURN(TSDB_CODE_OUT_OF_MEMORY); } } if (uid) { SArray *uidArray = taosHashGet(pStore->uidHash, &suid, sizeof(tb_uid_t)); if (uidArray && ((uidArray = *(SArray **)uidArray))) { - taosArrayPush(uidArray, uid); + if (!taosArrayPush(uidArray, uid)) { + taosArrayDestroy(uidArray); + TAOS_RETURN(TSDB_CODE_OUT_OF_MEMORY); + } } else { SArray *pUidArray = taosArrayInit(1, sizeof(tb_uid_t)); if (!pUidArray) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return TSDB_CODE_FAILED; + TAOS_RETURN(TSDB_CODE_OUT_OF_MEMORY); } if (!taosArrayPush(pUidArray, uid)) { - terrno = TSDB_CODE_OUT_OF_MEMORY; taosArrayDestroy(pUidArray); - return TSDB_CODE_FAILED; - } - if (taosHashPut(pStore->uidHash, &suid, sizeof(suid), &pUidArray, sizeof(pUidArray)) != 0) { - return TSDB_CODE_FAILED; + TAOS_RETURN(TSDB_CODE_OUT_OF_MEMORY); } + TAOS_CHECK_RETURN(taosHashPut(pStore->uidHash, &suid, sizeof(suid), &pUidArray, sizeof(pUidArray))); } } else { - if (taosHashPut(pStore->uidHash, &suid, sizeof(suid), NULL, 0) != 0) { - return TSDB_CODE_FAILED; - } + TAOS_CHECK_RETURN(taosHashPut(pStore->uidHash, &suid, sizeof(suid), NULL, 0)); } } return TSDB_CODE_SUCCESS; @@ -600,9 +581,7 @@ static int32_t tdProcessSubmitReq(STsdb *pTsdb, int64_t version, void *pReq) { if (pReq) { SSubmitReq2 *pSubmitReq = (SSubmitReq2 *)pReq; // spin lock for race condition during insert data - if (tsdbInsertData(pTsdb, version, pSubmitReq, NULL) < 0) { - return TSDB_CODE_FAILED; - } + TAOS_CHECK_RETURN(tsdbInsertData(pTsdb, version, pSubmitReq, NULL)); } return TSDB_CODE_SUCCESS; @@ -612,13 +591,9 @@ static int32_t tdFetchSubmitReqSuids(SSubmitReq2 *pMsg, STbUidStore *pStore) { SArray *pSubmitTbData = pMsg ? pMsg->aSubmitTbData : NULL; int32_t size = taosArrayGetSize(pSubmitTbData); - terrno = TSDB_CODE_SUCCESS; - for (int32_t i = 0; i < size; ++i) { SSubmitTbData *pData = TARRAY_GET_ELEM(pSubmitTbData, i); - if ((terrno = tdUidStorePut(pStore, pData->suid, NULL)) < 0) { - return -1; - } + TAOS_CHECK_RETURN(tdUidStorePut(pStore, pData->suid, NULL)); } return 0; @@ -645,7 +620,7 @@ int32_t smaRetention(SSma *pSma, int64_t now) { } _end: - return code; + TAOS_RETURN(code); } static int32_t tdRSmaProcessDelReq(SSma *pSma, int64_t suid, int8_t level, SBatchDeleteReq *pDelReq) { @@ -659,13 +634,17 @@ static int32_t tdRSmaProcessDelReq(SSma *pSma, int64_t suid, int8_t level, SBatc void *pBuf = rpcMallocCont(len + sizeof(SMsgHead)); if (!pBuf) { - code = terrno; + code = TSDB_CODE_OUT_OF_MEMORY; TSDB_CHECK_CODE(code, lino, _exit); } SEncoder encoder; tEncoderInit(&encoder, POINTER_SHIFT(pBuf, sizeof(SMsgHead)), len); - tEncodeSBatchDeleteReq(&encoder, pDelReq); + if ((code = tEncodeSBatchDeleteReq(&encoder, pDelReq)) < 0) { + tEncoderClear(&encoder); + rpcFreeCont(pBuf); + TSDB_CHECK_CODE(code, lino, _exit); + } tEncoderClear(&encoder); ((SMsgHead *)pBuf)->vgId = TD_VID(pSma->pVnode); @@ -682,7 +661,7 @@ _exit: SMA_VID(pSma), lino, suid, level, tstrerror(code)); } - return code; + TAOS_RETURN(code); } static int32_t tdRSmaExecAndSubmitResult(SSma *pSma, qTaskInfo_t taskInfo, SRSmaInfoItem *pItem, SRSmaInfo *pInfo, @@ -717,7 +696,7 @@ static int32_t tdRSmaExecAndSubmitResult(SSma *pSma, qTaskInfo_t taskInfo, SRSma SBatchDeleteReq deleteReq = {.suid = suid, .level = pItem->level}; deleteReq.deleteReqs = taosArrayInit(0, sizeof(SSingleDeleteReq)); if (!deleteReq.deleteReqs) { - code = terrno; + code = TSDB_CODE_OUT_OF_MEMORY; TSDB_CHECK_CODE(code, lino, _exit); } code = tqBuildDeleteReq(pSma->pVnode->pTq, NULL, output, &deleteReq, "", true); @@ -753,21 +732,15 @@ static int32_t tdRSmaExecAndSubmitResult(SSma *pSma, qTaskInfo_t taskInfo, SRSma STsdb *sinkTsdb = (pItem->level == TSDB_RETENTION_L1 ? pSma->pRSmaTsdb[0] : pSma->pRSmaTsdb[1]); SSubmitReq2 *pReq = NULL; - if (buildSubmitReqFromDataBlock(&pReq, output, pTSchema, output->info.id.groupId, SMA_VID(pSma), suid) < 0) { - code = terrno ? terrno : TSDB_CODE_RSMA_RESULT; - TSDB_CHECK_CODE(code, lino, _exit); - } + TAOS_CHECK_EXIT( + buildSubmitReqFromDataBlock(&pReq, output, pTSchema, output->info.id.groupId, SMA_VID(pSma), suid)); - if (pReq && tdProcessSubmitReq(sinkTsdb, output->info.version, pReq) < 0) { - if (terrno == TSDB_CODE_TDB_TIMESTAMP_OUT_OF_RANGE) { + if (pReq && (code = tdProcessSubmitReq(sinkTsdb, output->info.version, pReq)) < 0) { + if (code == TSDB_CODE_TDB_TIMESTAMP_OUT_OF_RANGE) { // TODO: reconfigure SSubmitReq2 - } else { - if (terrno == 0) terrno = TSDB_CODE_RSMA_RESULT; - code = terrno; } tDestroySubmitReq(pReq, TSDB_MSG_FLG_ENCODE); - taosMemoryFree(pReq); - pReq = NULL; + taosMemoryFreeClear(pReq); TSDB_CHECK_CODE(code, lino, _exit); } @@ -812,14 +785,12 @@ _exit: */ static int32_t tdExecuteRSmaImplAsync(SSma *pSma, int64_t version, const void *pMsg, int32_t len, int32_t inputType, SRSmaInfo *pInfo, tb_uid_t suid) { - int32_t code; + int32_t code = 0; + int32_t lino = 0; int32_t size = RSMA_EXEC_MSG_HLEN + len; // header + payload void *qItem; - code = taosAllocateQitem(size, DEF_QITEM, 0, (void **)&qItem); - if (code) { - return code; - } + TAOS_CHECK_RETURN(taosAllocateQitem(size, DEF_QITEM, 0, (void **)&qItem)); void *pItem = qItem; @@ -830,7 +801,7 @@ static int32_t tdExecuteRSmaImplAsync(SSma *pSma, int64_t version, const void *p *(int64_t *)pItem = version; memcpy(POINTER_SHIFT(pItem, sizeof(int64_t)), pMsg, len); - taosWriteQitem(pInfo->queue, qItem); + TAOS_CHECK_RETURN(taosWriteQitem(pInfo->queue, qItem)); pInfo->lastRecv = taosGetTimestampMs(); @@ -863,10 +834,10 @@ static int32_t tdRsmaPrintSubmitReq(SSma *pSma, SSubmitReq *pReq) { SSubmitMsgIter msgIter = {0}; SSubmitBlkIter blkIter = {0}; STSRow *row = NULL; - if (tInitSubmitMsgIter(pReq, &msgIter) < 0) return -1; + TAOS_CHECK_RETURN(tInitSubmitMsgIter(pReq, &msgIter)); while (true) { SSubmitBlk *pBlock = NULL; - if (tGetSubmitMsgNext(&msgIter, &pBlock) < 0) return -1; + TAOS_CHECK_RETURN(tGetSubmitMsgNext(&msgIter, &pBlock)); if (pBlock == NULL) break; tInitSubmitBlkIter(&msgIter, pBlock, &blkIter); while ((row = tGetSubmitBlkNext(&blkIter)) != NULL) { @@ -892,6 +863,7 @@ static int32_t tdRsmaPrintSubmitReq(SSma *pSma, SSubmitReq *pReq) { */ static int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int32_t msgSize, int64_t version, int32_t inputType, SRSmaInfo *pInfo, ERsmaExecType type, int8_t level) { + int32_t code = 0; int32_t idx = level - 1; void *qTaskInfo = RSMA_INFO_QTASK(pInfo, idx); SRSmaInfoItem *pItem = RSMA_INFO_ITEM(pInfo, idx); @@ -902,25 +874,24 @@ static int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int32_t msgSize, return TSDB_CODE_SUCCESS; } if (!pInfo->pTSchema) { - terrno = TSDB_CODE_INVALID_PTR; smaWarn("vgId:%d, no schema to execute rsma %" PRIi8 " task for suid:%" PRIu64, SMA_VID(pSma), level, pInfo->suid); - return TSDB_CODE_FAILED; + TAOS_RETURN(TSDB_CODE_INVALID_PTR); } smaDebug("vgId:%d, execute rsma %" PRIi8 " task for qTaskInfo:%p, suid:%" PRIu64 ", nMsg:%d, submitReqVer:%" PRIi64 ", inputType:%d", SMA_VID(pSma), level, RSMA_INFO_QTASK(pInfo, idx), pInfo->suid, msgSize, version, inputType); - if ((terrno = qSetSMAInput(qTaskInfo, pMsg, msgSize, inputType)) < 0) { - smaError("vgId:%d, rsma %" PRIi8 " qSetStreamInput failed since %s", SMA_VID(pSma), level, tstrerror(terrno)); - return TSDB_CODE_FAILED; + if ((code = qSetSMAInput(qTaskInfo, pMsg, msgSize, inputType)) < 0) { + smaError("vgId:%d, rsma %" PRIi8 " qSetStreamInput failed since %s", SMA_VID(pSma), level, tstrerror(code)); + TAOS_RETURN(TSDB_CODE_FAILED); } atomic_store_64(&pItem->submitReqVer, version); - terrno = tdRSmaExecAndSubmitResult(pSma, qTaskInfo, pItem, pInfo, STREAM_NORMAL, NULL); + TAOS_CHECK_RETURN(tdRSmaExecAndSubmitResult(pSma, qTaskInfo, pItem, pInfo, STREAM_NORMAL, NULL)); - return terrno ? TSDB_CODE_FAILED : TDB_CODE_SUCCESS; + TAOS_RETURN(code); } /** @@ -928,26 +899,23 @@ static int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int32_t msgSize, * * @param pSma * @param suid - * @return SRSmaInfo* */ -static SRSmaInfo *tdAcquireRSmaInfoBySuid(SSma *pSma, int64_t suid) { +static int32_t tdAcquireRSmaInfoBySuid(SSma *pSma, int64_t suid, SRSmaInfo **ppRSmaInfo) { int32_t code = 0; int32_t lino = 0; SSmaEnv *pEnv = SMA_RSMA_ENV(pSma); SRSmaStat *pStat = NULL; SRSmaInfo *pRSmaInfo = NULL; - terrno = 0; + *ppRSmaInfo = NULL; if (!pEnv) { - terrno = TSDB_CODE_RSMA_INVALID_ENV; - return NULL; + TAOS_RETURN(TSDB_CODE_RSMA_INVALID_ENV); } pStat = (SRSmaStat *)SMA_ENV_STAT(pEnv); if (!pStat || !RSMA_INFO_HASH(pStat)) { - terrno = TSDB_CODE_RSMA_INVALID_STAT; - return NULL; + TAOS_RETURN(TSDB_CODE_RSMA_INVALID_STAT); } taosRLockLatch(SMA_ENV_LOCK(pEnv)); @@ -955,20 +923,19 @@ static SRSmaInfo *tdAcquireRSmaInfoBySuid(SSma *pSma, int64_t suid) { if (pRSmaInfo && (pRSmaInfo = *(SRSmaInfo **)pRSmaInfo)) { if (RSMA_INFO_IS_DEL(pRSmaInfo)) { taosRUnLockLatch(SMA_ENV_LOCK(pEnv)); - return NULL; + TAOS_RETURN(TSDB_CODE_RSMA_INVALID_STAT); } tdRefRSmaInfo(pSma, pRSmaInfo); taosRUnLockLatch(SMA_ENV_LOCK(pEnv)); if (ASSERTS(pRSmaInfo->suid == suid, "suid:%" PRIi64 " != %" PRIi64, pRSmaInfo->suid, suid)) { - terrno = TSDB_CODE_APP_ERROR; - return NULL; + TAOS_RETURN(TSDB_CODE_APP_ERROR); } - return pRSmaInfo; + TAOS_RETURN(code); } taosRUnLockLatch(SMA_ENV_LOCK(pEnv)); - return NULL; + TAOS_RETURN(TSDB_CODE_RSMA_INVALID_STAT); } static FORCE_INLINE void tdReleaseRSmaInfo(SSma *pSma, SRSmaInfo *pInfo) { @@ -989,16 +956,19 @@ static FORCE_INLINE void tdReleaseRSmaInfo(SSma *pSma, SRSmaInfo *pInfo) { */ static int32_t tdExecuteRSmaAsync(SSma *pSma, int64_t version, const void *pMsg, int32_t len, int32_t inputType, tb_uid_t suid) { - SRSmaInfo *pRSmaInfo = tdAcquireRSmaInfoBySuid(pSma, suid); - if (!pRSmaInfo) { + int32_t code = 0; + SRSmaInfo *pRSmaInfo = NULL; + + code = tdAcquireRSmaInfoBySuid(pSma, suid, &pRSmaInfo); + if (code != 0) { smaDebug("vgId:%d, execute rsma, no rsma info for suid:%" PRIu64, SMA_VID(pSma), suid); - return TSDB_CODE_SUCCESS; + TAOS_RETURN(TSDB_CODE_SUCCESS); // return success } if (inputType == STREAM_INPUT__DATA_SUBMIT || inputType == STREAM_INPUT__REF_DATA_BLOCK) { - if (tdExecuteRSmaImplAsync(pSma, version, pMsg, len, inputType, pRSmaInfo, suid) < 0) { + if ((code = tdExecuteRSmaImplAsync(pSma, version, pMsg, len, inputType, pRSmaInfo, suid)) < 0) { tdReleaseRSmaInfo(pSma, pRSmaInfo); - return TSDB_CODE_FAILED; + TAOS_RETURN(code); } if (smaMgmt.tmrHandle) { SRSmaInfoItem *pItem = RSMA_INFO_ITEM(pRSmaInfo, 0); @@ -1011,11 +981,11 @@ static int32_t tdExecuteRSmaAsync(SSma *pSma, int64_t version, const void *pMsg, } } } else { - terrno = TSDB_CODE_APP_ERROR; + code = TSDB_CODE_APP_ERROR; tdReleaseRSmaInfo(pSma, pRSmaInfo); smaError("vgId:%d, execute rsma, failed for suid:%" PRIu64 " since %s, type:%d", SMA_VID(pSma), suid, - tstrerror(terrno), inputType); - return TSDB_CODE_FAILED; + tstrerror(code), inputType); + TAOS_RETURN(code); } tdReleaseRSmaInfo(pSma, pRSmaInfo); @@ -1025,57 +995,56 @@ static int32_t tdExecuteRSmaAsync(SSma *pSma, int64_t version, const void *pMsg, int32_t tdProcessRSmaSubmit(SSma *pSma, int64_t version, void *pReq, void *pMsg, int32_t len) { if (!SMA_RSMA_ENV(pSma)) return TSDB_CODE_SUCCESS; - if ((terrno = atomic_load_32(&SMA_RSMA_STAT(pSma)->execStat))) { - smaError("vgId:%d, failed to process rsma submit since invalid exec code: %s", SMA_VID(pSma), terrstr()); - goto _err; + int32_t code = 0; + if ((code = atomic_load_32(&SMA_RSMA_STAT(pSma)->execStat))) { + smaError("vgId:%d, failed to process rsma submit since invalid exec code: %s", SMA_VID(pSma), tstrerror(code)); + goto _exit; } STbUidStore uidStore = {0}; - if (tdFetchSubmitReqSuids(pReq, &uidStore) < 0) { - smaError("vgId:%d, failed to process rsma submit fetch suid since: %s", SMA_VID(pSma), terrstr()); - goto _err; + if ((code = tdFetchSubmitReqSuids(pReq, &uidStore)) < 0) { + smaError("vgId:%d, failed to process rsma submit fetch suid since: %s", SMA_VID(pSma), tstrerror(code)); + goto _exit; } if (uidStore.suid != 0) { - if (tdExecuteRSmaAsync(pSma, version, pMsg, len, STREAM_INPUT__DATA_SUBMIT, uidStore.suid) < 0) { - smaError("vgId:%d, failed to process rsma submit exec 1 since: %s", SMA_VID(pSma), terrstr()); - goto _err; + if ((code = tdExecuteRSmaAsync(pSma, version, pMsg, len, STREAM_INPUT__DATA_SUBMIT, uidStore.suid)) < 0) { + smaError("vgId:%d, failed to process rsma submit exec 1 since: %s", SMA_VID(pSma), tstrerror(code)); + goto _exit; } void *pIter = NULL; while ((pIter = taosHashIterate(uidStore.uidHash, pIter))) { tb_uid_t *pTbSuid = (tb_uid_t *)taosHashGetKey(pIter, NULL); - if (tdExecuteRSmaAsync(pSma, version, pMsg, len, STREAM_INPUT__DATA_SUBMIT, *pTbSuid) < 0) { - smaError("vgId:%d, failed to process rsma submit exec 2 since: %s", SMA_VID(pSma), terrstr()); + if ((code = tdExecuteRSmaAsync(pSma, version, pMsg, len, STREAM_INPUT__DATA_SUBMIT, *pTbSuid)) < 0) { + smaError("vgId:%d, failed to process rsma submit exec 2 since: %s", SMA_VID(pSma), tstrerror(code)); taosHashCancelIterate(uidStore.uidHash, pIter); - goto _err; + goto _exit; } } } +_exit: tdUidStoreDestory(&uidStore); - return TSDB_CODE_SUCCESS; -_err: - tdUidStoreDestory(&uidStore); - return terrno; + TAOS_RETURN(code); } int32_t tdProcessRSmaDelete(SSma *pSma, int64_t version, void *pReq, void *pMsg, int32_t len) { if (!SMA_RSMA_ENV(pSma)) return TSDB_CODE_SUCCESS; - if ((terrno = atomic_load_32(&SMA_RSMA_STAT(pSma)->execStat))) { - smaError("vgId:%d, failed to process rsma delete since invalid exec code: %s", SMA_VID(pSma), terrstr()); - goto _err; + int32_t code = 0; + if ((code = atomic_load_32(&SMA_RSMA_STAT(pSma)->execStat))) { + smaError("vgId:%d, failed to process rsma delete since invalid exec code: %s", SMA_VID(pSma), tstrerror(code)); + goto _exit; } SDeleteRes *pDelRes = pReq; - if (tdExecuteRSmaAsync(pSma, version, pMsg, len, STREAM_INPUT__REF_DATA_BLOCK, pDelRes->suid) < 0) { - smaError("vgId:%d, failed to process rsma submit exec 1 since: %s", SMA_VID(pSma), terrstr()); - goto _err; + if ((code = tdExecuteRSmaAsync(pSma, version, pMsg, len, STREAM_INPUT__REF_DATA_BLOCK, pDelRes->suid)) < 0) { + smaError("vgId:%d, failed to process rsma submit exec 1 since: %s", SMA_VID(pSma), tstrerror(code)); + goto _exit; } - return TSDB_CODE_SUCCESS; -_err: - return terrno; +_exit: + TAOS_RETURN(code); } /** @@ -1099,10 +1068,7 @@ static int32_t tdRSmaRestoreQTaskInfoInit(SSma *pSma, int64_t *nTables) { TSDB_CHECK_CODE(code, lino, _exit); } - if (vnodeGetStbIdList(pSma->pVnode, 0, suidList) < 0) { - code = terrno; - TSDB_CHECK_CODE(code, lino, _exit); - } + TAOS_CHECK_EXIT(vnodeGetStbIdList(pSma->pVnode, 0, suidList)); int64_t arrSize = taosArrayGetSize(suidList); @@ -1146,10 +1112,7 @@ static int32_t tdRSmaRestoreQTaskInfoInit(SSma *pSma, int64_t *nTables) { " qmsgLen:%" PRIi32, TD_VID(pVnode), suid, i, param->maxdelay[i], param->watermark[i], param->qmsgLen[i]); } - if (tdRSmaProcessCreateImpl(pSma, &mr.me.stbEntry.rsmaParam, suid, mr.me.name) < 0) { - code = terrno; - TSDB_CHECK_CODE(code, lino, _exit); - } + TAOS_CHECK_EXIT(tdRSmaProcessCreateImpl(pSma, &mr.me.stbEntry.rsmaParam, suid, mr.me.name)); #if 0 // reload all ctbUids for suid uidStore.suid = suid; @@ -1180,7 +1143,7 @@ _exit: metaReaderClear(&mr); taosArrayDestroy(suidList); tdUidStoreDestory(&uidStore); - return code; + TAOS_RETURN(code); } /** @@ -1188,20 +1151,16 @@ _exit: */ int32_t tdRSmaProcessRestoreImpl(SSma *pSma, int8_t type, int64_t qtaskFileVer, int8_t rollback) { int32_t code = 0; + int32_t lino = 0; int64_t nTables = 0; // step 1: init env - if (tdCheckAndInitSmaEnv(pSma, TSDB_SMA_TYPE_ROLLUP) != TSDB_CODE_SUCCESS) { - code = TSDB_CODE_TDB_INIT_FAILED; - goto _err; - } + TAOS_CHECK_EXIT(tdCheckAndInitSmaEnv(pSma, TSDB_SMA_TYPE_ROLLUP)); // step 2: iterate all stables to restore the rsma env - if ((code = tdRSmaRestoreQTaskInfoInit(pSma, &nTables)) < 0) { - goto _err; - } + TAOS_CHECK_EXIT(tdRSmaRestoreQTaskInfoInit(pSma, &nTables)); -_err: +_exit: if (code) { smaError("vgId:%d, restore rsma task %" PRIi8 "from qtaskf %" PRIi64 " failed since %s", SMA_VID(pSma), type, qtaskFileVer, tstrerror(code)); @@ -1210,7 +1169,7 @@ _err: type, qtaskFileVer, nTables); } - return code; + TAOS_RETURN(code); } int32_t tdRSmaPersistExecImpl(SRSmaStat *pRSmaStat, SHashObj *pInfoHash) { @@ -1323,9 +1282,8 @@ _checkpoint: } streamMetaWLock(pMeta); - if (streamMetaSaveTask(pMeta, pTask)) { + if ((code = streamMetaSaveTask(pMeta, pTask)) != 0) { streamMetaWUnLock(pMeta); - code = terrno ? terrno : TSDB_CODE_OUT_OF_MEMORY; taosHashCancelIterate(pInfoHash, infoHash); TSDB_CHECK_CODE(code, lino, _exit); } @@ -1338,9 +1296,9 @@ _checkpoint: } if (pMeta) { streamMetaWLock(pMeta); - if (streamMetaCommit(pMeta)) { + if ((code = streamMetaCommit(pMeta)) != 0) { streamMetaWUnLock(pMeta); - code = terrno ? terrno : TSDB_CODE_OUT_OF_MEMORY; + if (code == -1) code = TSDB_CODE_OUT_OF_MEMORY; TSDB_CHECK_CODE(code, lino, _exit); } streamMetaWUnLock(pMeta); @@ -1354,8 +1312,7 @@ _exit: smaError("vgId:%d, %s failed at line %d since %s", TD_VID(pVnode), __func__, lino, tstrerror(code)); } - terrno = code; - return code; + TAOS_RETURN(code); } /** @@ -1370,6 +1327,7 @@ static void tdRSmaFetchTrigger(void *param, void *tmrId) { SRSmaStat *pStat = NULL; SRSmaInfo *pRSmaInfo = NULL; SRSmaInfoItem *pItem = NULL; + int32_t code = 0; if (!(pRSmaRef = taosHashGet(smaMgmt.refHash, ¶m, POINTER_BYTES))) { smaDebug("rsma fetch task not start since rsma info item:%p not exist in refHash:%p, rsetId:%d", param, @@ -1386,7 +1344,7 @@ static void tdRSmaFetchTrigger(void *param, void *tmrId) { pSma = pStat->pSma; - if (!(pRSmaInfo = tdAcquireRSmaInfoBySuid(pSma, pRSmaRef->suid))) { + if ((code = tdAcquireRSmaInfoBySuid(pSma, pRSmaRef->suid, &pRSmaInfo)) != 0) { smaDebug("rsma fetch task not start since rsma info not exist, rsetId:%d refId:%" PRIi64 ")", smaMgmt.rsetId, pRSmaRef->refId); // pRSmaRef freed in taosHashRemove tdReleaseSmaRef(smaMgmt.rsetId, pRSmaRef->refId); @@ -1484,6 +1442,8 @@ static void tdFreeRSmaSubmitItems(SArray *pItems, int32_t type) { * @return int32_t */ static int32_t tdRSmaFetchAllResult(SSma *pSma, SRSmaInfo *pInfo) { + int32_t code = 0; + int32_t lino = 0; SSDataBlock dataBlock = {.info.type = STREAM_GET_ALL}; for (int8_t i = 1; i <= TSDB_RETENTION_L2; ++i) { SRSmaInfoItem *pItem = RSMA_INFO_ITEM(pInfo, i - 1); @@ -1512,12 +1472,10 @@ static int32_t tdRSmaFetchAllResult(SSma *pSma, SRSmaInfo *pInfo) { pItem->nScanned = 0; - if ((terrno = qSetSMAInput(taskInfo, &dataBlock, 1, STREAM_INPUT__DATA_BLOCK)) < 0) { - goto _err; - } - if (tdRSmaExecAndSubmitResult(pSma, taskInfo, pItem, pInfo, STREAM_GET_ALL, NULL) < 0) { - atomic_store_32(&SMA_RSMA_STAT(pSma)->execStat, terrno); - goto _err; + TAOS_CHECK_EXIT(qSetSMAInput(taskInfo, &dataBlock, 1, STREAM_INPUT__DATA_BLOCK)); + if ((code = tdRSmaExecAndSubmitResult(pSma, taskInfo, pItem, pInfo, STREAM_GET_ALL, NULL)) < 0) { + atomic_store_32(&SMA_RSMA_STAT(pSma)->execStat, code); + goto _exit; } smaDebug("vgId:%d, suid:%" PRIi64 " level:%" PRIi8 " nScanned:%" PRIi32 " maxDelay:%d, fetch finished", @@ -1529,10 +1487,8 @@ static int32_t tdRSmaFetchAllResult(SSma *pSma, SRSmaInfo *pInfo) { } } -_end: - return TSDB_CODE_SUCCESS; -_err: - return TSDB_CODE_FAILED; +_exit: + TAOS_RETURN(code); } static int32_t tdRSmaBatchExec(SSma *pSma, SRSmaInfo *pInfo, STaosQall *qall, SArray *pSubmitArr, ERsmaExecType type) { @@ -1541,6 +1497,8 @@ static int32_t tdRSmaBatchExec(SSma *pSma, SRSmaInfo *pInfo, STaosQall *qall, SA int32_t nSubmit = 0; int32_t nDelete = 0; int64_t version = 0; + int32_t code = 0; + int32_t lino = 0; SPackedData packData; @@ -1559,23 +1517,21 @@ static int32_t tdRSmaBatchExec(SSma *pSma, SRSmaInfo *pInfo, STaosQall *qall, SA version = packData.ver; if (!taosArrayPush(pSubmitArr, &packData)) { taosFreeQitem(msg); - terrno = TSDB_CODE_OUT_OF_MEMORY; - goto _err; + TAOS_CHECK_EXIT(TSDB_CODE_OUT_OF_MEMORY); } ++nSubmit; } else if (inputType == STREAM_INPUT__REF_DATA_BLOCK) { _resume_delete: version = RSMA_EXEC_MSG_VER(msg); - if ((terrno = tqExtractDelDataBlock(RSMA_EXEC_MSG_BODY(msg), RSMA_EXEC_MSG_LEN(msg), version, - &packData.pDataBlock, 1))) { + if ((code = tqExtractDelDataBlock(RSMA_EXEC_MSG_BODY(msg), RSMA_EXEC_MSG_LEN(msg), version, + &packData.pDataBlock, 1))) { taosFreeQitem(msg); - goto _err; + TAOS_CHECK_EXIT(code); } if (packData.pDataBlock && !taosArrayPush(pSubmitArr, &packData)) { taosFreeQitem(msg); - terrno = TSDB_CODE_OUT_OF_MEMORY; - goto _err; + TAOS_CHECK_EXIT(TSDB_CODE_OUT_OF_MEMORY); } taosFreeQitem(msg); if (packData.pDataBlock) { @@ -1593,9 +1549,7 @@ static int32_t tdRSmaBatchExec(SSma *pSma, SRSmaInfo *pInfo, STaosQall *qall, SA ASSERTS(size > 0, "size is %d", size); int32_t inputType = nSubmit > 0 ? STREAM_INPUT__MERGED_SUBMIT : STREAM_INPUT__REF_DATA_BLOCK; for (int32_t i = 1; i <= TSDB_RETENTION_L2; ++i) { - if (tdExecuteRSmaImpl(pSma, pSubmitArr->pData, size, version, inputType, pInfo, type, i) < 0) { - goto _err; - } + TAOS_CHECK_EXIT(tdExecuteRSmaImpl(pSma, pSubmitArr->pData, size, version, inputType, pInfo, type, i)); } tdFreeRSmaSubmitItems(pSubmitArr, inputType); nSubmit = 0; @@ -1612,7 +1566,7 @@ static int32_t tdRSmaBatchExec(SSma *pSma, SRSmaInfo *pInfo, STaosQall *qall, SA _rtn: return TSDB_CODE_SUCCESS; -_err: +_exit: atomic_store_32(&SMA_RSMA_STAT(pSma)->execStat, terrno); smaError("vgId:%d, batch exec for suid:%" PRIi64 " execType:%d size:%d failed since %s", SMA_VID(pSma), pInfo->suid, type, (int32_t)taosArrayGetSize(pSubmitArr), terrstr()); @@ -1626,7 +1580,7 @@ _err: break; } } - return TSDB_CODE_FAILED; + TAOS_RETURN(code); } /** @@ -1677,7 +1631,10 @@ int32_t tdRSmaProcessExecImpl(SSma *pSma, ERsmaExecType type) { taosReadAllQitems(pInfo->queue, pInfo->qall); // queue has mutex lock int32_t qallItemSize = taosQallItemSize(pInfo->qall); if (qallItemSize > 0) { - tdRSmaBatchExec(pSma, pInfo, pInfo->qall, pSubmitArr, type); + if ((code = tdRSmaBatchExec(pSma, pInfo, pInfo->qall, pSubmitArr, type)) != 0) { + taosHashCancelIterate(infoHash, pIter); + TSDB_CHECK_CODE(code, lino, _exit); + } smaDebug("vgId:%d, batchSize:%d, execType:%" PRIi32, SMA_VID(pSma), qallItemSize, type); } @@ -1693,7 +1650,10 @@ int32_t tdRSmaProcessExecImpl(SSma *pSma, ERsmaExecType type) { TSDB_CHECK_CODE(code, lino, _exit); } - tdRSmaFetchAllResult(pSma, pInfo); + if ((code = tdRSmaFetchAllResult(pSma, pInfo)) != 0) { + taosHashCancelIterate(infoHash, pIter); + TSDB_CHECK_CODE(code, lino, _exit); + } if (0 == atomic_sub_fetch_32(&pRSmaStat->nFetchAll, 1)) { atomic_store_8(RSMA_COMMIT_STAT(pRSmaStat), 0); @@ -1742,5 +1702,5 @@ _exit: if (code) { smaError("vgId:%d, %s failed at line %d since %s", TD_VID(pVnode), __func__, lino, tstrerror(code)); } - return code; + TAOS_RETURN(code); } diff --git a/source/dnode/vnode/src/tsdb/tsdbWrite.c b/source/dnode/vnode/src/tsdb/tsdbWrite.c index 0b32c96d2e..a35068163f 100644 --- a/source/dnode/vnode/src/tsdb/tsdbWrite.c +++ b/source/dnode/vnode/src/tsdb/tsdbWrite.c @@ -34,7 +34,7 @@ int tsdbInsertData(STsdb *pTsdb, int64_t version, SSubmitReq2 *pMsg, SSubmitRsp2 int32_t numOfRows = 0; if (ASSERTS(pTsdb->mem != NULL, "vgId:%d, mem is NULL", TD_VID(pTsdb->pVnode))) { - return -1; + TAOS_RETURN(TSDB_CODE_INVALID_PTR); } arrSize = taosArrayGetSize(pMsg->aSubmitTbData); @@ -42,7 +42,7 @@ int tsdbInsertData(STsdb *pTsdb, int64_t version, SSubmitReq2 *pMsg, SSubmitRsp2 // scan and convert if ((code = tsdbScanAndConvertSubmitMsg(pTsdb, pMsg)) < 0) { if (code != TSDB_CODE_TDB_TABLE_RECONFIGURE) { - tsdbError("vgId:%d, failed to insert data since %s", TD_VID(pTsdb->pVnode), tstrerror(terrno)); + tsdbError("vgId:%d, failed to insert data since %s", TD_VID(pTsdb->pVnode), tstrerror(code)); } return code; } diff --git a/source/dnode/vnode/src/vnd/vnodeQuery.c b/source/dnode/vnode/src/vnd/vnodeQuery.c index f7b618d18b..6b46ddd8df 100644 --- a/source/dnode/vnode/src/vnd/vnodeQuery.c +++ b/source/dnode/vnode/src/vnd/vnodeQuery.c @@ -601,7 +601,7 @@ int32_t vnodeGetStbIdList(SVnode *pVnode, int64_t suid, SArray *list) { int32_t code = TSDB_CODE_SUCCESS; SMStbCursor *pCur = metaOpenStbCursor(pVnode->pMeta, suid); if (!pCur) { - return TSDB_CODE_FAILED; + return TSDB_CODE_OUT_OF_MEMORY; } while (1) {