diff --git a/include/util/tqueue.h b/include/util/tqueue.h index bed218ac1b..b948bbf410 100644 --- a/include/util/tqueue.h +++ b/include/util/tqueue.h @@ -77,6 +77,7 @@ STaosQueue *taosOpenQueue(); void taosCloseQueue(STaosQueue *queue); void taosSetQueueFp(STaosQueue *queue, FItem itemFp, FItems itemsFp); void *taosAllocateQitem(int32_t size, EQItype itype, int64_t dataSize); +int32_t taosAllocateQitemWrapper(int32_t size, EQItype itype, int64_t dataSize, void **pItem); void taosFreeQitem(void *pItem); int32_t taosWriteQitem(STaosQueue *queue, void *pItem); int32_t taosReadQitem(STaosQueue *queue, void **ppItem); diff --git a/source/dnode/mgmt/mgmt_dnode/src/dmHandle.c b/source/dnode/mgmt/mgmt_dnode/src/dmHandle.c index 0360b54d6f..d0459eb838 100644 --- a/source/dnode/mgmt/mgmt_dnode/src/dmHandle.c +++ b/source/dnode/mgmt/mgmt_dnode/src/dmHandle.c @@ -45,7 +45,7 @@ static void dmMayShouldUpdateIpWhiteList(SDnodeMgmt *pMgmt, int64_t ver) { SRetrieveIpWhiteReq req = {.ipWhiteVer = oldVer}; int32_t contLen = tSerializeRetrieveIpWhite(NULL, 0, &req); - void * pHead = rpcMallocCont(contLen); + void *pHead = rpcMallocCont(contLen); tSerializeRetrieveIpWhite(pHead, contLen, &req); SRpcMsg rpcMsg = {.pCont = pHead, @@ -116,7 +116,7 @@ void dmSendStatusReq(SDnodeMgmt *pMgmt) { req.clusterCfg.ttlChangeOnWrite = tsTtlChangeOnWrite; req.clusterCfg.enableWhiteList = tsEnableWhiteList ? 1 : 0; req.clusterCfg.encryptionKeyStat = tsEncryptionKeyStat; - req.clusterCfg.encryptionKeyChksum = tsEncryptionKeyChksum; + req.clusterCfg.encryptionKeyChksum = tsEncryptionKeyChksum; req.clusterCfg.monitorParas.tsEnableMonitor = tsEnableMonitor; req.clusterCfg.monitorParas.tsMonitorInterval = tsMonitorInterval; req.clusterCfg.monitorParas.tsSlowLogScope = tsSlowLogScope; @@ -146,7 +146,7 @@ void dmSendStatusReq(SDnodeMgmt *pMgmt) { req.ipWhiteVer = pMgmt->pData->ipWhiteVer; int32_t contLen = tSerializeSStatusReq(NULL, 0, &req); - void * pHead = rpcMallocCont(contLen); + void *pHead = rpcMallocCont(contLen); tSerializeSStatusReq(pHead, contLen, &req); tFreeSStatusReq(&req); @@ -207,18 +207,22 @@ int32_t dmProcessGrantRsp(SDnodeMgmt *pMgmt, SRpcMsg *pMsg) { } int32_t dmProcessConfigReq(SDnodeMgmt *pMgmt, SRpcMsg *pMsg) { + int32_t code = 0; SDCfgDnodeReq cfgReq = {0}; if (tDeserializeSDCfgDnodeReq(pMsg->pCont, pMsg->contLen, &cfgReq) != 0) { - terrno = TSDB_CODE_INVALID_MSG; - return -1; + return TSDB_CODE_INVALID_MSG; } dInfo("start to config, option:%s, value:%s", cfgReq.config, cfgReq.value); SConfig *pCfg = taosGetCfg(); - cfgSetItem(pCfg, cfgReq.config, cfgReq.value, CFG_STYPE_ALTER_CMD, true); - taosCfgDynamicOptions(pCfg, cfgReq.config, true); - return 0; + + code = cfgSetItem(pCfg, cfgReq.config, cfgReq.value, CFG_STYPE_ALTER_CMD, true); + if (code != 0) { + return code; + } + + return taosCfgDynamicOptions(pCfg, cfgReq.config, true); } int32_t dmProcessCreateEncryptKeyReq(SDnodeMgmt *pMgmt, SRpcMsg *pMsg) { @@ -251,7 +255,7 @@ static void dmGetServerRunStatus(SDnodeMgmt *pMgmt, SServerStatusRsp *pStatus) { pStatus->statusCode = TSDB_SRV_STATUS_SERVICE_OK; pStatus->details[0] = 0; - SMonMloadInfo minfo = {0}; + SMonMloadInfo minfo = {0}; (*pMgmt->getMnodeLoadsFp)(&minfo); if (minfo.isMnode && (minfo.load.syncState == TAOS_SYNC_STATE_ERROR || minfo.load.syncState == TAOS_SYNC_STATE_OFFLINE)) { @@ -276,6 +280,7 @@ static void dmGetServerRunStatus(SDnodeMgmt *pMgmt, SServerStatusRsp *pStatus) { } int32_t dmProcessServerRunStatus(SDnodeMgmt *pMgmt, SRpcMsg *pMsg) { + int32_t code = 0; dDebug("server run status req is received"); SServerStatusRsp statusRsp = {0}; dmGetServerRunStatus(pMgmt, &statusRsp); @@ -284,24 +289,37 @@ int32_t dmProcessServerRunStatus(SDnodeMgmt *pMgmt, SRpcMsg *pMsg) { int32_t rspLen = tSerializeSServerStatusRsp(NULL, 0, &statusRsp); if (rspLen < 0) { rspMsg.code = TSDB_CODE_OUT_OF_MEMORY; - return -1; + return rspMsg.code; } void *pRsp = rpcMallocCont(rspLen); if (pRsp == NULL) { rspMsg.code = TSDB_CODE_OUT_OF_MEMORY; - return -1; + return rspMsg.code; + } + + code = tSerializeSServerStatusRsp(pRsp, rspLen, &statusRsp); + if (code != 0) { + rpcFreeCont(pRsp); + rspMsg.code = code; + return code; } - tSerializeSServerStatusRsp(pRsp, rspLen, &statusRsp); pMsg->info.rsp = pRsp; pMsg->info.rspLen = rspLen; return 0; } -SSDataBlock *dmBuildVariablesBlock(void) { - SSDataBlock * pBlock = taosMemoryCalloc(1, sizeof(SSDataBlock)); - size_t size = 0; +int32_t dmBuildVariablesBlock(SSDataBlock **ppBlock) { + int32_t code = 0; + + SSDataBlock *pBlock = taosMemoryCalloc(1, sizeof(SSDataBlock)); + if (pBlock == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } + + size_t size = 0; + const SSysTableMeta *pMeta = NULL; getInfosDbMeta(&pMeta, &size); @@ -314,52 +332,74 @@ SSDataBlock *dmBuildVariablesBlock(void) { } pBlock->pDataBlock = taosArrayInit(pMeta[index].colNum, sizeof(SColumnInfoData)); + if (pBlock->pDataBlock == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _exit; + } for (int32_t i = 0; i < pMeta[index].colNum; ++i) { SColumnInfoData colInfoData = {0}; colInfoData.info.colId = i + 1; colInfoData.info.type = pMeta[index].schema[i].type; colInfoData.info.bytes = pMeta[index].schema[i].bytes; - taosArrayPush(pBlock->pDataBlock, &colInfoData); + if (taosArrayPush(pBlock->pDataBlock, &colInfoData) == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _exit; + } } pBlock->info.hasVarCol = true; - - return pBlock; +_exit: + if (code != 0) { + blockDataDestroy(pBlock); + } else { + *ppBlock = pBlock; + } + return code; } int32_t dmAppendVariablesToBlock(SSDataBlock *pBlock, int32_t dnodeId) { - /*int32_t code = */dumpConfToDataBlock(pBlock, 1); + int32_t code = dumpConfToDataBlock(pBlock, 1); + if (code != 0) { + return code; + } SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, 0); - colDataSetNItems(pColInfo, 0, (const char *)&dnodeId, pBlock->info.rows, false); + if (pColInfo == NULL) { + return TSDB_CODE_OUT_OF_RANGE; + } - return TSDB_CODE_SUCCESS; + return colDataSetNItems(pColInfo, 0, (const char *)&dnodeId, pBlock->info.rows, false); } int32_t dmProcessRetrieve(SDnodeMgmt *pMgmt, SRpcMsg *pMsg) { - int32_t size = 0; - int32_t rowsRead = 0; - + int32_t size = 0; + int32_t rowsRead = 0; + int32_t code = 0; SRetrieveTableReq retrieveReq = {0}; if (tDeserializeSRetrieveTableReq(pMsg->pCont, pMsg->contLen, &retrieveReq) != 0) { - terrno = TSDB_CODE_INVALID_MSG; - return -1; + return TSDB_CODE_INVALID_MSG; } #if 0 if (strcmp(retrieveReq.user, TSDB_DEFAULT_USER) != 0) { - terrno = TSDB_CODE_MND_NO_RIGHTS; - return -1; + code = TSDB_CODE_MND_NO_RIGHTS; + return code; } #endif if (strcasecmp(retrieveReq.tb, TSDB_INS_TABLE_DNODE_VARIABLES)) { - terrno = TSDB_CODE_INVALID_MSG; - return -1; + return TSDB_CODE_INVALID_MSG; } - SSDataBlock *pBlock = dmBuildVariablesBlock(); + SSDataBlock *pBlock = NULL; + if ((code = dmBuildVariablesBlock(&pBlock)) != 0) { + return code; + } - dmAppendVariablesToBlock(pBlock, pMgmt->pData->dnodeId); + code = dmAppendVariablesToBlock(pBlock, pMgmt->pData->dnodeId); + if (code != 0) { + blockDataDestroy(pBlock); + return code; + } size_t numOfCols = taosArrayGetSize(pBlock->pDataBlock); size = sizeof(SRetrieveMetaTableRsp) + sizeof(int32_t) + sizeof(SSysTableSchema) * numOfCols + @@ -367,10 +407,10 @@ int32_t dmProcessRetrieve(SDnodeMgmt *pMgmt, SRpcMsg *pMsg) { SRetrieveMetaTableRsp *pRsp = rpcMallocCont(size); if (pRsp == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - dError("failed to retrieve data since %s", terrstr()); + code = TSDB_CODE_OUT_OF_MEMORY; + dError("failed to retrieve data since %s", tstrerror(code)); blockDataDestroy(pBlock); - return -1; + return code; } char *pStart = pRsp->data; @@ -404,7 +444,9 @@ int32_t dmProcessRetrieve(SDnodeMgmt *pMgmt, SRpcMsg *pMsg) { SArray *dmGetMsgHandles() { int32_t code = -1; SArray *pArray = taosArrayInit(16, sizeof(SMgmtHandle)); - if (pArray == NULL) goto _OVER; + if (pArray == NULL) { + return NULL; + } // Requests handled by DNODE if (dmSetMgmtHandle(pArray, TDMT_DND_CREATE_MNODE, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER; @@ -416,7 +458,7 @@ SArray *dmGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_DND_CONFIG_DNODE, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_DND_SERVER_STATUS, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_DND_SYSTABLE_RETRIEVE, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_DND_ALTER_MNODE_TYPE, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_DND_ALTER_MNODE_TYPE, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_DND_CREATE_ENCRYPT_KEY, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER; // Requests handled by MNODE diff --git a/source/dnode/mgmt/mgmt_dnode/src/dmInt.c b/source/dnode/mgmt/mgmt_dnode/src/dmInt.c index b9dd45f1c0..958b411881 100644 --- a/source/dnode/mgmt/mgmt_dnode/src/dmInt.c +++ b/source/dnode/mgmt/mgmt_dnode/src/dmInt.c @@ -18,22 +18,23 @@ #include "libs/function/tudf.h" static int32_t dmStartMgmt(SDnodeMgmt *pMgmt) { - if (dmStartStatusThread(pMgmt) != 0) { - return -1; + int32_t code = 0; + if ((code = dmStartStatusThread(pMgmt)) != 0) { + return code; } #if defined(TD_ENTERPRISE) - if (dmStartNotifyThread(pMgmt) != 0) { - return -1; + if ((code = dmStartNotifyThread(pMgmt)) != 0) { + return code; } #endif - if (dmStartMonitorThread(pMgmt) != 0) { - return -1; + if ((code = dmStartMonitorThread(pMgmt)) != 0) { + return code; } - if (dmStartAuditThread(pMgmt) != 0) { - return -1; + if ((code = dmStartAuditThread(pMgmt)) != 0) { + return code; } - if (dmStartCrashReportThread(pMgmt) != 0) { - return -1; + if ((code = dmStartCrashReportThread(pMgmt)) != 0) { + return code; } return 0; } @@ -50,10 +51,10 @@ static void dmStopMgmt(SDnodeMgmt *pMgmt) { } static int32_t dmOpenMgmt(SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) { + int32_t code = 0; SDnodeMgmt *pMgmt = taosMemoryCalloc(1, sizeof(SDnodeMgmt)); if (pMgmt == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; + return TSDB_CODE_OUT_OF_MEMORY; } pMgmt->pData = pInput->pData; @@ -70,12 +71,11 @@ static int32_t dmOpenMgmt(SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) { pMgmt->getMnodeLoadsFp = pInput->getMnodeLoadsFp; pMgmt->getQnodeLoadsFp = pInput->getQnodeLoadsFp; - // pMgmt->pData->ipWhiteVer = 0; - if (dmStartWorker(pMgmt) != 0) { - return -1; + if ((code = dmStartWorker(pMgmt)) != 0) { + return code; } - if (udfStartUdfd(pMgmt->pData->dnodeId) != 0) { + if ((code = udfStartUdfd(pMgmt->pData->dnodeId)) != 0) { dError("failed to start udfd"); } diff --git a/source/dnode/mgmt/mgmt_dnode/src/dmWorker.c b/source/dnode/mgmt/mgmt_dnode/src/dmWorker.c index eafa10aa32..add62d1edc 100644 --- a/source/dnode/mgmt/mgmt_dnode/src/dmWorker.c +++ b/source/dnode/mgmt/mgmt_dnode/src/dmWorker.c @@ -264,12 +264,14 @@ static void *dmCrashReportThreadFp(void *param) { } int32_t dmStartStatusThread(SDnodeMgmt *pMgmt) { + int32_t code = 0; TdThreadAttr thAttr; taosThreadAttrInit(&thAttr); taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE); if (taosThreadCreate(&pMgmt->statusThread, &thAttr, dmStatusThreadFp, pMgmt) != 0) { - dError("failed to create status thread since %s", strerror(errno)); - return -1; + code = TAOS_SYSTEM_ERROR(errno); + dError("failed to create status thread since %s", tstrerror(code)); + return code; } taosThreadAttrDestroy(&thAttr); @@ -285,12 +287,14 @@ void dmStopStatusThread(SDnodeMgmt *pMgmt) { } int32_t dmStartNotifyThread(SDnodeMgmt *pMgmt) { + int32_t code = 0; TdThreadAttr thAttr; taosThreadAttrInit(&thAttr); taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE); if (taosThreadCreate(&pMgmt->notifyThread, &thAttr, dmNotifyThreadFp, pMgmt) != 0) { - dError("failed to create notify thread since %s", strerror(errno)); - return -1; + code = TAOS_SYSTEM_ERROR(errno); + dError("failed to create notify thread since %s", strerror(code)); + return code; } taosThreadAttrDestroy(&thAttr); @@ -308,12 +312,14 @@ void dmStopNotifyThread(SDnodeMgmt *pMgmt) { } int32_t dmStartMonitorThread(SDnodeMgmt *pMgmt) { + int32_t code = 0; TdThreadAttr thAttr; taosThreadAttrInit(&thAttr); taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE); if (taosThreadCreate(&pMgmt->monitorThread, &thAttr, dmMonitorThreadFp, pMgmt) != 0) { - dError("failed to create monitor thread since %s", strerror(errno)); - return -1; + code = TAOS_SYSTEM_ERROR(errno); + dError("failed to create monitor thread since %s", tstrerror(code)); + return code; } taosThreadAttrDestroy(&thAttr); @@ -322,12 +328,14 @@ int32_t dmStartMonitorThread(SDnodeMgmt *pMgmt) { } int32_t dmStartAuditThread(SDnodeMgmt *pMgmt) { + int32_t code = 0; TdThreadAttr thAttr; taosThreadAttrInit(&thAttr); taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE); if (taosThreadCreate(&pMgmt->auditThread, &thAttr, dmAuditThreadFp, pMgmt) != 0) { - dError("failed to create audit thread since %s", strerror(errno)); - return -1; + code = TAOS_SYSTEM_ERROR(errno); + dError("failed to create audit thread since %s", tstrerror(code)); + return code; } taosThreadAttrDestroy(&thAttr); @@ -350,6 +358,7 @@ void dmStopAuditThread(SDnodeMgmt *pMgmt) { } int32_t dmStartCrashReportThread(SDnodeMgmt *pMgmt) { + int32_t code = 0; if (!tsEnableCrashReport) { return 0; } @@ -358,8 +367,9 @@ int32_t dmStartCrashReportThread(SDnodeMgmt *pMgmt) { taosThreadAttrInit(&thAttr); taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE); if (taosThreadCreate(&pMgmt->crashReportThread, &thAttr, dmCrashReportThreadFp, pMgmt) != 0) { - dError("failed to create crashReport thread since %s", strerror(errno)); - return -1; + code = TAOS_SYSTEM_ERROR(errno); + dError("failed to create crashReport thread since %s", tstrerror(code)); + return code; } taosThreadAttrDestroy(&thAttr); @@ -431,8 +441,8 @@ static void dmProcessMgmtQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) { code = dmProcessCreateEncryptKeyReq(pMgmt, pMsg); break; default: - terrno = TSDB_CODE_MSG_NOT_PROCESSED; - dGError("msg:%p, not processed in mgmt queue", pMsg); + code = TSDB_CODE_MSG_NOT_PROCESSED; + dGError("msg:%p, not processed in mgmt queue, reason:%s", pMsg, tstrerror(code)); break; } diff --git a/source/dnode/mgmt/mgmt_mnode/src/mmFile.c b/source/dnode/mgmt/mgmt_mnode/src/mmFile.c index 27baa5ede5..b9b98eb09b 100644 --- a/source/dnode/mgmt/mgmt_mnode/src/mmFile.c +++ b/source/dnode/mgmt/mgmt_mnode/src/mmFile.c @@ -21,31 +21,31 @@ static int32_t mmDecodeOption(SJson *pJson, SMnodeOpt *pOption) { int32_t code = 0; tjsonGetInt32ValueFromDouble(pJson, "deployed", pOption->deploy, code); - if (code < 0) return -1; + if (code < 0) return code; tjsonGetInt32ValueFromDouble(pJson, "selfIndex", pOption->selfIndex, code); - if (code < 0) return 0; + if (code < 0) return code; tjsonGetInt32ValueFromDouble(pJson, "lastIndex", pOption->lastIndex, code); - if (code < 0) return 0; + if (code < 0) return code; SJson *replicas = tjsonGetObjectItem(pJson, "replicas"); - if (replicas == NULL) return 0; + if (replicas == NULL) return TSDB_CODE_INVALID_JSON_FORMAT; pOption->numOfTotalReplicas = tjsonGetArraySize(replicas); pOption->numOfReplicas = 0; for (int32_t i = 0; i < pOption->numOfTotalReplicas; ++i) { SJson *replica = tjsonGetArrayItem(replicas, i); - if (replica == NULL) return -1; + if (replica == NULL) return TSDB_CODE_INVALID_JSON_FORMAT; SReplica *pReplica = pOption->replicas + i; tjsonGetInt32ValueFromDouble(replica, "id", pReplica->id, code); - if (code < 0) return -1; + if (code < 0) return code; code = tjsonGetStringValue(replica, "fqdn", pReplica->fqdn); - if (code < 0) return -1; + if (code < 0) return code; tjsonGetUInt16ValueFromDouble(replica, "port", pReplica->port, code); - if (code < 0) return -1; + if (code < 0) return code; tjsonGetInt32ValueFromDouble(replica, "role", pOption->nodeRoles[i], code); - if (code < 0) return -1; + if (code < 0) return code; if (pOption->nodeRoles[i] == TAOS_SYNC_ROLE_VOTER) { pOption->numOfReplicas++; } @@ -63,36 +63,41 @@ int32_t mmReadFile(const char *path, SMnodeOpt *pOption) { char *pData = NULL; SJson *pJson = NULL; char file[PATH_MAX] = {0}; - snprintf(file, sizeof(file), "%s%smnode.json", path, TD_DIRSEP); + + int32_t nBytes = snprintf(file, sizeof(file), "%s%smnode.json", path, TD_DIRSEP); + if (nBytes <= 0 || nBytes >= sizeof(file)) { + code = TSDB_CODE_OUT_OF_BUFFER; + goto _OVER; + } if (taosStatFile(file, NULL, NULL, NULL) < 0) { - dInfo("mnode file:%s not exist", file); + dInfo("mnode file:%s not exist, reason:%s", file, tstrerror(TAOS_SYSTEM_ERROR(errno))); return 0; } pFile = taosOpenFile(file, TD_FILE_READ); if (pFile == NULL) { - terrno = TAOS_SYSTEM_ERROR(errno); - dError("failed to open mnode file:%s since %s", file, terrstr()); + code = TAOS_SYSTEM_ERROR(errno); + dError("failed to open mnode file:%s since %s", file, tstrerror(code)); goto _OVER; } int64_t size = 0; if (taosFStatFile(pFile, &size, NULL) < 0) { - terrno = TAOS_SYSTEM_ERROR(errno); - dError("failed to fstat mnode file:%s since %s", file, terrstr()); + code = TAOS_SYSTEM_ERROR(errno); + dError("failed to fstat mnode file:%s since %s", file, tstrerror(code)); goto _OVER; } pData = taosMemoryMalloc(size + 1); if (pData == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; + code = TSDB_CODE_OUT_OF_MEMORY; goto _OVER; } if (taosReadFile(pFile, pData, size) != size) { - terrno = TAOS_SYSTEM_ERROR(errno); - dError("failed to read mnode file:%s since %s", file, terrstr()); + code = TAOS_SYSTEM_ERROR(errno); + dError("failed to read mnode file:%s since %s", file, tstrerror(code)); goto _OVER; } @@ -100,12 +105,11 @@ int32_t mmReadFile(const char *path, SMnodeOpt *pOption) { pJson = tjsonParse(pData); if (pJson == NULL) { - terrno = TSDB_CODE_INVALID_JSON_FORMAT; + code = TSDB_CODE_INVALID_JSON_FORMAT; goto _OVER; } - if (mmDecodeOption(pJson, pOption) < 0) { - terrno = TSDB_CODE_INVALID_JSON_FORMAT; + if ((code = mmDecodeOption(pJson, pOption)) < 0) { goto _OVER; } @@ -118,37 +122,42 @@ _OVER: if (pFile != NULL) taosCloseFile(&pFile); if (code != 0) { - dError("failed to read mnode file:%s since %s", file, terrstr()); + dError("failed to read mnode file:%s since %s", file, tstrerror(code)); } return code; } static int32_t mmEncodeOption(SJson *pJson, const SMnodeOpt *pOption) { + int32_t code = 0; if (pOption->deploy && pOption->numOfTotalReplicas > 0) { - if (tjsonAddDoubleToObject(pJson, "selfIndex", pOption->selfIndex) < 0) return -1; + if ((code = tjsonAddDoubleToObject(pJson, "selfIndex", pOption->selfIndex)) < 0) return code; SJson *replicas = tjsonCreateArray(); - if (replicas == NULL) return -1; - if (tjsonAddItemToObject(pJson, "replicas", replicas) < 0) return -1; + if (replicas == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } + if ((code = tjsonAddItemToObject(pJson, "replicas", replicas)) < 0) return code; for (int32_t i = 0; i < pOption->numOfTotalReplicas; ++i) { SJson *replica = tjsonCreateObject(); - if (replica == NULL) return -1; + if (replica == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } const SReplica *pReplica = pOption->replicas + i; - if (tjsonAddDoubleToObject(replica, "id", pReplica->id) < 0) return -1; - if (tjsonAddStringToObject(replica, "fqdn", pReplica->fqdn) < 0) return -1; - if (tjsonAddDoubleToObject(replica, "port", pReplica->port) < 0) return -1; - if (tjsonAddDoubleToObject(replica, "role", pOption->nodeRoles[i]) < 0) return -1; - if (tjsonAddItemToArray(replicas, replica) < 0) return -1; + if ((code = tjsonAddDoubleToObject(replica, "id", pReplica->id)) < 0) return code; + if ((code = tjsonAddStringToObject(replica, "fqdn", pReplica->fqdn)) < 0) return code; + if ((code = tjsonAddDoubleToObject(replica, "port", pReplica->port)) < 0) return code; + if ((code = tjsonAddDoubleToObject(replica, "role", pOption->nodeRoles[i])) < 0) return code; + if ((code = tjsonAddItemToArray(replicas, replica)) < 0) return code; } } - if (tjsonAddDoubleToObject(pJson, "lastIndex", pOption->lastIndex) < 0) return -1; + if ((code = tjsonAddDoubleToObject(pJson, "lastIndex", pOption->lastIndex)) < 0) return code; - if (tjsonAddDoubleToObject(pJson, "deployed", pOption->deploy) < 0) return -1; + if ((code = tjsonAddDoubleToObject(pJson, "deployed", pOption->deploy)) < 0) return code; - return 0; + return code; } int32_t mmWriteFile(const char *path, const SMnodeOpt *pOption) { @@ -158,28 +167,59 @@ int32_t mmWriteFile(const char *path, const SMnodeOpt *pOption) { TdFilePtr pFile = NULL; char file[PATH_MAX] = {0}; char realfile[PATH_MAX] = {0}; - snprintf(file, sizeof(file), "%s%smnode.json.bak", path, TD_DIRSEP); - snprintf(realfile, sizeof(realfile), "%s%smnode.json", path, TD_DIRSEP); - terrno = TSDB_CODE_OUT_OF_MEMORY; + int32_t nBytes = snprintf(file, sizeof(file), "%s%smnode.json.bak", path, TD_DIRSEP); + if (nBytes <= 0 || nBytes >= sizeof(file)) { + code = TSDB_CODE_OUT_OF_BUFFER; + goto _OVER; + } + + nBytes = snprintf(realfile, sizeof(realfile), "%s%smnode.json", path, TD_DIRSEP); + if (nBytes <= 0 || nBytes >= sizeof(realfile)) { + code = TSDB_CODE_OUT_OF_BUFFER; + goto _OVER; + } + + // terrno = TSDB_CODE_OUT_OF_MEMORY; pJson = tjsonCreateObject(); - if (pJson == NULL) goto _OVER; - if (mmEncodeOption(pJson, pOption) != 0) goto _OVER; + if (pJson == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _OVER; + } + + TAOS_CHECK_GOTO(mmEncodeOption(pJson, pOption), NULL, _OVER); + buffer = tjsonToString(pJson); - if (buffer == NULL) goto _OVER; - terrno = 0; + if (buffer == NULL) { + code = TSDB_CODE_INVALID_JSON_FORMAT; + goto _OVER; + } pFile = taosOpenFile(file, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC | TD_FILE_WRITE_THROUGH); - if (pFile == NULL) goto _OVER; + if (pFile == NULL) { + code = TAOS_SYSTEM_ERROR(errno); + goto _OVER; + } int32_t len = strlen(buffer); - if (taosWriteFile(pFile, buffer, len) <= 0) goto _OVER; - if (taosFsyncFile(pFile) < 0) goto _OVER; + if (taosWriteFile(pFile, buffer, len) <= 0) { + code = TAOS_SYSTEM_ERROR(errno); + goto _OVER; + } + if (taosFsyncFile(pFile) < 0) { + code = TAOS_SYSTEM_ERROR(errno); + goto _OVER; + } - taosCloseFile(&pFile); - if (taosRenameFile(file, realfile) != 0) goto _OVER; + if (taosCloseFile(&pFile) < 0) { + code = TAOS_SYSTEM_ERROR(errno); + goto _OVER; + } + if (taosRenameFile(file, realfile) != 0) { + code = TAOS_SYSTEM_ERROR(errno); + goto _OVER; + } - code = 0; dInfo("succeed to write mnode file:%s, deloyed:%d", realfile, pOption->deploy); _OVER: @@ -188,8 +228,7 @@ _OVER: if (pFile != NULL) taosCloseFile(&pFile); if (code != 0) { - if (terrno == 0) terrno = TAOS_SYSTEM_ERROR(errno); - dError("failed to write mnode file:%s since %s, deloyed:%d", realfile, terrstr(), pOption->deploy); + dError("failed to write mnode file:%s since %s, deloyed:%d", realfile, tstrerror(code), pOption->deploy); } return code; } diff --git a/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c b/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c index 9b987b3237..067ce528d5 100644 --- a/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c +++ b/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c @@ -26,11 +26,12 @@ void mmGetMnodeLoads(SMnodeMgmt *pMgmt, SMonMloadInfo *pInfo) { } int32_t mmProcessCreateReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg) { + int32_t code = 0; const STraceId *trace = &pMsg->info.traceId; SDCreateMnodeReq createReq = {0}; if (tDeserializeSDCreateMnodeReq(pMsg->pCont, pMsg->contLen, &createReq) != 0) { - terrno = TSDB_CODE_INVALID_MSG; - return -1; + code = TSDB_CODE_INVALID_MSG; + return code; } SMnodeOpt option = {.deploy = true, @@ -56,43 +57,45 @@ int32_t mmProcessCreateReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg) { } if (option.selfIndex == -1) { - terrno = TSDB_CODE_INVALID_OPTION; - dGError("failed to create mnode since %s, selfIndex is -1", terrstr()); - return -1; + code = TSDB_CODE_INVALID_OPTION; + dGError("failed to create mnode since %s, selfIndex is -1", tstrerror(code)); + return code; } - if (mmWriteFile(pInput->path, &option) != 0) { - dGError("failed to write mnode file since %s", terrstr()); - return -1; + if ((code = mmWriteFile(pInput->path, &option)) != 0) { + dGError("failed to write mnode file since %s", tstrerror(code)); + return code; } return 0; } int32_t mmProcessDropReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg) { + int32_t code = 0; + const STraceId *trace = &pMsg->info.traceId; SDDropMnodeReq dropReq = {0}; if (tDeserializeSCreateDropMQSNodeReq(pMsg->pCont, pMsg->contLen, &dropReq) != 0) { - terrno = TSDB_CODE_INVALID_MSG; - return -1; + code = TSDB_CODE_INVALID_MSG; + return code; } if (pInput->pData->dnodeId != 0 && dropReq.dnodeId != pInput->pData->dnodeId) { - terrno = TSDB_CODE_INVALID_OPTION; - dGError("failed to drop mnode since %s", terrstr()); + code = TSDB_CODE_INVALID_OPTION; + dGError("failed to drop mnode since %s", tstrerror(code)); tFreeSMCreateQnodeReq(&dropReq); - return -1; + return code; } SMnodeOpt option = {.deploy = false}; - if (mmWriteFile(pInput->path, &option) != 0) { - dGError("failed to write mnode file since %s", terrstr()); + if ((code = mmWriteFile(pInput->path, &option)) != 0) { + dGError("failed to write mnode file since %s", tstrerror(code)); tFreeSMCreateQnodeReq(&dropReq); - return -1; + return code; } tFreeSMCreateQnodeReq(&dropReq); - return 0; + return code; } SArray *mmGetMsgHandles() { diff --git a/source/dnode/mgmt/mgmt_mnode/src/mmInt.c b/source/dnode/mgmt/mgmt_mnode/src/mmInt.c index d25c6438e8..57436adcd0 100644 --- a/source/dnode/mgmt/mgmt_mnode/src/mmInt.c +++ b/source/dnode/mgmt/mgmt_mnode/src/mmInt.c @@ -25,9 +25,10 @@ static bool mmDeployRequired(const SMgmtInputOpt *pInput) { } static int32_t mmRequire(const SMgmtInputOpt *pInput, bool *required) { + int32_t code = 0; SMnodeOpt option = {0}; - if (mmReadFile(pInput->path, &option) != 0) { - return -1; + if ((code = mmReadFile(pInput->path, &option)) != 0) { + return code; } if (!option.deploy) { @@ -41,7 +42,7 @@ static int32_t mmRequire(const SMgmtInputOpt *pInput, bool *required) { dInfo("deploy mnode required. option deploy:%d", option.deploy); } - return 0; + return code; } static void mmBuildOptionForDeploy(SMnodeMgmt *pMgmt, const SMgmtInputOpt *pInput, SMnodeOpt *pOption) { @@ -73,22 +74,31 @@ static void mmClose(SMnodeMgmt *pMgmt) { taosMemoryFree(pMgmt); } - +static int32_t mndOpenWrapper(const char *path, SMnodeOpt *opt, SMnode **pMnode) { + int32_t code = 0; + *pMnode = mndOpen(path, opt); + if (*pMnode == NULL) { + code = terrno; + } + ///*pMnode = pNode; + return code; +} static int32_t mmOpen(SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) { - if (walInit() != 0) { - dError("failed to init wal since %s", terrstr()); - return -1; + int32_t code = 0; + if ((code = walInit()) != 0) { + dError("failed to init wal since %s", tstrerror(code)); + return code; } - if (syncInit() != 0) { - dError("failed to init sync since %s", terrstr()); - return -1; + if ((code = syncInit()) != 0) { + dError("failed to init sync since %s", tstrerror(code)); + return code; } SMnodeMgmt *pMgmt = taosMemoryCalloc(1, sizeof(SMnodeMgmt)); if (pMgmt == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; + code = TSDB_CODE_OUT_OF_MEMORY; + return code; } pMgmt->pData = pInput->pData; @@ -100,10 +110,10 @@ static int32_t mmOpen(SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) { taosThreadRwlockInit(&pMgmt->lock, NULL); SMnodeOpt option = {0}; - if (mmReadFile(pMgmt->path, &option) != 0) { - dError("failed to read file since %s", terrstr()); + if ((code = mmReadFile(pMgmt->path, &option)) != 0) { + dError("failed to read file since %s", tstrerror(code)); mmClose(pMgmt); - return -1; + return code; } if (!option.deploy) { @@ -115,18 +125,18 @@ static int32_t mmOpen(SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) { mmBuildOptionForOpen(pMgmt, &option); } - pMgmt->pMnode = mndOpen(pMgmt->path, &option); - if (pMgmt->pMnode == NULL) { - dError("failed to open mnode since %s", terrstr()); + code = mndOpenWrapper(pMgmt->path, &option, &pMgmt->pMnode); + if (code != 0) { + dError("failed to open mnode since %s", tstrerror(code)); mmClose(pMgmt); - return -1; + return code; } tmsgReportStartup("mnode-impl", "initialized"); - if (mmStartWorker(pMgmt) != 0) { - dError("failed to start mnode worker since %s", terrstr()); + if ((code = mmStartWorker(pMgmt)) != 0) { + dError("failed to start mnode worker since %s", tstrerror(code)); mmClose(pMgmt); - return -1; + return code; } tmsgReportStartup("mnode-worker", "initialized"); @@ -134,9 +144,9 @@ static int32_t mmOpen(SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) { option.deploy = true; option.numOfReplicas = 0; option.numOfTotalReplicas = 0; - if (mmWriteFile(pMgmt->path, &option) != 0) { - dError("failed to write mnode file since %s", terrstr()); - return -1; + if ((code = mmWriteFile(pMgmt->path, &option)) != 0) { + dError("failed to write mnode file since %s", tstrerror(code)); + return code; } } @@ -160,13 +170,9 @@ static void mmStop(SMnodeMgmt *pMgmt) { mndStop(pMgmt->pMnode); } -static int32_t mmSyncIsCatchUp(SMnodeMgmt *pMgmt) { - return mndIsCatchUp(pMgmt->pMnode); -} +static int32_t mmSyncIsCatchUp(SMnodeMgmt *pMgmt) { return mndIsCatchUp(pMgmt->pMnode); } -static ESyncRole mmSyncGetRole(SMnodeMgmt *pMgmt) { - return mndGetRole(pMgmt->pMnode); -} +static ESyncRole mmSyncGetRole(SMnodeMgmt *pMgmt) { return mndGetRole(pMgmt->pMnode); } SMgmtFunc mmGetMgmtFunc() { SMgmtFunc mgmtFunc = {0}; diff --git a/source/dnode/mgmt/mgmt_mnode/src/mmWorker.c b/source/dnode/mgmt/mgmt_mnode/src/mmWorker.c index e5c32f9a43..cce2a3568d 100644 --- a/source/dnode/mgmt/mgmt_mnode/src/mmWorker.c +++ b/source/dnode/mgmt/mgmt_mnode/src/mmWorker.c @@ -186,7 +186,8 @@ int32_t mmPutMsgToQueue(SMnodeMgmt *pMgmt, EQueueType qtype, SRpcMsg *pRpc) { memcpy(pMsg, pRpc, sizeof(SRpcMsg)); pRpc->pCont = NULL; - dTrace("msg:%p, is created and will put into %s queue, type:%s len:%d", pMsg, pWorker->name, TMSG_INFO(pRpc->msgType), pRpc->contLen); + dTrace("msg:%p, is created and will put into %s queue, type:%s len:%d", pMsg, pWorker->name, TMSG_INFO(pRpc->msgType), + pRpc->contLen); int32_t code = mmPutMsgToWorker(pMgmt, pWorker, pMsg); if (code != 0) { dTrace("msg:%p, is freed", pMsg); @@ -197,6 +198,7 @@ int32_t mmPutMsgToQueue(SMnodeMgmt *pMgmt, EQueueType qtype, SRpcMsg *pRpc) { } int32_t mmStartWorker(SMnodeMgmt *pMgmt) { + int32_t code = 0; SSingleWorkerCfg qCfg = { .min = tsNumOfMnodeQueryThreads, .max = tsNumOfMnodeQueryThreads, @@ -205,9 +207,9 @@ int32_t mmStartWorker(SMnodeMgmt *pMgmt) { .param = pMgmt, .poolType = QUERY_AUTO_QWORKER_POOL, }; - if (tSingleWorkerInit(&pMgmt->queryWorker, &qCfg) != 0) { - dError("failed to start mnode-query worker since %s", terrstr()); - return -1; + if ((code = tSingleWorkerInit(&pMgmt->queryWorker, &qCfg)) != 0) { + dError("failed to start mnode-query worker since %s", tstrerror(code)); + return code; } SSingleWorkerCfg fCfg = { @@ -217,9 +219,9 @@ int32_t mmStartWorker(SMnodeMgmt *pMgmt) { .fp = (FItem)mmProcessRpcMsg, .param = pMgmt, }; - if (tSingleWorkerInit(&pMgmt->fetchWorker, &fCfg) != 0) { - dError("failed to start mnode-fetch worker since %s", terrstr()); - return -1; + if ((code = tSingleWorkerInit(&pMgmt->fetchWorker, &fCfg)) != 0) { + dError("failed to start mnode-fetch worker since %s", tstrerror(code)); + return code; } SSingleWorkerCfg rCfg = { @@ -229,9 +231,9 @@ int32_t mmStartWorker(SMnodeMgmt *pMgmt) { .fp = (FItem)mmProcessRpcMsg, .param = pMgmt, }; - if (tSingleWorkerInit(&pMgmt->readWorker, &rCfg) != 0) { - dError("failed to start mnode-read worker since %s", terrstr()); - return -1; + if ((code = tSingleWorkerInit(&pMgmt->readWorker, &rCfg)) != 0) { + dError("failed to start mnode-read worker since %s", tstrerror(code)); + return code; } SSingleWorkerCfg wCfg = { @@ -241,9 +243,9 @@ int32_t mmStartWorker(SMnodeMgmt *pMgmt) { .fp = (FItem)mmProcessRpcMsg, .param = pMgmt, }; - if (tSingleWorkerInit(&pMgmt->writeWorker, &wCfg) != 0) { - dError("failed to start mnode-write worker since %s", terrstr()); - return -1; + if ((code = tSingleWorkerInit(&pMgmt->writeWorker, &wCfg)) != 0) { + dError("failed to start mnode-write worker since %s", tstrerror(code)); + return code; } SSingleWorkerCfg sCfg = { @@ -253,9 +255,9 @@ int32_t mmStartWorker(SMnodeMgmt *pMgmt) { .fp = (FItem)mmProcessSyncMsg, .param = pMgmt, }; - if (tSingleWorkerInit(&pMgmt->syncWorker, &sCfg) != 0) { - dError("failed to start mnode mnode-sync worker since %s", terrstr()); - return -1; + if ((code = tSingleWorkerInit(&pMgmt->syncWorker, &sCfg)) != 0) { + dError("failed to start mnode mnode-sync worker since %s", tstrerror(code)); + return code; } SSingleWorkerCfg scCfg = { @@ -265,9 +267,9 @@ int32_t mmStartWorker(SMnodeMgmt *pMgmt) { .fp = (FItem)mmProcessSyncMsg, .param = pMgmt, }; - if (tSingleWorkerInit(&pMgmt->syncRdWorker, &scCfg) != 0) { - dError("failed to start mnode mnode-sync-rd worker since %s", terrstr()); - return -1; + if ((code = tSingleWorkerInit(&pMgmt->syncRdWorker, &scCfg)) != 0) { + dError("failed to start mnode mnode-sync-rd worker since %s", tstrerror(code)); + return code; } SSingleWorkerCfg arbCfg = { @@ -277,13 +279,13 @@ int32_t mmStartWorker(SMnodeMgmt *pMgmt) { .fp = (FItem)mmProcessRpcMsg, .param = pMgmt, }; - if (tSingleWorkerInit(&pMgmt->arbWorker, &arbCfg) != 0) { - dError("failed to start mnode mnode-arb worker since %s", terrstr()); - return -1; + if ((code = tSingleWorkerInit(&pMgmt->arbWorker, &arbCfg)) != 0) { + dError("failed to start mnode mnode-arb worker since %s", tstrerror(code)); + return code; } dDebug("mnode workers are initialized"); - return 0; + return code; } void mmStopWorker(SMnodeMgmt *pMgmt) { diff --git a/source/dnode/mgmt/mgmt_qnode/src/qmHandle.c b/source/dnode/mgmt/mgmt_qnode/src/qmHandle.c index 82876d6886..ae7125811e 100644 --- a/source/dnode/mgmt/mgmt_qnode/src/qmHandle.c +++ b/source/dnode/mgmt/mgmt_qnode/src/qmHandle.c @@ -30,24 +30,25 @@ void qmGetQnodeLoads(SQnodeMgmt *pMgmt, SQnodeLoad *pInfo) { } int32_t qmProcessCreateReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg) { + int32_t code = 0; SDCreateQnodeReq createReq = {0}; if (tDeserializeSCreateDropMQSNodeReq(pMsg->pCont, pMsg->contLen, &createReq) != 0) { - terrno = TSDB_CODE_INVALID_MSG; - return -1; + code = TSDB_CODE_INVALID_MSG; + return code; } if (pInput->pData->dnodeId != 0 && createReq.dnodeId != pInput->pData->dnodeId) { - terrno = TSDB_CODE_INVALID_OPTION; - dError("failed to create qnode since %s", terrstr()); + code = TSDB_CODE_INVALID_OPTION; + dError("failed to create qnode since %s", tstrerror(code)); tFreeSMCreateQnodeReq(&createReq); - return -1; + return code; } bool deployed = true; - if (dmWriteFile(pInput->path, pInput->name, deployed) != 0) { - dError("failed to write qnode file since %s", terrstr()); + if ((code = dmWriteFile(pInput->path, pInput->name, deployed)) != 0) { + dError("failed to write qnode file since %s", tstrerror(code)); tFreeSMCreateQnodeReq(&createReq); - return -1; + return code; } tFreeSMCreateQnodeReq(&createReq); @@ -55,24 +56,25 @@ int32_t qmProcessCreateReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg) { } int32_t qmProcessDropReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg) { + int32_t code = 0; SDDropQnodeReq dropReq = {0}; if (tDeserializeSCreateDropMQSNodeReq(pMsg->pCont, pMsg->contLen, &dropReq) != 0) { - terrno = TSDB_CODE_INVALID_MSG; - return -1; + code = TSDB_CODE_INVALID_MSG; + return code; } if (pInput->pData->dnodeId != 0 && dropReq.dnodeId != pInput->pData->dnodeId) { - terrno = TSDB_CODE_INVALID_OPTION; - dError("failed to drop qnode since %s", terrstr()); + code = TSDB_CODE_INVALID_OPTION; + dError("failed to drop qnode since %s", tstrerror(code)); tFreeSMCreateQnodeReq(&dropReq); - return -1; + return code; } bool deployed = false; - if (dmWriteFile(pInput->path, pInput->name, deployed) != 0) { - dError("failed to write qnode file since %s", terrstr()); + if ((code = dmWriteFile(pInput->path, pInput->name, deployed)) != 0) { + dError("failed to write qnode file since %s", tstrerror(code)); tFreeSMCreateQnodeReq(&dropReq); - return -1; + return code; } tFreeSMCreateQnodeReq(&dropReq); diff --git a/source/dnode/mgmt/mgmt_qnode/src/qmInt.c b/source/dnode/mgmt/mgmt_qnode/src/qmInt.c index 8a042da078..553f39e5da 100644 --- a/source/dnode/mgmt/mgmt_qnode/src/qmInt.c +++ b/source/dnode/mgmt/mgmt_qnode/src/qmInt.c @@ -33,11 +33,18 @@ static void qmClose(SQnodeMgmt *pMgmt) { taosMemoryFree(pMgmt); } +static int32_t qndOpenWrapper(SQnodeOpt *pOption, SQnode **pQnode) { + *pQnode = qndOpen(pOption); + if (*pQnode == NULL) { + return terrno; + } + return 0; +} static int32_t qmOpen(SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) { + int32_t code = 0; SQnodeMgmt *pMgmt = taosMemoryCalloc(1, sizeof(SQnodeMgmt)); if (pMgmt == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; + return TSDB_CODE_OUT_OF_MEMORY; } pMgmt->pData = pInput->pData; @@ -50,29 +57,30 @@ static int32_t qmOpen(SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) { SQnodeOpt option = {0}; qmInitOption(pMgmt, &option); - pMgmt->pQnode = qndOpen(&option); - if (pMgmt->pQnode == NULL) { - dError("failed to open qnode since %s", terrstr()); + + code = qndOpenWrapper(&option, &pMgmt->pQnode); + if (code != 0) { + dError("failed to open qnode since %s", tstrerror(code)); qmClose(pMgmt); return -1; } tmsgReportStartup("qnode-impl", "initialized"); - if (udfcOpen() != 0) { + if ((code = udfcOpen()) != 0) { dError("qnode can not open udfc"); qmClose(pMgmt); - return -1; + return code; } - if (qmStartWorker(pMgmt) != 0) { - dError("failed to start qnode worker since %s", terrstr()); + if ((code = qmStartWorker(pMgmt)) != 0) { + dError("failed to start qnode worker since %s", tstrerror(code)); qmClose(pMgmt); - return -1; + return code; } tmsgReportStartup("qnode-worker", "initialized"); pOutput->pMgmt = pMgmt; - return 0; + return code; } SMgmtFunc qmGetMgmtFunc() { diff --git a/source/dnode/mgmt/mgmt_qnode/src/qmWorker.c b/source/dnode/mgmt/mgmt_qnode/src/qmWorker.c index 5c635ff5ea..57611b9e9c 100644 --- a/source/dnode/mgmt/mgmt_qnode/src/qmWorker.c +++ b/source/dnode/mgmt/mgmt_qnode/src/qmWorker.c @@ -99,6 +99,8 @@ int32_t qmGetQueueSize(SQnodeMgmt *pMgmt, int32_t vgId, EQueueType qtype) { } int32_t qmStartWorker(SQnodeMgmt *pMgmt) { + int32_t code = 0; + SSingleWorkerCfg queryCfg = { .min = tsNumOfVnodeQueryThreads, .max = tsNumOfVnodeQueryThreads, @@ -108,9 +110,9 @@ int32_t qmStartWorker(SQnodeMgmt *pMgmt) { .poolType = QUERY_AUTO_QWORKER_POOL, }; - if (tSingleWorkerInit(&pMgmt->queryWorker, &queryCfg) != 0) { - dError("failed to start qnode-query worker since %s", terrstr()); - return -1; + if ((code = tSingleWorkerInit(&pMgmt->queryWorker, &queryCfg)) != 0) { + dError("failed to start qnode-query worker since %s", tstrerror(code)); + return code; } SSingleWorkerCfg fetchCfg = { @@ -121,13 +123,13 @@ int32_t qmStartWorker(SQnodeMgmt *pMgmt) { .param = pMgmt, }; - if (tSingleWorkerInit(&pMgmt->fetchWorker, &fetchCfg) != 0) { - dError("failed to start qnode-fetch worker since %s", terrstr()); - return -1; + if ((code = tSingleWorkerInit(&pMgmt->fetchWorker, &fetchCfg)) != 0) { + dError("failed to start qnode-fetch worker since %s", tstrerror(code)); + return code; } dDebug("qnode workers are initialized"); - return 0; + return code; } void qmStopWorker(SQnodeMgmt *pMgmt) { diff --git a/source/dnode/mgmt/mgmt_snode/src/smHandle.c b/source/dnode/mgmt/mgmt_snode/src/smHandle.c index 5c2f54fd10..c2d146462a 100644 --- a/source/dnode/mgmt/mgmt_snode/src/smHandle.c +++ b/source/dnode/mgmt/mgmt_snode/src/smHandle.c @@ -19,24 +19,25 @@ void smGetMonitorInfo(SSnodeMgmt *pMgmt, SMonSmInfo *smInfo) {} int32_t smProcessCreateReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg) { + int32_t code = 0; SDCreateSnodeReq createReq = {0}; if (tDeserializeSCreateDropMQSNodeReq(pMsg->pCont, pMsg->contLen, &createReq) != 0) { - terrno = TSDB_CODE_INVALID_MSG; - return -1; + code = TSDB_CODE_INVALID_MSG; + return code; } if (pInput->pData->dnodeId != 0 && createReq.dnodeId != pInput->pData->dnodeId) { - terrno = TSDB_CODE_INVALID_OPTION; - dError("failed to create snode since %s", terrstr()); + code = TSDB_CODE_INVALID_OPTION; + dError("failed to create snode since %s", tstrerror(code)); tFreeSMCreateQnodeReq(&createReq); - return -1; + return code; } bool deployed = true; - if (dmWriteFile(pInput->path, pInput->name, deployed) != 0) { - dError("failed to write snode file since %s", terrstr()); + if ((code = dmWriteFile(pInput->path, pInput->name, deployed)) != 0) { + dError("failed to write snode file since %s", tstrerror(code)); tFreeSMCreateQnodeReq(&createReq); - return -1; + return code; } tFreeSMCreateQnodeReq(&createReq); @@ -44,24 +45,26 @@ int32_t smProcessCreateReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg) { } int32_t smProcessDropReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg) { + int32_t code = 0; SDDropSnodeReq dropReq = {0}; if (tDeserializeSCreateDropMQSNodeReq(pMsg->pCont, pMsg->contLen, &dropReq) != 0) { - terrno = TSDB_CODE_INVALID_MSG; - return -1; + code = TSDB_CODE_INVALID_MSG; + + return code; } if (pInput->pData->dnodeId != 0 && dropReq.dnodeId != pInput->pData->dnodeId) { - terrno = TSDB_CODE_INVALID_OPTION; - dError("failed to drop snode since %s", terrstr()); + code = TSDB_CODE_INVALID_OPTION; + dError("failed to drop snode since %s", tstrerror(code)); tFreeSMCreateQnodeReq(&dropReq); - return -1; + return code; } bool deployed = false; - if (dmWriteFile(pInput->path, pInput->name, deployed) != 0) { - dError("failed to write snode file since %s", terrstr()); + if ((code = dmWriteFile(pInput->path, pInput->name, deployed)) != 0) { + dError("failed to write snode file since %s", tstrerror(code)); tFreeSMCreateQnodeReq(&dropReq); - return -1; + return code; } tFreeSMCreateQnodeReq(&dropReq); diff --git a/source/dnode/mgmt/mgmt_snode/src/smInt.c b/source/dnode/mgmt/mgmt_snode/src/smInt.c index 56744e4654..6bc0131e63 100644 --- a/source/dnode/mgmt/mgmt_snode/src/smInt.c +++ b/source/dnode/mgmt/mgmt_snode/src/smInt.c @@ -33,12 +33,19 @@ static void smClose(SSnodeMgmt *pMgmt) { taosMemoryFree(pMgmt); } - +int32_t sndOpenWrapper(const char *path, SSnodeOpt *pOption, SSnode **pNode) { + *pNode = sndOpen(path, pOption); + if (*pNode == NULL) { + return terrno; + } + return 0; +} int32_t smOpen(SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) { + int32_t code = 0; SSnodeMgmt *pMgmt = taosMemoryCalloc(1, sizeof(SSnodeMgmt)); if (pMgmt == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; + code = TSDB_CODE_OUT_OF_MEMORY; + return code; } pMgmt->pData = pInput->pData; @@ -50,35 +57,34 @@ int32_t smOpen(SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) { SSnodeOpt option = {0}; smInitOption(pMgmt, &option); - pMgmt->pSnode = sndOpen(pMgmt->path, &option); - if (pMgmt->pSnode == NULL) { - dError("failed to open snode since %s", terrstr()); + + code = sndOpenWrapper(pMgmt->path, &option, &pMgmt->pSnode); + if (code != 0) { + dError("failed to open snode since %s", tstrerror(code)); smClose(pMgmt); - return -1; + return code; } tmsgReportStartup("snode-impl", "initialized"); - if (smStartWorker(pMgmt) != 0) { - dError("failed to start snode worker since %s", terrstr()); + if ((code = smStartWorker(pMgmt)) != 0) { + dError("failed to start snode worker since %s", tstrerror(code)); smClose(pMgmt); - return -1; + return code; } tmsgReportStartup("snode-worker", "initialized"); - if (udfcOpen() != 0) { - dError("failed to open udfc in snode"); + if ((code = udfcOpen()) != 0) { + dError("failed to open udfc in snode since:%s", tstrerror(code)); smClose(pMgmt); - return -1; + return code; } pOutput->pMgmt = pMgmt; return 0; } -static int32_t smStartSnodes(SSnodeMgmt *pMgmt) { - return sndInit(pMgmt->pSnode); -} +static int32_t smStartSnodes(SSnodeMgmt *pMgmt) { return sndInit(pMgmt->pSnode); } SMgmtFunc smGetMgmtFunc() { SMgmtFunc mgmtFunc = {0}; diff --git a/source/dnode/mgmt/mgmt_snode/src/smWorker.c b/source/dnode/mgmt/mgmt_snode/src/smWorker.c index 1c57685414..abae463ec3 100644 --- a/source/dnode/mgmt/mgmt_snode/src/smWorker.c +++ b/source/dnode/mgmt/mgmt_snode/src/smWorker.c @@ -68,17 +68,18 @@ static void smProcessStreamQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) { } int32_t smStartWorker(SSnodeMgmt *pMgmt) { + int32_t code = 0; pMgmt->writeWroker = taosArrayInit(0, sizeof(SMultiWorker *)); if (pMgmt->writeWroker == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; + code = TSDB_CODE_OUT_OF_MEMORY; + return code; } for (int32_t i = 0; i < tsNumOfSnodeWriteThreads; i++) { SMultiWorker *pWriteWorker = taosMemoryMalloc(sizeof(SMultiWorker)); if (pWriteWorker == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; + code = TSDB_CODE_OUT_OF_MEMORY; + return code; } SMultiWorkerCfg cfg = { @@ -87,13 +88,13 @@ int32_t smStartWorker(SSnodeMgmt *pMgmt) { .fp = smProcessWriteQueue, .param = pMgmt, }; - if (tMultiWorkerInit(pWriteWorker, &cfg) != 0) { - dError("failed to start snode-unique worker since %s", terrstr()); - return -1; + if ((code = tMultiWorkerInit(pWriteWorker, &cfg)) != 0) { + dError("failed to start snode-unique worker since %s", tstrerror(code)); + return code; } if (taosArrayPush(pMgmt->writeWroker, &pWriteWorker) == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; + code = TSDB_CODE_OUT_OF_MEMORY; + return code; } } @@ -105,13 +106,13 @@ int32_t smStartWorker(SSnodeMgmt *pMgmt) { .param = pMgmt, }; - if (tSingleWorkerInit(&pMgmt->streamWorker, &cfg)) { - dError("failed to start snode shared-worker since %s", terrstr()); - return -1; + if ((code = tSingleWorkerInit(&pMgmt->streamWorker, &cfg)) != 0) { + dError("failed to start snode shared-worker since %s", tstrerror(code)); + return code; } dDebug("snode workers are initialized"); - return 0; + return code; } void smStopWorker(SSnodeMgmt *pMgmt) { @@ -126,21 +127,23 @@ void smStopWorker(SSnodeMgmt *pMgmt) { } int32_t smPutMsgToQueue(SSnodeMgmt *pMgmt, EQueueType qtype, SRpcMsg *pRpc) { + int32_t code = 0; SRpcMsg *pMsg = taosAllocateQitem(sizeof(SRpcMsg), RPC_QITEM, pRpc->contLen); if (pMsg == NULL) { rpcFreeCont(pRpc->pCont); pRpc->pCont = NULL; - return -1; + return code = terrno; } SSnode *pSnode = pMgmt->pSnode; if (pSnode == NULL) { - dError("msg:%p failed to put into snode queue since %s, type:%s qtype:%d len:%d", pMsg, terrstr(), + code = terrno; + dError("msg:%p failed to put into snode queue since %s, type:%s qtype:%d len:%d", pMsg, tstrerror(code), TMSG_INFO(pMsg->msgType), qtype, pRpc->contLen); taosFreeQitem(pMsg); rpcFreeCont(pRpc->pCont); pRpc->pCont = NULL; - return -1; + return code; } SMsgHead *pHead = pRpc->pCont; @@ -151,48 +154,44 @@ int32_t smPutMsgToQueue(SSnodeMgmt *pMgmt, EQueueType qtype, SRpcMsg *pRpc) { switch (qtype) { case STREAM_QUEUE: - smPutNodeMsgToStreamQueue(pMgmt, pMsg); + code = smPutNodeMsgToStreamQueue(pMgmt, pMsg); break; case WRITE_QUEUE: - smPutNodeMsgToWriteQueue(pMgmt, pMsg); + code = smPutNodeMsgToWriteQueue(pMgmt, pMsg); break; default: - terrno = TSDB_CODE_INVALID_PARA; + code = TSDB_CODE_INVALID_PARA; rpcFreeCont(pMsg->pCont); taosFreeQitem(pMsg); - return -1; + return code; } - return 0; + return code; } int32_t smPutNodeMsgToMgmtQueue(SSnodeMgmt *pMgmt, SRpcMsg *pMsg) { + int32_t code = 0; SMultiWorker *pWorker = taosArrayGetP(pMgmt->writeWroker, 0); if (pWorker == NULL) { - terrno = TSDB_CODE_INVALID_MSG; - return -1; + return TSDB_CODE_INVALID_MSG; } dTrace("msg:%p, put into worker %s", pMsg, pWorker->name); - taosWriteQitem(pWorker->queue, pMsg); - return 0; + return taosWriteQitem(pWorker->queue, pMsg); } int32_t smPutNodeMsgToWriteQueue(SSnodeMgmt *pMgmt, SRpcMsg *pMsg) { SMultiWorker *pWorker = taosArrayGetP(pMgmt->writeWroker, 0); if (pWorker == NULL) { - terrno = TSDB_CODE_INVALID_MSG; - return -1; + return TSDB_CODE_INVALID_MSG; } dTrace("msg:%p, put into worker %s", pMsg, pWorker->name); - taosWriteQitem(pWorker->queue, pMsg); - return 0; + return taosWriteQitem(pWorker->queue, pMsg); } int32_t smPutNodeMsgToStreamQueue(SSnodeMgmt *pMgmt, SRpcMsg *pMsg) { SSingleWorker *pWorker = &pMgmt->streamWorker; dTrace("msg:%p, put into worker %s", pMsg, pWorker->name); - taosWriteQitem(pWorker->queue, pMsg); - return 0; + return taosWriteQitem(pWorker->queue, pMsg); } diff --git a/source/dnode/mgmt/mgmt_vnode/inc/vmInt.h b/source/dnode/mgmt/mgmt_vnode/inc/vmInt.h index c3b4bc0710..6b01b92445 100644 --- a/source/dnode/mgmt/mgmt_vnode/inc/vmInt.h +++ b/source/dnode/mgmt/mgmt_vnode/inc/vmInt.h @@ -54,24 +54,24 @@ typedef struct { } SWrapperCfg; typedef struct { - int32_t vgId; - int32_t vgVersion; - int32_t refCount; - int8_t dropped; - int8_t failed; - int8_t disable; - int32_t diskPrimary; - int32_t toVgId; - char *path; - SVnode *pImpl; - SMultiWorker pWriteW; - SMultiWorker pSyncW; - SMultiWorker pSyncRdW; - SMultiWorker pApplyW; - STaosQueue *pQueryQ; - STaosQueue *pStreamQ; - STaosQueue *pFetchQ; - STaosQueue *pMultiMgmQ; + int32_t vgId; + int32_t vgVersion; + int32_t refCount; + int8_t dropped; + int8_t failed; + int8_t disable; + int32_t diskPrimary; + int32_t toVgId; + char *path; + SVnode *pImpl; + SMultiWorker pWriteW; + SMultiWorker pSyncW; + SMultiWorker pSyncRdW; + SMultiWorker pApplyW; + STaosQueue *pQueryQ; + STaosQueue *pStreamQ; + STaosQueue *pFetchQ; + STaosQueue *pMultiMgmQ; } SVnodeObj; typedef struct { @@ -107,9 +107,9 @@ int32_t vmProcessCheckLearnCatchupReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg); int32_t vmProcessArbHeartBeatReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg); // vmFile.c -int32_t vmGetVnodeListFromFile(SVnodeMgmt *pMgmt, SWrapperCfg **ppCfgs, int32_t *numOfVnodes); -int32_t vmWriteVnodeListToFile(SVnodeMgmt *pMgmt); -SVnodeObj **vmGetVnodeListFromHash(SVnodeMgmt *pMgmt, int32_t *numOfVnodes); +int32_t vmGetVnodeListFromFile(SVnodeMgmt *pMgmt, SWrapperCfg **ppCfgs, int32_t *numOfVnodes); +int32_t vmWriteVnodeListToFile(SVnodeMgmt *pMgmt); +int32_t vmGetVnodeListFromHash(SVnodeMgmt *pMgmt, int32_t *numOfVnodes, SVnodeObj ***ppVnodes); // vmWorker.c int32_t vmStartWorker(SVnodeMgmt *pMgmt); diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmFile.c b/source/dnode/mgmt/mgmt_vnode/src/vmFile.c index 53139330a3..47ae2a1395 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmFile.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmFile.c @@ -19,12 +19,16 @@ #define MAX_CONTENT_LEN 2 * 1024 * 1024 -SVnodeObj **vmGetVnodeListFromHash(SVnodeMgmt *pMgmt, int32_t *numOfVnodes) { +int32_t vmGetVnodeListFromHash(SVnodeMgmt *pMgmt, int32_t *numOfVnodes, SVnodeObj ***ppVnodes) { taosThreadRwlockRdlock(&pMgmt->lock); int32_t num = 0; int32_t size = taosHashGetSize(pMgmt->hash); SVnodeObj **pVnodes = taosMemoryCalloc(size, sizeof(SVnodeObj *)); + if (pVnodes == NULL) { + taosThreadRwlockUnlock(&pMgmt->lock); + return TSDB_CODE_OUT_OF_MEMORY; + } void *pIter = taosHashIterate(pMgmt->hash, NULL); while (pIter) { @@ -42,8 +46,9 @@ SVnodeObj **vmGetVnodeListFromHash(SVnodeMgmt *pMgmt, int32_t *numOfVnodes) { taosThreadRwlockUnlock(&pMgmt->lock); *numOfVnodes = num; + *ppVnodes = pVnodes; - return pVnodes; + return 0; } static int32_t vmDecodeVnodeList(SJson *pJson, SVnodeMgmt *pMgmt, SWrapperCfg **ppCfgs, int32_t *numOfVnodes) { @@ -52,29 +57,32 @@ static int32_t vmDecodeVnodeList(SJson *pJson, SVnodeMgmt *pMgmt, SWrapperCfg ** *ppCfgs = NULL; SJson *vnodes = tjsonGetObjectItem(pJson, "vnodes"); - if (vnodes == NULL) return -1; + if (vnodes == NULL) return TSDB_CODE_INVALID_JSON_FORMAT; int32_t vnodesNum = cJSON_GetArraySize(vnodes); if (vnodesNum > 0) { pCfgs = taosMemoryCalloc(vnodesNum, sizeof(SWrapperCfg)); - if (pCfgs == NULL) return -1; + if (pCfgs == NULL) return TSDB_CODE_OUT_OF_MEMORY; } for (int32_t i = 0; i < vnodesNum; ++i) { SJson *vnode = tjsonGetArrayItem(vnodes, i); - if (vnode == NULL) goto _OVER; + if (vnode == NULL) { + code = TSDB_CODE_INVALID_JSON_FORMAT; + goto _OVER; + } SWrapperCfg *pCfg = &pCfgs[i]; tjsonGetInt32ValueFromDouble(vnode, "vgId", pCfg->vgId, code); - if (code < 0) goto _OVER; + if (code != 0) goto _OVER; tjsonGetInt32ValueFromDouble(vnode, "dropped", pCfg->dropped, code); - if (code < 0) goto _OVER; + if (code != 0) goto _OVER; tjsonGetInt32ValueFromDouble(vnode, "vgVersion", pCfg->vgVersion, code); - if (code < 0) goto _OVER; + if (code != 0) goto _OVER; tjsonGetInt32ValueFromDouble(vnode, "diskPrimary", pCfg->diskPrimary, code); - if (code < 0) goto _OVER; + if (code != 0) goto _OVER; tjsonGetInt32ValueFromDouble(vnode, "toVgId", pCfg->toVgId, code); - if (code < 0) goto _OVER; + if (code != 0) goto _OVER; snprintf(pCfg->path, sizeof(pCfg->path), "%s%svnode%d", pMgmt->path, TD_DIRSEP, pCfg->vgId); } @@ -98,33 +106,35 @@ int32_t vmGetVnodeListFromFile(SVnodeMgmt *pMgmt, SWrapperCfg **ppCfgs, int32_t snprintf(file, sizeof(file), "%s%svnodes.json", pMgmt->path, TD_DIRSEP); if (taosStatFile(file, NULL, NULL, NULL) < 0) { - dInfo("vnode file:%s not exist", file); - return 0; + code = TAOS_SYSTEM_ERROR(errno); + dInfo("vnode file:%s not exist, reason:%s", file, tstrerror(code)); + code = 0; + return code; } pFile = taosOpenFile(file, TD_FILE_READ); if (pFile == NULL) { - terrno = TAOS_SYSTEM_ERROR(errno); - dError("failed to open vnode file:%s since %s", file, terrstr()); + code = TAOS_SYSTEM_ERROR(errno); + dError("failed to open vnode file:%s since %s", file, tstrerror(code)); goto _OVER; } int64_t size = 0; if (taosFStatFile(pFile, &size, NULL) < 0) { - terrno = TAOS_SYSTEM_ERROR(errno); - dError("failed to fstat mnode file:%s since %s", file, terrstr()); + code = TAOS_SYSTEM_ERROR(errno); + dError("failed to fstat mnode file:%s since %s", file, tstrerror(code)); goto _OVER; } pData = taosMemoryMalloc(size + 1); if (pData == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; + code = TSDB_CODE_OUT_OF_MEMORY; goto _OVER; } if (taosReadFile(pFile, pData, size) != size) { - terrno = TAOS_SYSTEM_ERROR(errno); - dError("failed to read vnode file:%s since %s", file, terrstr()); + code = TAOS_SYSTEM_ERROR(errno); + dError("failed to read vnode file:%s since %s", file, tstrerror(code)); goto _OVER; } @@ -132,12 +142,12 @@ int32_t vmGetVnodeListFromFile(SVnodeMgmt *pMgmt, SWrapperCfg **ppCfgs, int32_t pJson = tjsonParse(pData); if (pJson == NULL) { - terrno = TSDB_CODE_INVALID_JSON_FORMAT; + code = TSDB_CODE_INVALID_JSON_FORMAT; goto _OVER; } if (vmDecodeVnodeList(pJson, pMgmt, ppCfgs, numOfVnodes) < 0) { - terrno = TSDB_CODE_INVALID_JSON_FORMAT; + code = TSDB_CODE_INVALID_JSON_FORMAT; goto _OVER; } @@ -150,28 +160,36 @@ _OVER: if (pFile != NULL) taosCloseFile(&pFile); if (code != 0) { - dError("failed to read vnode file:%s since %s", file, terrstr()); + dError("failed to read vnode file:%s since %s", file, tstrerror(code)); } return code; } static int32_t vmEncodeVnodeList(SJson *pJson, SVnodeObj **ppVnodes, int32_t numOfVnodes) { - SJson *vnodes = tjsonCreateArray(); - if (vnodes == NULL) return -1; - if (tjsonAddItemToObject(pJson, "vnodes", vnodes) < 0) return -1; + int32_t code = 0; + SJson *vnodes = tjsonCreateArray(); + if (vnodes == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } + if ((code = tjsonAddItemToObject(pJson, "vnodes", vnodes)) < 0) { + tjsonDelete(vnodes); + return code; + }; for (int32_t i = 0; i < numOfVnodes; ++i) { SVnodeObj *pVnode = ppVnodes[i]; if (pVnode == NULL) continue; SJson *vnode = tjsonCreateObject(); - if (vnode == NULL) return -1; - if (tjsonAddDoubleToObject(vnode, "vgId", pVnode->vgId) < 0) return -1; - if (tjsonAddDoubleToObject(vnode, "dropped", pVnode->dropped) < 0) return -1; - if (tjsonAddDoubleToObject(vnode, "vgVersion", pVnode->vgVersion) < 0) return -1; - if (tjsonAddDoubleToObject(vnode, "diskPrimary", pVnode->diskPrimary) < 0) return -1; - if (pVnode->toVgId && tjsonAddDoubleToObject(vnode, "toVgId", pVnode->toVgId) < 0) return -1; - if (tjsonAddItemToArray(vnodes, vnode) < 0) return -1; + if (vnode == NULL) return TSDB_CODE_OUT_OF_MEMORY; + if ((code = tjsonAddDoubleToObject(vnode, "vgId", pVnode->vgId)) < 0) return code; + if ((code = tjsonAddDoubleToObject(vnode, "dropped", pVnode->dropped)) < 0) return code; + if ((code = tjsonAddDoubleToObject(vnode, "vgVersion", pVnode->vgVersion)) < 0) return code; + if ((code = tjsonAddDoubleToObject(vnode, "diskPrimary", pVnode->diskPrimary)) < 0) return code; + if (pVnode->toVgId) { + if ((code = tjsonAddDoubleToObject(vnode, "toVgId", pVnode->toVgId)) < 0) return code; + } + if ((code = tjsonAddItemToArray(vnodes, vnode)) < 0) return code; } return 0; @@ -185,30 +203,60 @@ int32_t vmWriteVnodeListToFile(SVnodeMgmt *pMgmt) { SVnodeObj **ppVnodes = NULL; char file[PATH_MAX] = {0}; char realfile[PATH_MAX] = {0}; - snprintf(file, sizeof(file), "%s%svnodes_tmp.json", pMgmt->path, TD_DIRSEP); - snprintf(realfile, sizeof(realfile), "%s%svnodes.json", pMgmt->path, TD_DIRSEP); + + int32_t nBytes = snprintf(file, sizeof(file), "%s%svnodes_tmp.json", pMgmt->path, TD_DIRSEP); + if (nBytes <= 0 || nBytes >= sizeof(file)) { + return TSDB_CODE_OUT_OF_RANGE; + } + + nBytes = snprintf(realfile, sizeof(realfile), "%s%svnodes.json", pMgmt->path, TD_DIRSEP); + if (nBytes <= 0 || nBytes >= sizeof(realfile)) { + return TSDB_CODE_OUT_OF_RANGE; + } int32_t numOfVnodes = 0; - ppVnodes = vmGetVnodeListFromHash(pMgmt, &numOfVnodes); - if (ppVnodes == NULL) goto _OVER; + code = vmGetVnodeListFromHash(pMgmt, &numOfVnodes, &ppVnodes); + if (code) goto _OVER; - terrno = TSDB_CODE_OUT_OF_MEMORY; + // terrno = TSDB_CODE_OUT_OF_MEMORY; pJson = tjsonCreateObject(); - if (pJson == NULL) goto _OVER; - if (vmEncodeVnodeList(pJson, ppVnodes, numOfVnodes) != 0) goto _OVER; + if (pJson == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _OVER; + } + if ((code = vmEncodeVnodeList(pJson, ppVnodes, numOfVnodes)) != 0) goto _OVER; + buffer = tjsonToString(pJson); - if (buffer == NULL) goto _OVER; - terrno = 0; + if (buffer == NULL) { + code = TSDB_CODE_INVALID_JSON_FORMAT; + goto _OVER; + } pFile = taosOpenFile(file, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC | TD_FILE_WRITE_THROUGH); - if (pFile == NULL) goto _OVER; + if (pFile == NULL) { + code = TAOS_SYSTEM_ERROR(errno); + goto _OVER; + } int32_t len = strlen(buffer); - if (taosWriteFile(pFile, buffer, len) <= 0) goto _OVER; - if (taosFsyncFile(pFile) < 0) goto _OVER; + if (taosWriteFile(pFile, buffer, len) <= 0) { + code = TAOS_SYSTEM_ERROR(errno); + goto _OVER; + } + if (taosFsyncFile(pFile) < 0) { + code = TAOS_SYSTEM_ERROR(errno); + goto _OVER; + } - taosCloseFile(&pFile); - if (taosRenameFile(file, realfile) != 0) goto _OVER; + code = taosCloseFile(&pFile); + if (code != 0) { + code = TAOS_SYSTEM_ERROR(errno); + goto _OVER; + } + if (taosRenameFile(file, realfile) != 0) { + code = TAOS_SYSTEM_ERROR(errno); + goto _OVER; + } code = 0; dInfo("succeed to write vnodes file:%s, vnodes:%d", realfile, numOfVnodes); @@ -228,8 +276,7 @@ _OVER: } if (code != 0) { - if (terrno == 0) terrno = TAOS_SYSTEM_ERROR(errno); - dError("failed to write vnodes file:%s since %s, vnodes:%d", realfile, terrstr(), numOfVnodes); + dError("failed to write vnodes file:%s since %s, vnodes:%d", realfile, tstrerror(code), numOfVnodes); } return code; } diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c index 0669956fb8..1e014e0df6 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c @@ -256,8 +256,7 @@ int32_t vmProcessCreateVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { char path[TSDB_FILENAME_LEN] = {0}; if (tDeserializeSCreateVnodeReq(pMsg->pCont, pMsg->contLen, &req) != 0) { - terrno = TSDB_CODE_INVALID_MSG; - return -1; + return TSDB_CODE_INVALID_MSG; } if (req.learnerReplica == 0) { @@ -298,25 +297,24 @@ int32_t vmProcessCreateVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { } if (pReplica->id != pMgmt->pData->dnodeId || pReplica->port != tsServerPort || strcmp(pReplica->fqdn, tsLocalFqdn) != 0) { - terrno = TSDB_CODE_INVALID_MSG; - dError("vgId:%d, dnodeId:%d ep:%s:%u not matched with local dnode", req.vgId, pReplica->id, pReplica->fqdn, - pReplica->port); - return -1; + code = TSDB_CODE_INVALID_MSG; + dError("vgId:%d, dnodeId:%d ep:%s:%u not matched with local dnode, reason:%s", req.vgId, pReplica->id, + pReplica->fqdn, pReplica->port, tstrerror(code)); + return code; } if (req.encryptAlgorithm == DND_CA_SM4) { if (strlen(tsEncryptKey) == 0) { - terrno = TSDB_CODE_DNODE_INVALID_ENCRYPTKEY; - dError("vgId:%d, failed to create vnode since encrypt key is empty", req.vgId); - return -1; + code = TSDB_CODE_DNODE_INVALID_ENCRYPTKEY; + dError("vgId:%d, failed to create vnode since encrypt key is empty, reason:%s", req.vgId, tstrerror(code)); + return code; } } vmGenerateVnodeCfg(&req, &vnodeCfg); - if (vmTsmaAdjustDays(&vnodeCfg, &req) < 0) { - dError("vgId:%d, failed to adjust tsma days since %s", req.vgId, terrstr()); - code = terrno != 0 ? terrno : -1; + if ((code = vmTsmaAdjustDays(&vnodeCfg, &req)) < 0) { + dError("vgId:%d, failed to adjust tsma days since %s", req.vgId, tstrerror(code)); goto _OVER; } @@ -327,8 +325,7 @@ int32_t vmProcessCreateVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { dError("vgId:%d, already exist", req.vgId); tFreeSCreateVnodeReq(&req); vmReleaseVnode(pMgmt, pVnode); - terrno = TSDB_CODE_VND_ALREADY_EXIST; - code = terrno; + code = TSDB_CODE_VND_ALREADY_EXIST; return 0; } diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmInt.c b/source/dnode/mgmt/mgmt_vnode/src/vmInt.c index aa9b009761..41ee392b4d 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmInt.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmInt.c @@ -33,6 +33,7 @@ int32_t vmGetPrimaryDisk(SVnodeMgmt *pMgmt, int32_t vgId) { } int32_t vmAllocPrimaryDisk(SVnodeMgmt *pMgmt, int32_t vgId) { + int32_t code = 0; STfs *pTfs = pMgmt->pTfs; int32_t diskId = 0; if (!pTfs) { @@ -59,7 +60,12 @@ int32_t vmAllocPrimaryDisk(SVnodeMgmt *pMgmt, int32_t vgId) { // alloc int32_t disks[TFS_MAX_DISKS_PER_TIER] = {0}; int32_t numOfVnodes = 0; - SVnodeObj **ppVnodes = vmGetVnodeListFromHash(pMgmt, &numOfVnodes); + SVnodeObj **ppVnodes = NULL; + + code = vmGetVnodeListFromHash(pMgmt, &numOfVnodes, &ppVnodes); + if (code != 0) { + return code; + } for (int32_t v = 0; v < numOfVnodes; v++) { SVnodeObj *pVnode = ppVnodes[v]; disks[pVnode->diskPrimary] += 1; @@ -436,6 +442,7 @@ static void *vmCloseVnodeInThread(void *param) { } static void vmCloseVnodes(SVnodeMgmt *pMgmt) { + int32_t code = 0; dInfo("start to close all vnodes"); tSingleWorkerCleanup(&pMgmt->mgmtWorker); dInfo("vnodes mgmt worker is stopped"); @@ -443,7 +450,12 @@ static void vmCloseVnodes(SVnodeMgmt *pMgmt) { dInfo("vnodes multiple mgmt worker is stopped"); int32_t numOfVnodes = 0; - SVnodeObj **ppVnodes = vmGetVnodeListFromHash(pMgmt, &numOfVnodes); + SVnodeObj **ppVnodes = NULL; + code = vmGetVnodeListFromHash(pMgmt, &numOfVnodes, &ppVnodes); + if (code != 0) { + dError("failed to get vnode list since %s", tstrerror(code)); + return; + } int32_t threadNum = tsNumOfCores / 2; if (threadNum < 1) threadNum = 1; @@ -513,8 +525,14 @@ static void vmCleanup(SVnodeMgmt *pMgmt) { } static void vmCheckSyncTimeout(SVnodeMgmt *pMgmt) { + int32_t code = 0; int32_t numOfVnodes = 0; - SVnodeObj **ppVnodes = vmGetVnodeListFromHash(pMgmt, &numOfVnodes); + SVnodeObj **ppVnodes = NULL; + code = vmGetVnodeListFromHash(pMgmt, &numOfVnodes, &ppVnodes); + if (code != 0) { + dError("failed to get vnode list since %s", tstrerror(code)); + return; + } if (ppVnodes != NULL) { for (int32_t i = 0; i < numOfVnodes; ++i) { @@ -549,12 +567,14 @@ static void *vmThreadFp(void *param) { } static int32_t vmInitTimer(SVnodeMgmt *pMgmt) { + int32_t code = 0; TdThreadAttr thAttr; taosThreadAttrInit(&thAttr); taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE); if (taosThreadCreate(&pMgmt->thread, &thAttr, vmThreadFp, pMgmt) != 0) { - dError("failed to create vnode timer thread since %s", strerror(errno)); - return -1; + code = TAOS_SYSTEM_ERROR(errno); + dError("failed to create vnode timer thread since %s", tstrerror(code)); + return code; } taosThreadAttrDestroy(&thAttr); @@ -573,7 +593,10 @@ static int32_t vmInit(SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) { int32_t code = -1; SVnodeMgmt *pMgmt = taosMemoryCalloc(1, sizeof(SVnodeMgmt)); - if (pMgmt == NULL) goto _OVER; + if (pMgmt == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _OVER; + } pMgmt->pData = pInput->pData; pMgmt->path = pInput->path; @@ -582,8 +605,18 @@ static int32_t vmInit(SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) { pMgmt->msgCb.putToQueueFp = (PutToQueueFp)vmPutRpcMsgToQueue; pMgmt->msgCb.qsizeFp = (GetQueueSizeFp)vmGetQueueSize; pMgmt->msgCb.mgmt = pMgmt; - taosThreadRwlockInit(&pMgmt->lock, NULL); - taosThreadMutexInit(&pMgmt->createLock, NULL); + + code = taosThreadRwlockInit(&pMgmt->lock, NULL); + if (code != 0) { + code = TAOS_SYSTEM_ERROR(errno); + goto _OVER; + } + + code = taosThreadMutexInit(&pMgmt->createLock, NULL); + if (code != 0) { + code = TAOS_SYSTEM_ERROR(errno); + goto _OVER; + } pMgmt->pTfs = pInput->pTfs; if (pMgmt->pTfs == NULL) { @@ -592,38 +625,39 @@ static int32_t vmInit(SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) { } tmsgReportStartup("vnode-tfs", "initialized"); - if (walInit() != 0) { - dError("failed to init wal since %s", terrstr()); + if ((code = walInit()) != 0) { + dError("failed to init wal since %s", tstrerror(code)); goto _OVER; } + tmsgReportStartup("vnode-wal", "initialized"); - if (syncInit() != 0) { - dError("failed to open sync since %s", terrstr()); + if ((code = syncInit()) != 0) { + dError("failed to open sync since %s", tstrerror(code)); goto _OVER; } tmsgReportStartup("vnode-sync", "initialized"); - if (vnodeInit(tsNumOfCommitThreads) != 0) { - dError("failed to init vnode since %s", terrstr()); + if ((code = vnodeInit(tsNumOfCommitThreads)) != 0) { + dError("failed to init vnode since %s", tstrerror(code)); goto _OVER; } tmsgReportStartup("vnode-commit", "initialized"); - if (vmStartWorker(pMgmt) != 0) { - dError("failed to init workers since %s", terrstr()); + if ((code = vmStartWorker(pMgmt)) != 0) { + dError("failed to init workers since %s", tstrerror(code)); goto _OVER; } tmsgReportStartup("vnode-worker", "initialized"); - if (vmOpenVnodes(pMgmt) != 0) { - dError("failed to open all vnodes since %s", terrstr()); + if ((code = vmOpenVnodes(pMgmt)) != 0) { + dError("failed to open all vnodes since %s", tstrerror(code)); goto _OVER; } tmsgReportStartup("vnode-vnodes", "initialized"); - if (udfcOpen() != 0) { - dError("failed to open udfc in vnode"); + if ((code = udfcOpen()) != 0) { + dError("failed to open udfc in vnode since %s", tstrerror(code)); goto _OVER; } @@ -633,7 +667,7 @@ _OVER: if (code == 0) { pOutput->pMgmt = pMgmt; } else { - dError("failed to init vnodes-mgmt since %s", terrstr()); + dError("failed to init vnodes-mgmt since %s", tstrerror(code)); vmCleanup(pMgmt); } @@ -683,18 +717,32 @@ static void *vmRestoreVnodeInThread(void *param) { } static int32_t vmStartVnodes(SVnodeMgmt *pMgmt) { + int32_t code = 0; int32_t numOfVnodes = 0; - SVnodeObj **ppVnodes = vmGetVnodeListFromHash(pMgmt, &numOfVnodes); + SVnodeObj **ppVnodes = NULL; + code = vmGetVnodeListFromHash(pMgmt, &numOfVnodes, &ppVnodes); + if (code != 0) { + dError("failed to get vnode list since %s", tstrerror(code)); + return code; + } int32_t threadNum = tsNumOfCores / 2; if (threadNum < 1) threadNum = 1; int32_t vnodesPerThread = numOfVnodes / threadNum + 1; SVnodeThread *threads = taosMemoryCalloc(threadNum, sizeof(SVnodeThread)); + if (threads == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } + for (int32_t t = 0; t < threadNum; ++t) { threads[t].threadIndex = t; threads[t].pMgmt = pMgmt; threads[t].ppVnodes = taosMemoryCalloc(vnodesPerThread, sizeof(SVnode *)); + if (threads[t].ppVnodes == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + break; + } } for (int32_t v = 0; v < numOfVnodes; ++v) { @@ -717,6 +765,7 @@ static int32_t vmStartVnodes(SVnodeMgmt *pMgmt) { taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE); if (taosThreadCreate(&pThread->thread, &thAttr, vmRestoreVnodeInThread, pThread) != 0) { dError("thread:%d, failed to create thread to restore vnode since %s", pThread->threadIndex, strerror(errno)); + ASSERT(errno == 0); } taosThreadAttrDestroy(&thAttr); @@ -742,6 +791,14 @@ static int32_t vmStartVnodes(SVnodeMgmt *pMgmt) { } return vmInitTimer(pMgmt); + +_exit: + for (int32_t t = 0; t < threadNum; ++t) { + SVnodeThread *pThread = &threads[t]; + taosMemoryFree(pThread->ppVnodes); + } + taosMemoryFree(threads); + return code; } static void vmStop(SVnodeMgmt *pMgmt) { vmCleanupTimer(pMgmt); } diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c b/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c index 0415340be9..c78e8289a9 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c @@ -200,26 +200,33 @@ static bool vmDataSpaceSufficient(SVnodeObj *pVnode) { } } +static int32_t vmAcquireVnodeWrapper(SVnodeMgmt *pMgt, int32_t vgId, SVnodeObj **pNode) { + *pNode = vmAcquireVnode(pMgt, vgId); + if (*pNode == NULL) { + return terrno; + } + return 0; +} static int32_t vmPutMsgToQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg, EQueueType qtype) { + int32_t code = 0; const STraceId *trace = &pMsg->info.traceId; if (pMsg->contLen < sizeof(SMsgHead)) { dGError("invalid rpc msg with no msg head at pCont. pMsg:%p, type:%s, contLen:%d", pMsg, TMSG_INFO(pMsg->msgType), pMsg->contLen); - return -1; + return TSDB_CODE_INVALID_MSG; } SMsgHead *pHead = pMsg->pCont; - int32_t code = 0; pHead->contLen = ntohl(pHead->contLen); pHead->vgId = ntohl(pHead->vgId); - SVnodeObj *pVnode = vmAcquireVnode(pMgmt, pHead->vgId); - if (pVnode == NULL) { + SVnodeObj *pVnode = NULL; + code = vmAcquireVnodeWrapper(pMgmt, pHead->vgId, &pVnode); + if (code != 0) { dGDebug("vgId:%d, msg:%p failed to put into vnode queue since %s, type:%s qtype:%d contLen:%d", pHead->vgId, pMsg, - terrstr(), TMSG_INFO(pMsg->msgType), qtype, pHead->contLen); - terrno = (terrno != 0) ? terrno : -1; - return terrno; + tstrerror(code), TMSG_INFO(pMsg->msgType), qtype, pHead->contLen); + return code; } switch (qtype) { @@ -234,49 +241,45 @@ static int32_t vmPutMsgToQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg, EQueueType qtyp break; case STREAM_QUEUE: dGTrace("vgId:%d, msg:%p put into vnode-stream queue", pVnode->vgId, pMsg); - taosWriteQitem(pVnode->pStreamQ, pMsg); + code = taosWriteQitem(pVnode->pStreamQ, pMsg); break; case FETCH_QUEUE: dGTrace("vgId:%d, msg:%p put into vnode-fetch queue", pVnode->vgId, pMsg); - taosWriteQitem(pVnode->pFetchQ, pMsg); + code = taosWriteQitem(pVnode->pFetchQ, pMsg); break; case WRITE_QUEUE: if (!vmDataSpaceSufficient(pVnode)) { - terrno = TSDB_CODE_NO_ENOUGH_DISKSPACE; - code = terrno; + code = TSDB_CODE_NO_ENOUGH_DISKSPACE; dError("vgId:%d, msg:%p put into vnode-write queue failed since %s", pVnode->vgId, pMsg, tstrerror(code)); break; } if (pMsg->msgType == TDMT_VND_SUBMIT && (grantCheck(TSDB_GRANT_STORAGE) != TSDB_CODE_SUCCESS)) { - terrno = TSDB_CODE_VND_NO_WRITE_AUTH; - code = terrno; + code = TSDB_CODE_VND_NO_WRITE_AUTH; dDebug("vgId:%d, msg:%p put into vnode-write queue failed since %s", pVnode->vgId, pMsg, tstrerror(code)); break; } if (pMsg->msgType != TDMT_VND_ALTER_CONFIRM && pVnode->disable) { dDebug("vgId:%d, msg:%p put into vnode-write queue failed since its disable", pVnode->vgId, pMsg); - terrno = TSDB_CODE_VND_STOPPED; - code = terrno; + code = TSDB_CODE_VND_STOPPED; break; } dGTrace("vgId:%d, msg:%p put into vnode-write queue", pVnode->vgId, pMsg); - taosWriteQitem(pVnode->pWriteW.queue, pMsg); + code = taosWriteQitem(pVnode->pWriteW.queue, pMsg); break; case SYNC_QUEUE: dGTrace("vgId:%d, msg:%p put into vnode-sync queue", pVnode->vgId, pMsg); - taosWriteQitem(pVnode->pSyncW.queue, pMsg); + code = taosWriteQitem(pVnode->pSyncW.queue, pMsg); break; case SYNC_RD_QUEUE: dGTrace("vgId:%d, msg:%p put into vnode-sync-rd queue", pVnode->vgId, pMsg); - taosWriteQitem(pVnode->pSyncRdW.queue, pMsg); + code = taosWriteQitem(pVnode->pSyncRdW.queue, pMsg); break; case APPLY_QUEUE: dGTrace("vgId:%d, msg:%p put into vnode-apply queue", pVnode->vgId, pMsg); - taosWriteQitem(pVnode->pApplyW.queue, pMsg); + code = taosWriteQitem(pVnode->pApplyW.queue, pMsg); break; default: - code = -1; - terrno = TSDB_CODE_INVALID_PARA; + code = TSDB_CODE_INVALID_MSG; break; } @@ -299,32 +302,32 @@ int32_t vmPutMsgToStreamQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { return vmPutMs int32_t vmPutMsgToMultiMgmtQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { const STraceId *trace = &pMsg->info.traceId; dGTrace("msg:%p, put into vnode-multi-mgmt queue", pMsg); - taosWriteQitem(pMgmt->mgmtMultiWorker.queue, pMsg); - return 0; + return taosWriteQitem(pMgmt->mgmtMultiWorker.queue, pMsg); } int32_t vmPutMsgToMgmtQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { const STraceId *trace = &pMsg->info.traceId; dGTrace("msg:%p, put into vnode-mgmt queue", pMsg); - taosWriteQitem(pMgmt->mgmtWorker.queue, pMsg); - return 0; + return taosWriteQitem(pMgmt->mgmtWorker.queue, pMsg); } int32_t vmPutRpcMsgToQueue(SVnodeMgmt *pMgmt, EQueueType qtype, SRpcMsg *pRpc) { + int32_t code = 0; if (pRpc->contLen < sizeof(SMsgHead)) { dError("invalid rpc msg with no msg head at pCont. pRpc:%p, type:%s, len:%d", pRpc, TMSG_INFO(pRpc->msgType), pRpc->contLen); rpcFreeCont(pRpc->pCont); pRpc->pCont = NULL; - return -1; + return TSDB_CODE_INVALID_MSG; } EQItype itype = APPLY_QUEUE == qtype ? DEF_QITEM : RPC_QITEM; - SRpcMsg *pMsg = taosAllocateQitem(sizeof(SRpcMsg), itype, pRpc->contLen); - if (pMsg == NULL) { + SRpcMsg *pMsg = NULL; + code = taosAllocateQitemWrapper(sizeof(SRpcMsg), itype, pRpc->contLen, (void **)&pMsg); + if (code != 0) { rpcFreeCont(pRpc->pCont); pRpc->pCont = NULL; - return -1; + return code; } SMsgHead *pHead = pRpc->pCont; @@ -335,7 +338,7 @@ int32_t vmPutRpcMsgToQueue(SVnodeMgmt *pMgmt, EQueueType qtype, SRpcMsg *pRpc) { memcpy(pMsg, pRpc, sizeof(SRpcMsg)); pRpc->pCont = NULL; - int32_t code = vmPutMsgToQueue(pMgmt, pMsg, qtype); + code = vmPutMsgToQueue(pMgmt, pMsg, qtype); if (code != 0) { dTrace("msg:%p, is freed", pMsg); rpcFreeCont(pMsg->pCont); @@ -381,6 +384,7 @@ int32_t vmGetQueueSize(SVnodeMgmt *pMgmt, int32_t vgId, EQueueType qtype) { } int32_t vmAllocQueue(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) { + int32_t code = 0; SMultiWorkerCfg wcfg = {.max = 1, .name = "vnode-write", .fp = (FItems)vnodeProposeWriteMsg, .param = pVnode->pImpl}; SMultiWorkerCfg scfg = {.max = 1, .name = "vnode-sync", .fp = (FItems)vmProcessSyncQueue, .param = pVnode}; SMultiWorkerCfg sccfg = {.max = 1, .name = "vnode-sync-rd", .fp = (FItems)vmProcessSyncQueue, .param = pVnode}; @@ -396,8 +400,7 @@ int32_t vmAllocQueue(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) { if (pVnode->pWriteW.queue == NULL || pVnode->pSyncW.queue == NULL || pVnode->pSyncRdW.queue == NULL || pVnode->pApplyW.queue == NULL || pVnode->pQueryQ == NULL || pVnode->pStreamQ == NULL || pVnode->pFetchQ == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; + return TSDB_CODE_OUT_OF_MEMORY; } dInfo("vgId:%d, write-queue:%p is alloced, thread:%08" PRId64, pVnode->vgId, pVnode->pWriteW.queue, @@ -426,26 +429,27 @@ void vmFreeQueue(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) { } int32_t vmStartWorker(SVnodeMgmt *pMgmt) { + int32_t code = 0; SQueryAutoQWorkerPool *pQPool = &pMgmt->queryPool; pQPool->name = "vnode-query"; pQPool->min = tsNumOfVnodeQueryThreads; pQPool->max = tsNumOfVnodeQueryThreads; - if (tQueryAutoQWorkerInit(pQPool) != 0) return -1; + if ((code = tQueryAutoQWorkerInit(pQPool)) != 0) return code; SAutoQWorkerPool *pStreamPool = &pMgmt->streamPool; pStreamPool->name = "vnode-stream"; pStreamPool->ratio = tsRatioOfVnodeStreamThreads; - if (tAutoQWorkerInit(pStreamPool) != 0) return -1; + if ((code = tAutoQWorkerInit(pStreamPool)) != 0) return code; SWWorkerPool *pFPool = &pMgmt->fetchPool; pFPool->name = "vnode-fetch"; pFPool->max = tsNumOfVnodeFetchThreads; - if (tWWorkerInit(pFPool) != 0) return -1; + if ((code = tWWorkerInit(pFPool)) != 0) return code; SSingleWorkerCfg mgmtCfg = { .min = 1, .max = 1, .name = "vnode-mgmt", .fp = (FItem)vmProcessMgmtQueue, .param = pMgmt}; - if (tSingleWorkerInit(&pMgmt->mgmtWorker, &mgmtCfg) != 0) return -1; + if ((code = tSingleWorkerInit(&pMgmt->mgmtWorker, &mgmtCfg)) != 0) return code; int32_t threadNum = 0; if (tsNumOfCores == 1) { @@ -459,7 +463,7 @@ int32_t vmStartWorker(SVnodeMgmt *pMgmt) { .fp = (FItem)vmProcessMultiMgmtQueue, .param = pMgmt}; - if (tSingleWorkerInit(&pMgmt->mgmtMultiWorker, &multiMgmtCfg) != 0) return -1; + if ((code = tSingleWorkerInit(&pMgmt->mgmtMultiWorker, &multiMgmtCfg)) != 0) return code; dDebug("vnode workers are initialized"); return 0; diff --git a/source/dnode/mgmt/node_mgmt/src/dmEnv.c b/source/dnode/mgmt/node_mgmt/src/dmEnv.c index 15feaf1761..82809ad1a7 100644 --- a/source/dnode/mgmt/node_mgmt/src/dmEnv.c +++ b/source/dnode/mgmt/node_mgmt/src/dmEnv.c @@ -14,11 +14,11 @@ */ #define _DEFAULT_SOURCE -#include "dmMgmt.h" #include "audit.h" +#include "dmMgmt.h" #include "libs/function/tudf.h" -#include "tgrant.h" #include "tcompare.h" +#include "tgrant.h" #define DM_INIT_AUDIT() \ do { \ @@ -256,8 +256,8 @@ static int32_t dmProcessCreateNodeReq(EDndNodeType ntype, SRpcMsg *pMsg) { default: code = TSDB_CODE_APP_ERROR; } - dError("failed to create node since %s", terrstr()); - return terrno = code; + dError("failed to create node since %s", tstrerror(code)); + return code; } dInfo("start to process create-node-request"); @@ -267,7 +267,7 @@ static int32_t dmProcessCreateNodeReq(EDndNodeType ntype, SRpcMsg *pMsg) { dmReleaseWrapper(pWrapper); code = TAOS_SYSTEM_ERROR(errno); dError("failed to create dir:%s since %s", pWrapper->path, tstrerror(code)); - return terrno = code; + return code; } taosThreadMutexLock(&pDnode->mutex); @@ -298,7 +298,7 @@ static int32_t dmProcessAlterNodeTypeReq(EDndNodeType ntype, SRpcMsg *pMsg) { SMgmtWrapper *pWrapper = dmAcquireWrapper(pDnode, ntype); if (pWrapper == NULL) { dError("fail to process alter node type since node not exist"); - return -1; + return TSDB_CODE_INVALID_MSG; } dmReleaseWrapper(pWrapper); @@ -312,7 +312,7 @@ static int32_t dmProcessAlterNodeTypeReq(EDndNodeType ntype, SRpcMsg *pMsg) { if (role == TAOS_SYNC_ROLE_VOTER) { dError("node:%s, failed to alter node type since node already is role:%d", pWrapper->name, role); code = TSDB_CODE_MNODE_ALREADY_IS_VOTER; - return terrno = code; + return code; } } @@ -320,7 +320,7 @@ static int32_t dmProcessAlterNodeTypeReq(EDndNodeType ntype, SRpcMsg *pMsg) { dInfo("node:%s, checking node catch up", pWrapper->name); if ((*pWrapper->func.isCatchUpFp)(pWrapper->pMgmt) != 1) { code = TSDB_CODE_MNODE_NOT_CATCH_UP; - return terrno = code; + return code; } } @@ -338,7 +338,7 @@ static int32_t dmProcessAlterNodeTypeReq(EDndNodeType ntype, SRpcMsg *pMsg) { taosThreadMutexUnlock(&pDnode->mutex); code = TAOS_SYSTEM_ERROR(errno); dError("failed to create dir:%s since %s", pWrapper->path, tstrerror(code)); - return terrno = code; + return code; } SMgmtInputOpt input = dmBuildMgmtInputOpt(pWrapper); @@ -381,7 +381,7 @@ static int32_t dmProcessDropNodeReq(EDndNodeType ntype, SRpcMsg *pMsg) { code = TSDB_CODE_APP_ERROR; } - dError("failed to drop node since %s", terrstr()); + dError("failed to drop node since %s", tstrerror(code)); return terrno = code; } @@ -439,6 +439,4 @@ void dmReportStartup(const char *pName, const char *pDesc) { int64_t dmGetClusterId() { return globalDnode.data.clusterId; } -bool dmReadyForTest() { - return dmInstance()->data.dnodeVer > 0; -} +bool dmReadyForTest() { return dmInstance()->data.dnodeVer > 0; } diff --git a/source/dnode/mgmt/node_mgmt/src/dmMgmt.c b/source/dnode/mgmt/node_mgmt/src/dmMgmt.c index 2e0533d91a..f4be160965 100644 --- a/source/dnode/mgmt/node_mgmt/src/dmMgmt.c +++ b/source/dnode/mgmt/node_mgmt/src/dmMgmt.c @@ -305,7 +305,7 @@ int32_t dmMarkWrapper(SMgmtWrapper *pWrapper) { } taosThreadRwlockUnlock(&pWrapper->lock); - return terrno = code; + return code; } void dmReleaseWrapper(SMgmtWrapper *pWrapper) { diff --git a/source/dnode/mgmt/node_mgmt/src/dmNodes.c b/source/dnode/mgmt/node_mgmt/src/dmNodes.c index a8bf5be3e2..7ce16f40ad 100644 --- a/source/dnode/mgmt/node_mgmt/src/dmNodes.c +++ b/source/dnode/mgmt/node_mgmt/src/dmNodes.c @@ -17,12 +17,13 @@ #include "dmMgmt.h" int32_t dmOpenNode(SMgmtWrapper *pWrapper) { + int32_t code = 0; SDnode *pDnode = pWrapper->pDnode; if (taosMkDir(pWrapper->path) != 0) { - terrno = TAOS_SYSTEM_ERROR(errno); - dError("node:%s, failed to create dir:%s since %s", pWrapper->name, pWrapper->path, terrstr()); - return -1; + code = TAOS_SYSTEM_ERROR(errno); + dError("node:%s, failed to create dir:%s since %s", pWrapper->name, pWrapper->path, tstrerror(code)); + return terrno = code; } SMgmtOutputOpt output = {0}; @@ -30,7 +31,7 @@ int32_t dmOpenNode(SMgmtWrapper *pWrapper) { dInfo("node:%s, start to open", pWrapper->name); tmsgSetDefault(&input.msgCb); - if ((*pWrapper->func.openFp)(&input, &output) != 0) { + if ((code = (*pWrapper->func.openFp)(&input, &output)) != 0) { dError("node:%s, failed to open since %s", pWrapper->name, terrstr()); return -1; } diff --git a/source/dnode/mgmt/node_mgmt/src/dmTransport.c b/source/dnode/mgmt/node_mgmt/src/dmTransport.c index 2f5c4abd0b..c8941c8dc6 100644 --- a/source/dnode/mgmt/node_mgmt/src/dmTransport.c +++ b/source/dnode/mgmt/node_mgmt/src/dmTransport.c @@ -39,9 +39,9 @@ int32_t dmProcessNodeMsg(SMgmtWrapper *pWrapper, SRpcMsg *pMsg) { NodeMsgFp msgFp = pWrapper->msgFps[TMSG_INDEX(pMsg->msgType)]; if (msgFp == NULL) { - terrno = TSDB_CODE_MSG_NOT_PROCESSED; + // terrno = TSDB_CODE_MSG_NOT_PROCESSED; dGError("msg:%p, not processed since no handler, type:%s", pMsg, TMSG_INFO(pMsg->msgType)); - return -1; + return TSDB_CODE_MSG_NOT_PROCESSED; } dGTrace("msg:%p, will be processed by %s", pMsg, pWrapper->name); @@ -54,14 +54,15 @@ static bool dmFailFastFp(tmsg_t msgType) { return msgType == TDMT_SYNC_HEARTBEAT || msgType == TDMT_SYNC_APPEND_ENTRIES; } -static void dmConvertErrCode(tmsg_t msgType) { - if (terrno != TSDB_CODE_APP_IS_STOPPING) { - return; +static int32_t dmConvertErrCode(tmsg_t msgType, int32_t code) { + if (code != TSDB_CODE_APP_IS_STOPPING) { + return code; } if ((msgType > TDMT_VND_MSG_MIN && msgType < TDMT_VND_MSG_MAX) || (msgType > TDMT_SCH_MSG_MIN && msgType < TDMT_SCH_MSG_MAX)) { - terrno = TSDB_CODE_VND_STOPPED; + code = TSDB_CODE_VND_STOPPED; } + return code; } static void dmUpdateRpcIpWhite(SDnodeData *pData, void *pTrans, SRpcMsg *pRpc) { SUpdateIpWhite ipWhite = {0}; // aosMemoryCalloc(1, sizeof(SUpdateIpWhite)); @@ -99,14 +100,14 @@ static void dmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pRpc, SEpSet *pEpSet) { int32_t svrVer = 0; taosVersionStrToInt(version, &svrVer); - if (0 != taosCheckVersionCompatible(pRpc->info.cliVer, svrVer, 3)) { + if ((code = taosCheckVersionCompatible(pRpc->info.cliVer, svrVer, 3)) != 0) { dError("Version not compatible, cli ver: %d, svr ver: %d", pRpc->info.cliVer, svrVer); goto _OVER; } bool isForbidden = dmIsForbiddenIp(pRpc->info.forbiddenIp, pRpc->info.conn.user, pRpc->info.conn.clientIp); if (isForbidden) { - terrno = TSDB_CODE_IP_NOT_IN_WHITE_LIST; + code = TSDB_CODE_IP_NOT_IN_WHITE_LIST; goto _OVER; } @@ -119,7 +120,7 @@ static void dmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pRpc, SEpSet *pEpSet) { case TDMT_SCH_FETCH_RSP: case TDMT_SCH_MERGE_FETCH_RSP: case TDMT_VND_SUBMIT_RSP: - qWorkerProcessRspMsg(NULL, NULL, pRpc, 0); + code = qWorkerProcessRspMsg(NULL, NULL, pRpc, 0); return; case TDMT_MND_STATUS_RSP: if (pEpSet != NULL) { @@ -148,32 +149,32 @@ static void dmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pRpc, SEpSet *pEpSet) { return; } else { if (pDnode->status == DND_STAT_INIT) { - terrno = TSDB_CODE_APP_IS_STARTING; + code = TSDB_CODE_APP_IS_STARTING; } else { - terrno = TSDB_CODE_APP_IS_STOPPING; + code = TSDB_CODE_APP_IS_STOPPING; } goto _OVER; } } } else { - terrno = TSDB_CODE_APP_IS_STARTING; + code = TSDB_CODE_APP_IS_STARTING; goto _OVER; } if (pRpc->pCont == NULL && (IsReq(pRpc) || pRpc->contLen != 0)) { dGError("msg:%p, type:%s pCont is NULL", pRpc, TMSG_INFO(pRpc->msgType)); - terrno = TSDB_CODE_INVALID_MSG_LEN; + code = TSDB_CODE_INVALID_MSG_LEN; goto _OVER; } else if ((pRpc->code == TSDB_CODE_RPC_NETWORK_UNAVAIL || pRpc->code == TSDB_CODE_RPC_BROKEN_LINK) && (!IsReq(pRpc)) && (pRpc->pCont == NULL)) { dGError("msg:%p, type:%s pCont is NULL, err: %s", pRpc, TMSG_INFO(pRpc->msgType), tstrerror(pRpc->code)); - terrno = pRpc->code; + code = pRpc->code; goto _OVER; } if (pHandle->defaultNtype == NODE_END) { dGError("msg:%p, type:%s not processed since no handle", pRpc, TMSG_INFO(pRpc->msgType)); - terrno = TSDB_CODE_MSG_NOT_PROCESSED; + code = TSDB_CODE_MSG_NOT_PROCESSED; goto _OVER; } @@ -197,12 +198,12 @@ static void dmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pRpc, SEpSet *pEpSet) { } } else { dGError("msg:%p, type:%s contLen is 0", pRpc, TMSG_INFO(pRpc->msgType)); - terrno = TSDB_CODE_INVALID_MSG_LEN; + code = TSDB_CODE_INVALID_MSG_LEN; goto _OVER; } } - if (dmMarkWrapper(pWrapper) != 0) { + if ((code = dmMarkWrapper(pWrapper)) != 0) { pWrapper = NULL; goto _OVER; } @@ -210,8 +211,10 @@ static void dmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pRpc, SEpSet *pEpSet) { pRpc->info.wrapper = pWrapper; EQItype itype = IsReq(pRpc) ? RPC_QITEM : DEF_QITEM; // rsp msg is not restricted by tsRpcQueueMemoryUsed - pMsg = taosAllocateQitem(sizeof(SRpcMsg), itype, pRpc->contLen); - if (pMsg == NULL) goto _OVER; + code = taosAllocateQitemWrapper(sizeof(SRpcMsg), itype, pRpc->contLen, (void **)&pMsg); + if (code != 0) { + goto _OVER; + } memcpy(pMsg, pRpc, sizeof(SRpcMsg)); dGTrace("msg:%p, is created, type:%s handle:%p len:%d", pMsg, TMSG_INFO(pRpc->msgType), pMsg->info.handle, @@ -221,10 +224,9 @@ static void dmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pRpc, SEpSet *pEpSet) { _OVER: if (code != 0) { - dmConvertErrCode(pRpc->msgType); - if (terrno != 0) code = terrno; + code = dmConvertErrCode(pRpc->msgType, code); if (pMsg) { - dGTrace("msg:%p, failed to process %s since %s", pMsg, TMSG_INFO(pMsg->msgType), terrstr()); + dGTrace("msg:%p, failed to process %s since %s", pMsg, TMSG_INFO(pMsg->msgType), tstrerror(code)); } else { dGTrace("msg:%p, failed to process empty msg since %s", pMsg, terrstr()); } @@ -280,17 +282,19 @@ int32_t dmInitMsgHandle(SDnode *pDnode) { } static inline int32_t dmSendReq(const SEpSet *pEpSet, SRpcMsg *pMsg) { + int32_t code = 0; SDnode *pDnode = dmInstance(); if (pDnode->status != DND_STAT_RUNNING && pMsg->msgType < TDMT_SYNC_MSG_MIN) { rpcFreeCont(pMsg->pCont); pMsg->pCont = NULL; if (pDnode->status == DND_STAT_INIT) { - terrno = TSDB_CODE_APP_IS_STARTING; + code = TSDB_CODE_APP_IS_STARTING; } else { - terrno = TSDB_CODE_APP_IS_STOPPING; + code = TSDB_CODE_APP_IS_STOPPING; } - dError("failed to send rpc msg:%s since %s, handle:%p", TMSG_INFO(pMsg->msgType), terrstr(), pMsg->info.handle); - return -1; + dError("failed to send rpc msg:%s since %s, handle:%p", TMSG_INFO(pMsg->msgType), tstrerror(code), + pMsg->info.handle); + return code; } else { pMsg->info.handle = 0; rpcSendRequest(pDnode->trans.clientRpc, pEpSet, pMsg, NULL); @@ -521,7 +525,7 @@ int32_t dmInitServer(SDnode *pDnode) { pTrans->serverRpc = rpcOpen(&rpcInit); if (pTrans->serverRpc == NULL) { dError("failed to init dnode rpc server"); - return -1; + return terrno; } dDebug("dnode rpc server is initialized"); diff --git a/source/dnode/mnode/impl/src/mndMain.c b/source/dnode/mnode/impl/src/mndMain.c index 723e3701a1..0c510ec742 100644 --- a/source/dnode/mnode/impl/src/mndMain.c +++ b/source/dnode/mnode/impl/src/mndMain.c @@ -464,26 +464,23 @@ static int32_t mndCreateDir(SMnode *pMnode, const char *path) { static int32_t mndInitWal(SMnode *pMnode) { char path[PATH_MAX + 20] = {0}; snprintf(path, sizeof(path), "%s%swal", pMnode->path, TD_DIRSEP); - SWalCfg cfg = { - .vgId = 1, - .fsyncPeriod = 0, - .rollPeriod = -1, - .segSize = -1, - .retentionPeriod = 0, - .retentionSize = 0, - .level = TAOS_WAL_FSYNC, - .encryptAlgorithm = 0, - .encryptKey = {0} - }; + SWalCfg cfg = {.vgId = 1, + .fsyncPeriod = 0, + .rollPeriod = -1, + .segSize = -1, + .retentionPeriod = 0, + .retentionSize = 0, + .level = TAOS_WAL_FSYNC, + .encryptAlgorithm = 0, + .encryptKey = {0}}; #if defined(TD_ENTERPRISE) - if(tsiEncryptAlgorithm == DND_CA_SM4 && (tsiEncryptScope & DND_CS_MNODE_WAL) == DND_CS_MNODE_WAL){ - cfg.encryptAlgorithm = (tsiEncryptScope & DND_CS_MNODE_WAL)? tsiEncryptAlgorithm : 0; - if(tsEncryptKey[0] == '\0'){ + if (tsiEncryptAlgorithm == DND_CA_SM4 && (tsiEncryptScope & DND_CS_MNODE_WAL) == DND_CS_MNODE_WAL) { + cfg.encryptAlgorithm = (tsiEncryptScope & DND_CS_MNODE_WAL) ? tsiEncryptAlgorithm : 0; + if (tsEncryptKey[0] == '\0') { terrno = TSDB_CODE_DNODE_INVALID_ENCRYPTKEY; return -1; - } - else{ + } else { strncpy(cfg.encryptKey, tsEncryptKey, ENCRYPT_KEY_LEN); } } @@ -856,7 +853,7 @@ _OVER: return -1; } -int32_t mndProcessRpcMsg(SRpcMsg *pMsg, SQueueInfo* pQueueInfo) { +int32_t mndProcessRpcMsg(SRpcMsg *pMsg, SQueueInfo *pQueueInfo) { SMnode *pMnode = pMsg->info.node; const STraceId *trace = &pMsg->info.traceId; int32_t code = TSDB_CODE_SUCCESS; diff --git a/source/util/src/tqueue.c b/source/util/src/tqueue.c index 45a8a462fb..1a7f517bbf 100644 --- a/source/util/src/tqueue.c +++ b/source/util/src/tqueue.c @@ -146,6 +146,15 @@ int64_t taosQueueMemorySize(STaosQueue *queue) { return memOfItems; } +int32_t taosAllocateQitemWrapper(int32_t size, EQItype itype, int64_t dataSize, void **pItem) { + int32_t code = 0; + *pItem = taosAllocateQitem(size, itype, dataSize); + if (*pItem == NULL) { + return terrno; + } + + return code; +} void *taosAllocateQitem(int32_t size, EQItype itype, int64_t dataSize) { STaosQnode *pNode = taosMemoryCalloc(1, sizeof(STaosQnode) + size); if (pNode == NULL) { @@ -533,9 +542,7 @@ int64_t taosQallUnAccessedMemSize(STaosQall *qall) { return qall->unAccessMemOfI void taosResetQitems(STaosQall *qall) { qall->current = qall->start; } int32_t taosGetQueueNumber(STaosQset *qset) { return qset->numOfQueues; } -void taosQueueSetThreadId(STaosQueue* pQueue, int64_t threadId) { - pQueue->threadId = threadId; -} +void taosQueueSetThreadId(STaosQueue *pQueue, int64_t threadId) { pQueue->threadId = threadId; } int64_t taosQueueGetThreadId(STaosQueue *pQueue) { return pQueue->threadId; } diff --git a/source/util/src/tversion.c b/source/util/src/tversion.c index 0f84f6bafa..b44262d3c5 100644 --- a/source/util/src/tversion.c +++ b/source/util/src/tversion.c @@ -83,15 +83,13 @@ int32_t taosCheckVersionCompatible(int32_t clientVer, int32_t serverVer, int32_t serverVer /= 1000000; break; default: - terrno = TSDB_CODE_INVALID_VERSION_NUMBER; - return -1; + return TSDB_CODE_INVALID_VERSION_NUMBER; } if (clientVer == serverVer) { return 0; } else { - terrno = TSDB_CODE_VERSION_NOT_COMPATIBLE; - return -1; + return TSDB_CODE_VERSION_NOT_COMPATIBLE; } }