refactor error code
This commit is contained in:
parent
87e114a2d0
commit
640eada5b1
|
@ -77,6 +77,7 @@ STaosQueue *taosOpenQueue();
|
|||
void taosCloseQueue(STaosQueue *queue);
|
||||
void taosSetQueueFp(STaosQueue *queue, FItem itemFp, FItems itemsFp);
|
||||
void *taosAllocateQitem(int32_t size, EQItype itype, int64_t dataSize);
|
||||
int32_t taosAllocateQitemWrapper(int32_t size, EQItype itype, int64_t dataSize, void **pItem);
|
||||
void taosFreeQitem(void *pItem);
|
||||
int32_t taosWriteQitem(STaosQueue *queue, void *pItem);
|
||||
int32_t taosReadQitem(STaosQueue *queue, void **ppItem);
|
||||
|
|
|
@ -45,7 +45,7 @@ static void dmMayShouldUpdateIpWhiteList(SDnodeMgmt *pMgmt, int64_t ver) {
|
|||
|
||||
SRetrieveIpWhiteReq req = {.ipWhiteVer = oldVer};
|
||||
int32_t contLen = tSerializeRetrieveIpWhite(NULL, 0, &req);
|
||||
void * pHead = rpcMallocCont(contLen);
|
||||
void *pHead = rpcMallocCont(contLen);
|
||||
tSerializeRetrieveIpWhite(pHead, contLen, &req);
|
||||
|
||||
SRpcMsg rpcMsg = {.pCont = pHead,
|
||||
|
@ -116,7 +116,7 @@ void dmSendStatusReq(SDnodeMgmt *pMgmt) {
|
|||
req.clusterCfg.ttlChangeOnWrite = tsTtlChangeOnWrite;
|
||||
req.clusterCfg.enableWhiteList = tsEnableWhiteList ? 1 : 0;
|
||||
req.clusterCfg.encryptionKeyStat = tsEncryptionKeyStat;
|
||||
req.clusterCfg.encryptionKeyChksum = tsEncryptionKeyChksum;
|
||||
req.clusterCfg.encryptionKeyChksum = tsEncryptionKeyChksum;
|
||||
req.clusterCfg.monitorParas.tsEnableMonitor = tsEnableMonitor;
|
||||
req.clusterCfg.monitorParas.tsMonitorInterval = tsMonitorInterval;
|
||||
req.clusterCfg.monitorParas.tsSlowLogScope = tsSlowLogScope;
|
||||
|
@ -146,7 +146,7 @@ void dmSendStatusReq(SDnodeMgmt *pMgmt) {
|
|||
req.ipWhiteVer = pMgmt->pData->ipWhiteVer;
|
||||
|
||||
int32_t contLen = tSerializeSStatusReq(NULL, 0, &req);
|
||||
void * pHead = rpcMallocCont(contLen);
|
||||
void *pHead = rpcMallocCont(contLen);
|
||||
tSerializeSStatusReq(pHead, contLen, &req);
|
||||
tFreeSStatusReq(&req);
|
||||
|
||||
|
@ -207,18 +207,22 @@ int32_t dmProcessGrantRsp(SDnodeMgmt *pMgmt, SRpcMsg *pMsg) {
|
|||
}
|
||||
|
||||
int32_t dmProcessConfigReq(SDnodeMgmt *pMgmt, SRpcMsg *pMsg) {
|
||||
int32_t code = 0;
|
||||
SDCfgDnodeReq cfgReq = {0};
|
||||
if (tDeserializeSDCfgDnodeReq(pMsg->pCont, pMsg->contLen, &cfgReq) != 0) {
|
||||
terrno = TSDB_CODE_INVALID_MSG;
|
||||
return -1;
|
||||
return TSDB_CODE_INVALID_MSG;
|
||||
}
|
||||
|
||||
dInfo("start to config, option:%s, value:%s", cfgReq.config, cfgReq.value);
|
||||
|
||||
SConfig *pCfg = taosGetCfg();
|
||||
cfgSetItem(pCfg, cfgReq.config, cfgReq.value, CFG_STYPE_ALTER_CMD, true);
|
||||
taosCfgDynamicOptions(pCfg, cfgReq.config, true);
|
||||
return 0;
|
||||
|
||||
code = cfgSetItem(pCfg, cfgReq.config, cfgReq.value, CFG_STYPE_ALTER_CMD, true);
|
||||
if (code != 0) {
|
||||
return code;
|
||||
}
|
||||
|
||||
return taosCfgDynamicOptions(pCfg, cfgReq.config, true);
|
||||
}
|
||||
|
||||
int32_t dmProcessCreateEncryptKeyReq(SDnodeMgmt *pMgmt, SRpcMsg *pMsg) {
|
||||
|
@ -251,7 +255,7 @@ static void dmGetServerRunStatus(SDnodeMgmt *pMgmt, SServerStatusRsp *pStatus) {
|
|||
pStatus->statusCode = TSDB_SRV_STATUS_SERVICE_OK;
|
||||
pStatus->details[0] = 0;
|
||||
|
||||
SMonMloadInfo minfo = {0};
|
||||
SMonMloadInfo minfo = {0};
|
||||
(*pMgmt->getMnodeLoadsFp)(&minfo);
|
||||
if (minfo.isMnode &&
|
||||
(minfo.load.syncState == TAOS_SYNC_STATE_ERROR || minfo.load.syncState == TAOS_SYNC_STATE_OFFLINE)) {
|
||||
|
@ -276,6 +280,7 @@ static void dmGetServerRunStatus(SDnodeMgmt *pMgmt, SServerStatusRsp *pStatus) {
|
|||
}
|
||||
|
||||
int32_t dmProcessServerRunStatus(SDnodeMgmt *pMgmt, SRpcMsg *pMsg) {
|
||||
int32_t code = 0;
|
||||
dDebug("server run status req is received");
|
||||
SServerStatusRsp statusRsp = {0};
|
||||
dmGetServerRunStatus(pMgmt, &statusRsp);
|
||||
|
@ -284,24 +289,37 @@ int32_t dmProcessServerRunStatus(SDnodeMgmt *pMgmt, SRpcMsg *pMsg) {
|
|||
int32_t rspLen = tSerializeSServerStatusRsp(NULL, 0, &statusRsp);
|
||||
if (rspLen < 0) {
|
||||
rspMsg.code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return -1;
|
||||
return rspMsg.code;
|
||||
}
|
||||
|
||||
void *pRsp = rpcMallocCont(rspLen);
|
||||
if (pRsp == NULL) {
|
||||
rspMsg.code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return -1;
|
||||
return rspMsg.code;
|
||||
}
|
||||
|
||||
code = tSerializeSServerStatusRsp(pRsp, rspLen, &statusRsp);
|
||||
if (code != 0) {
|
||||
rpcFreeCont(pRsp);
|
||||
rspMsg.code = code;
|
||||
return code;
|
||||
}
|
||||
|
||||
tSerializeSServerStatusRsp(pRsp, rspLen, &statusRsp);
|
||||
pMsg->info.rsp = pRsp;
|
||||
pMsg->info.rspLen = rspLen;
|
||||
return 0;
|
||||
}
|
||||
|
||||
SSDataBlock *dmBuildVariablesBlock(void) {
|
||||
SSDataBlock * pBlock = taosMemoryCalloc(1, sizeof(SSDataBlock));
|
||||
size_t size = 0;
|
||||
int32_t dmBuildVariablesBlock(SSDataBlock **ppBlock) {
|
||||
int32_t code = 0;
|
||||
|
||||
SSDataBlock *pBlock = taosMemoryCalloc(1, sizeof(SSDataBlock));
|
||||
if (pBlock == NULL) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
size_t size = 0;
|
||||
|
||||
const SSysTableMeta *pMeta = NULL;
|
||||
getInfosDbMeta(&pMeta, &size);
|
||||
|
||||
|
@ -314,52 +332,74 @@ SSDataBlock *dmBuildVariablesBlock(void) {
|
|||
}
|
||||
|
||||
pBlock->pDataBlock = taosArrayInit(pMeta[index].colNum, sizeof(SColumnInfoData));
|
||||
if (pBlock->pDataBlock == NULL) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _exit;
|
||||
}
|
||||
|
||||
for (int32_t i = 0; i < pMeta[index].colNum; ++i) {
|
||||
SColumnInfoData colInfoData = {0};
|
||||
colInfoData.info.colId = i + 1;
|
||||
colInfoData.info.type = pMeta[index].schema[i].type;
|
||||
colInfoData.info.bytes = pMeta[index].schema[i].bytes;
|
||||
taosArrayPush(pBlock->pDataBlock, &colInfoData);
|
||||
if (taosArrayPush(pBlock->pDataBlock, &colInfoData) == NULL) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _exit;
|
||||
}
|
||||
}
|
||||
|
||||
pBlock->info.hasVarCol = true;
|
||||
|
||||
return pBlock;
|
||||
_exit:
|
||||
if (code != 0) {
|
||||
blockDataDestroy(pBlock);
|
||||
} else {
|
||||
*ppBlock = pBlock;
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
int32_t dmAppendVariablesToBlock(SSDataBlock *pBlock, int32_t dnodeId) {
|
||||
/*int32_t code = */dumpConfToDataBlock(pBlock, 1);
|
||||
int32_t code = dumpConfToDataBlock(pBlock, 1);
|
||||
if (code != 0) {
|
||||
return code;
|
||||
}
|
||||
|
||||
SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, 0);
|
||||
colDataSetNItems(pColInfo, 0, (const char *)&dnodeId, pBlock->info.rows, false);
|
||||
if (pColInfo == NULL) {
|
||||
return TSDB_CODE_OUT_OF_RANGE;
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
return colDataSetNItems(pColInfo, 0, (const char *)&dnodeId, pBlock->info.rows, false);
|
||||
}
|
||||
|
||||
int32_t dmProcessRetrieve(SDnodeMgmt *pMgmt, SRpcMsg *pMsg) {
|
||||
int32_t size = 0;
|
||||
int32_t rowsRead = 0;
|
||||
|
||||
int32_t size = 0;
|
||||
int32_t rowsRead = 0;
|
||||
int32_t code = 0;
|
||||
SRetrieveTableReq retrieveReq = {0};
|
||||
if (tDeserializeSRetrieveTableReq(pMsg->pCont, pMsg->contLen, &retrieveReq) != 0) {
|
||||
terrno = TSDB_CODE_INVALID_MSG;
|
||||
return -1;
|
||||
return TSDB_CODE_INVALID_MSG;
|
||||
}
|
||||
#if 0
|
||||
if (strcmp(retrieveReq.user, TSDB_DEFAULT_USER) != 0) {
|
||||
terrno = TSDB_CODE_MND_NO_RIGHTS;
|
||||
return -1;
|
||||
code = TSDB_CODE_MND_NO_RIGHTS;
|
||||
return code;
|
||||
}
|
||||
#endif
|
||||
if (strcasecmp(retrieveReq.tb, TSDB_INS_TABLE_DNODE_VARIABLES)) {
|
||||
terrno = TSDB_CODE_INVALID_MSG;
|
||||
return -1;
|
||||
return TSDB_CODE_INVALID_MSG;
|
||||
}
|
||||
|
||||
SSDataBlock *pBlock = dmBuildVariablesBlock();
|
||||
SSDataBlock *pBlock = NULL;
|
||||
if ((code = dmBuildVariablesBlock(&pBlock)) != 0) {
|
||||
return code;
|
||||
}
|
||||
|
||||
dmAppendVariablesToBlock(pBlock, pMgmt->pData->dnodeId);
|
||||
code = dmAppendVariablesToBlock(pBlock, pMgmt->pData->dnodeId);
|
||||
if (code != 0) {
|
||||
blockDataDestroy(pBlock);
|
||||
return code;
|
||||
}
|
||||
|
||||
size_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
|
||||
size = sizeof(SRetrieveMetaTableRsp) + sizeof(int32_t) + sizeof(SSysTableSchema) * numOfCols +
|
||||
|
@ -367,10 +407,10 @@ int32_t dmProcessRetrieve(SDnodeMgmt *pMgmt, SRpcMsg *pMsg) {
|
|||
|
||||
SRetrieveMetaTableRsp *pRsp = rpcMallocCont(size);
|
||||
if (pRsp == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
dError("failed to retrieve data since %s", terrstr());
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
dError("failed to retrieve data since %s", tstrerror(code));
|
||||
blockDataDestroy(pBlock);
|
||||
return -1;
|
||||
return code;
|
||||
}
|
||||
|
||||
char *pStart = pRsp->data;
|
||||
|
@ -404,7 +444,9 @@ int32_t dmProcessRetrieve(SDnodeMgmt *pMgmt, SRpcMsg *pMsg) {
|
|||
SArray *dmGetMsgHandles() {
|
||||
int32_t code = -1;
|
||||
SArray *pArray = taosArrayInit(16, sizeof(SMgmtHandle));
|
||||
if (pArray == NULL) goto _OVER;
|
||||
if (pArray == NULL) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
// Requests handled by DNODE
|
||||
if (dmSetMgmtHandle(pArray, TDMT_DND_CREATE_MNODE, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
|
||||
|
@ -416,7 +458,7 @@ SArray *dmGetMsgHandles() {
|
|||
if (dmSetMgmtHandle(pArray, TDMT_DND_CONFIG_DNODE, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_DND_SERVER_STATUS, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_DND_SYSTABLE_RETRIEVE, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_DND_ALTER_MNODE_TYPE, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_DND_ALTER_MNODE_TYPE, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_DND_CREATE_ENCRYPT_KEY, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
|
||||
|
||||
// Requests handled by MNODE
|
||||
|
|
|
@ -18,22 +18,23 @@
|
|||
#include "libs/function/tudf.h"
|
||||
|
||||
static int32_t dmStartMgmt(SDnodeMgmt *pMgmt) {
|
||||
if (dmStartStatusThread(pMgmt) != 0) {
|
||||
return -1;
|
||||
int32_t code = 0;
|
||||
if ((code = dmStartStatusThread(pMgmt)) != 0) {
|
||||
return code;
|
||||
}
|
||||
#if defined(TD_ENTERPRISE)
|
||||
if (dmStartNotifyThread(pMgmt) != 0) {
|
||||
return -1;
|
||||
if ((code = dmStartNotifyThread(pMgmt)) != 0) {
|
||||
return code;
|
||||
}
|
||||
#endif
|
||||
if (dmStartMonitorThread(pMgmt) != 0) {
|
||||
return -1;
|
||||
if ((code = dmStartMonitorThread(pMgmt)) != 0) {
|
||||
return code;
|
||||
}
|
||||
if (dmStartAuditThread(pMgmt) != 0) {
|
||||
return -1;
|
||||
if ((code = dmStartAuditThread(pMgmt)) != 0) {
|
||||
return code;
|
||||
}
|
||||
if (dmStartCrashReportThread(pMgmt) != 0) {
|
||||
return -1;
|
||||
if ((code = dmStartCrashReportThread(pMgmt)) != 0) {
|
||||
return code;
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
@ -50,10 +51,10 @@ static void dmStopMgmt(SDnodeMgmt *pMgmt) {
|
|||
}
|
||||
|
||||
static int32_t dmOpenMgmt(SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) {
|
||||
int32_t code = 0;
|
||||
SDnodeMgmt *pMgmt = taosMemoryCalloc(1, sizeof(SDnodeMgmt));
|
||||
if (pMgmt == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return -1;
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
pMgmt->pData = pInput->pData;
|
||||
|
@ -70,12 +71,11 @@ static int32_t dmOpenMgmt(SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) {
|
|||
pMgmt->getMnodeLoadsFp = pInput->getMnodeLoadsFp;
|
||||
pMgmt->getQnodeLoadsFp = pInput->getQnodeLoadsFp;
|
||||
|
||||
// pMgmt->pData->ipWhiteVer = 0;
|
||||
if (dmStartWorker(pMgmt) != 0) {
|
||||
return -1;
|
||||
if ((code = dmStartWorker(pMgmt)) != 0) {
|
||||
return code;
|
||||
}
|
||||
|
||||
if (udfStartUdfd(pMgmt->pData->dnodeId) != 0) {
|
||||
if ((code = udfStartUdfd(pMgmt->pData->dnodeId)) != 0) {
|
||||
dError("failed to start udfd");
|
||||
}
|
||||
|
||||
|
|
|
@ -264,12 +264,14 @@ static void *dmCrashReportThreadFp(void *param) {
|
|||
}
|
||||
|
||||
int32_t dmStartStatusThread(SDnodeMgmt *pMgmt) {
|
||||
int32_t code = 0;
|
||||
TdThreadAttr thAttr;
|
||||
taosThreadAttrInit(&thAttr);
|
||||
taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
|
||||
if (taosThreadCreate(&pMgmt->statusThread, &thAttr, dmStatusThreadFp, pMgmt) != 0) {
|
||||
dError("failed to create status thread since %s", strerror(errno));
|
||||
return -1;
|
||||
code = TAOS_SYSTEM_ERROR(errno);
|
||||
dError("failed to create status thread since %s", tstrerror(code));
|
||||
return code;
|
||||
}
|
||||
|
||||
taosThreadAttrDestroy(&thAttr);
|
||||
|
@ -285,12 +287,14 @@ void dmStopStatusThread(SDnodeMgmt *pMgmt) {
|
|||
}
|
||||
|
||||
int32_t dmStartNotifyThread(SDnodeMgmt *pMgmt) {
|
||||
int32_t code = 0;
|
||||
TdThreadAttr thAttr;
|
||||
taosThreadAttrInit(&thAttr);
|
||||
taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
|
||||
if (taosThreadCreate(&pMgmt->notifyThread, &thAttr, dmNotifyThreadFp, pMgmt) != 0) {
|
||||
dError("failed to create notify thread since %s", strerror(errno));
|
||||
return -1;
|
||||
code = TAOS_SYSTEM_ERROR(errno);
|
||||
dError("failed to create notify thread since %s", strerror(code));
|
||||
return code;
|
||||
}
|
||||
|
||||
taosThreadAttrDestroy(&thAttr);
|
||||
|
@ -308,12 +312,14 @@ void dmStopNotifyThread(SDnodeMgmt *pMgmt) {
|
|||
}
|
||||
|
||||
int32_t dmStartMonitorThread(SDnodeMgmt *pMgmt) {
|
||||
int32_t code = 0;
|
||||
TdThreadAttr thAttr;
|
||||
taosThreadAttrInit(&thAttr);
|
||||
taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
|
||||
if (taosThreadCreate(&pMgmt->monitorThread, &thAttr, dmMonitorThreadFp, pMgmt) != 0) {
|
||||
dError("failed to create monitor thread since %s", strerror(errno));
|
||||
return -1;
|
||||
code = TAOS_SYSTEM_ERROR(errno);
|
||||
dError("failed to create monitor thread since %s", tstrerror(code));
|
||||
return code;
|
||||
}
|
||||
|
||||
taosThreadAttrDestroy(&thAttr);
|
||||
|
@ -322,12 +328,14 @@ int32_t dmStartMonitorThread(SDnodeMgmt *pMgmt) {
|
|||
}
|
||||
|
||||
int32_t dmStartAuditThread(SDnodeMgmt *pMgmt) {
|
||||
int32_t code = 0;
|
||||
TdThreadAttr thAttr;
|
||||
taosThreadAttrInit(&thAttr);
|
||||
taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
|
||||
if (taosThreadCreate(&pMgmt->auditThread, &thAttr, dmAuditThreadFp, pMgmt) != 0) {
|
||||
dError("failed to create audit thread since %s", strerror(errno));
|
||||
return -1;
|
||||
code = TAOS_SYSTEM_ERROR(errno);
|
||||
dError("failed to create audit thread since %s", tstrerror(code));
|
||||
return code;
|
||||
}
|
||||
|
||||
taosThreadAttrDestroy(&thAttr);
|
||||
|
@ -350,6 +358,7 @@ void dmStopAuditThread(SDnodeMgmt *pMgmt) {
|
|||
}
|
||||
|
||||
int32_t dmStartCrashReportThread(SDnodeMgmt *pMgmt) {
|
||||
int32_t code = 0;
|
||||
if (!tsEnableCrashReport) {
|
||||
return 0;
|
||||
}
|
||||
|
@ -358,8 +367,9 @@ int32_t dmStartCrashReportThread(SDnodeMgmt *pMgmt) {
|
|||
taosThreadAttrInit(&thAttr);
|
||||
taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
|
||||
if (taosThreadCreate(&pMgmt->crashReportThread, &thAttr, dmCrashReportThreadFp, pMgmt) != 0) {
|
||||
dError("failed to create crashReport thread since %s", strerror(errno));
|
||||
return -1;
|
||||
code = TAOS_SYSTEM_ERROR(errno);
|
||||
dError("failed to create crashReport thread since %s", tstrerror(code));
|
||||
return code;
|
||||
}
|
||||
|
||||
taosThreadAttrDestroy(&thAttr);
|
||||
|
@ -431,8 +441,8 @@ static void dmProcessMgmtQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) {
|
|||
code = dmProcessCreateEncryptKeyReq(pMgmt, pMsg);
|
||||
break;
|
||||
default:
|
||||
terrno = TSDB_CODE_MSG_NOT_PROCESSED;
|
||||
dGError("msg:%p, not processed in mgmt queue", pMsg);
|
||||
code = TSDB_CODE_MSG_NOT_PROCESSED;
|
||||
dGError("msg:%p, not processed in mgmt queue, reason:%s", pMsg, tstrerror(code));
|
||||
break;
|
||||
}
|
||||
|
||||
|
|
|
@ -21,31 +21,31 @@ static int32_t mmDecodeOption(SJson *pJson, SMnodeOpt *pOption) {
|
|||
int32_t code = 0;
|
||||
|
||||
tjsonGetInt32ValueFromDouble(pJson, "deployed", pOption->deploy, code);
|
||||
if (code < 0) return -1;
|
||||
if (code < 0) return code;
|
||||
tjsonGetInt32ValueFromDouble(pJson, "selfIndex", pOption->selfIndex, code);
|
||||
if (code < 0) return 0;
|
||||
if (code < 0) return code;
|
||||
tjsonGetInt32ValueFromDouble(pJson, "lastIndex", pOption->lastIndex, code);
|
||||
if (code < 0) return 0;
|
||||
if (code < 0) return code;
|
||||
|
||||
SJson *replicas = tjsonGetObjectItem(pJson, "replicas");
|
||||
if (replicas == NULL) return 0;
|
||||
if (replicas == NULL) return TSDB_CODE_INVALID_JSON_FORMAT;
|
||||
pOption->numOfTotalReplicas = tjsonGetArraySize(replicas);
|
||||
|
||||
pOption->numOfReplicas = 0;
|
||||
|
||||
for (int32_t i = 0; i < pOption->numOfTotalReplicas; ++i) {
|
||||
SJson *replica = tjsonGetArrayItem(replicas, i);
|
||||
if (replica == NULL) return -1;
|
||||
if (replica == NULL) return TSDB_CODE_INVALID_JSON_FORMAT;
|
||||
|
||||
SReplica *pReplica = pOption->replicas + i;
|
||||
tjsonGetInt32ValueFromDouble(replica, "id", pReplica->id, code);
|
||||
if (code < 0) return -1;
|
||||
if (code < 0) return code;
|
||||
code = tjsonGetStringValue(replica, "fqdn", pReplica->fqdn);
|
||||
if (code < 0) return -1;
|
||||
if (code < 0) return code;
|
||||
tjsonGetUInt16ValueFromDouble(replica, "port", pReplica->port, code);
|
||||
if (code < 0) return -1;
|
||||
if (code < 0) return code;
|
||||
tjsonGetInt32ValueFromDouble(replica, "role", pOption->nodeRoles[i], code);
|
||||
if (code < 0) return -1;
|
||||
if (code < 0) return code;
|
||||
if (pOption->nodeRoles[i] == TAOS_SYNC_ROLE_VOTER) {
|
||||
pOption->numOfReplicas++;
|
||||
}
|
||||
|
@ -63,36 +63,41 @@ int32_t mmReadFile(const char *path, SMnodeOpt *pOption) {
|
|||
char *pData = NULL;
|
||||
SJson *pJson = NULL;
|
||||
char file[PATH_MAX] = {0};
|
||||
snprintf(file, sizeof(file), "%s%smnode.json", path, TD_DIRSEP);
|
||||
|
||||
int32_t nBytes = snprintf(file, sizeof(file), "%s%smnode.json", path, TD_DIRSEP);
|
||||
if (nBytes <= 0 || nBytes >= sizeof(file)) {
|
||||
code = TSDB_CODE_OUT_OF_BUFFER;
|
||||
goto _OVER;
|
||||
}
|
||||
|
||||
if (taosStatFile(file, NULL, NULL, NULL) < 0) {
|
||||
dInfo("mnode file:%s not exist", file);
|
||||
dInfo("mnode file:%s not exist, reason:%s", file, tstrerror(TAOS_SYSTEM_ERROR(errno)));
|
||||
return 0;
|
||||
}
|
||||
|
||||
pFile = taosOpenFile(file, TD_FILE_READ);
|
||||
if (pFile == NULL) {
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
dError("failed to open mnode file:%s since %s", file, terrstr());
|
||||
code = TAOS_SYSTEM_ERROR(errno);
|
||||
dError("failed to open mnode file:%s since %s", file, tstrerror(code));
|
||||
goto _OVER;
|
||||
}
|
||||
|
||||
int64_t size = 0;
|
||||
if (taosFStatFile(pFile, &size, NULL) < 0) {
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
dError("failed to fstat mnode file:%s since %s", file, terrstr());
|
||||
code = TAOS_SYSTEM_ERROR(errno);
|
||||
dError("failed to fstat mnode file:%s since %s", file, tstrerror(code));
|
||||
goto _OVER;
|
||||
}
|
||||
|
||||
pData = taosMemoryMalloc(size + 1);
|
||||
if (pData == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _OVER;
|
||||
}
|
||||
|
||||
if (taosReadFile(pFile, pData, size) != size) {
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
dError("failed to read mnode file:%s since %s", file, terrstr());
|
||||
code = TAOS_SYSTEM_ERROR(errno);
|
||||
dError("failed to read mnode file:%s since %s", file, tstrerror(code));
|
||||
goto _OVER;
|
||||
}
|
||||
|
||||
|
@ -100,12 +105,11 @@ int32_t mmReadFile(const char *path, SMnodeOpt *pOption) {
|
|||
|
||||
pJson = tjsonParse(pData);
|
||||
if (pJson == NULL) {
|
||||
terrno = TSDB_CODE_INVALID_JSON_FORMAT;
|
||||
code = TSDB_CODE_INVALID_JSON_FORMAT;
|
||||
goto _OVER;
|
||||
}
|
||||
|
||||
if (mmDecodeOption(pJson, pOption) < 0) {
|
||||
terrno = TSDB_CODE_INVALID_JSON_FORMAT;
|
||||
if ((code = mmDecodeOption(pJson, pOption)) < 0) {
|
||||
goto _OVER;
|
||||
}
|
||||
|
||||
|
@ -118,37 +122,42 @@ _OVER:
|
|||
if (pFile != NULL) taosCloseFile(&pFile);
|
||||
|
||||
if (code != 0) {
|
||||
dError("failed to read mnode file:%s since %s", file, terrstr());
|
||||
dError("failed to read mnode file:%s since %s", file, tstrerror(code));
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t mmEncodeOption(SJson *pJson, const SMnodeOpt *pOption) {
|
||||
int32_t code = 0;
|
||||
if (pOption->deploy && pOption->numOfTotalReplicas > 0) {
|
||||
if (tjsonAddDoubleToObject(pJson, "selfIndex", pOption->selfIndex) < 0) return -1;
|
||||
if ((code = tjsonAddDoubleToObject(pJson, "selfIndex", pOption->selfIndex)) < 0) return code;
|
||||
|
||||
SJson *replicas = tjsonCreateArray();
|
||||
if (replicas == NULL) return -1;
|
||||
if (tjsonAddItemToObject(pJson, "replicas", replicas) < 0) return -1;
|
||||
if (replicas == NULL) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
if ((code = tjsonAddItemToObject(pJson, "replicas", replicas)) < 0) return code;
|
||||
|
||||
for (int32_t i = 0; i < pOption->numOfTotalReplicas; ++i) {
|
||||
SJson *replica = tjsonCreateObject();
|
||||
if (replica == NULL) return -1;
|
||||
if (replica == NULL) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
const SReplica *pReplica = pOption->replicas + i;
|
||||
if (tjsonAddDoubleToObject(replica, "id", pReplica->id) < 0) return -1;
|
||||
if (tjsonAddStringToObject(replica, "fqdn", pReplica->fqdn) < 0) return -1;
|
||||
if (tjsonAddDoubleToObject(replica, "port", pReplica->port) < 0) return -1;
|
||||
if (tjsonAddDoubleToObject(replica, "role", pOption->nodeRoles[i]) < 0) return -1;
|
||||
if (tjsonAddItemToArray(replicas, replica) < 0) return -1;
|
||||
if ((code = tjsonAddDoubleToObject(replica, "id", pReplica->id)) < 0) return code;
|
||||
if ((code = tjsonAddStringToObject(replica, "fqdn", pReplica->fqdn)) < 0) return code;
|
||||
if ((code = tjsonAddDoubleToObject(replica, "port", pReplica->port)) < 0) return code;
|
||||
if ((code = tjsonAddDoubleToObject(replica, "role", pOption->nodeRoles[i])) < 0) return code;
|
||||
if ((code = tjsonAddItemToArray(replicas, replica)) < 0) return code;
|
||||
}
|
||||
}
|
||||
|
||||
if (tjsonAddDoubleToObject(pJson, "lastIndex", pOption->lastIndex) < 0) return -1;
|
||||
if ((code = tjsonAddDoubleToObject(pJson, "lastIndex", pOption->lastIndex)) < 0) return code;
|
||||
|
||||
if (tjsonAddDoubleToObject(pJson, "deployed", pOption->deploy) < 0) return -1;
|
||||
if ((code = tjsonAddDoubleToObject(pJson, "deployed", pOption->deploy)) < 0) return code;
|
||||
|
||||
return 0;
|
||||
return code;
|
||||
}
|
||||
|
||||
int32_t mmWriteFile(const char *path, const SMnodeOpt *pOption) {
|
||||
|
@ -158,28 +167,59 @@ int32_t mmWriteFile(const char *path, const SMnodeOpt *pOption) {
|
|||
TdFilePtr pFile = NULL;
|
||||
char file[PATH_MAX] = {0};
|
||||
char realfile[PATH_MAX] = {0};
|
||||
snprintf(file, sizeof(file), "%s%smnode.json.bak", path, TD_DIRSEP);
|
||||
snprintf(realfile, sizeof(realfile), "%s%smnode.json", path, TD_DIRSEP);
|
||||
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
int32_t nBytes = snprintf(file, sizeof(file), "%s%smnode.json.bak", path, TD_DIRSEP);
|
||||
if (nBytes <= 0 || nBytes >= sizeof(file)) {
|
||||
code = TSDB_CODE_OUT_OF_BUFFER;
|
||||
goto _OVER;
|
||||
}
|
||||
|
||||
nBytes = snprintf(realfile, sizeof(realfile), "%s%smnode.json", path, TD_DIRSEP);
|
||||
if (nBytes <= 0 || nBytes >= sizeof(realfile)) {
|
||||
code = TSDB_CODE_OUT_OF_BUFFER;
|
||||
goto _OVER;
|
||||
}
|
||||
|
||||
// terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
pJson = tjsonCreateObject();
|
||||
if (pJson == NULL) goto _OVER;
|
||||
if (mmEncodeOption(pJson, pOption) != 0) goto _OVER;
|
||||
if (pJson == NULL) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _OVER;
|
||||
}
|
||||
|
||||
TAOS_CHECK_GOTO(mmEncodeOption(pJson, pOption), NULL, _OVER);
|
||||
|
||||
buffer = tjsonToString(pJson);
|
||||
if (buffer == NULL) goto _OVER;
|
||||
terrno = 0;
|
||||
if (buffer == NULL) {
|
||||
code = TSDB_CODE_INVALID_JSON_FORMAT;
|
||||
goto _OVER;
|
||||
}
|
||||
|
||||
pFile = taosOpenFile(file, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC | TD_FILE_WRITE_THROUGH);
|
||||
if (pFile == NULL) goto _OVER;
|
||||
if (pFile == NULL) {
|
||||
code = TAOS_SYSTEM_ERROR(errno);
|
||||
goto _OVER;
|
||||
}
|
||||
|
||||
int32_t len = strlen(buffer);
|
||||
if (taosWriteFile(pFile, buffer, len) <= 0) goto _OVER;
|
||||
if (taosFsyncFile(pFile) < 0) goto _OVER;
|
||||
if (taosWriteFile(pFile, buffer, len) <= 0) {
|
||||
code = TAOS_SYSTEM_ERROR(errno);
|
||||
goto _OVER;
|
||||
}
|
||||
if (taosFsyncFile(pFile) < 0) {
|
||||
code = TAOS_SYSTEM_ERROR(errno);
|
||||
goto _OVER;
|
||||
}
|
||||
|
||||
taosCloseFile(&pFile);
|
||||
if (taosRenameFile(file, realfile) != 0) goto _OVER;
|
||||
if (taosCloseFile(&pFile) < 0) {
|
||||
code = TAOS_SYSTEM_ERROR(errno);
|
||||
goto _OVER;
|
||||
}
|
||||
if (taosRenameFile(file, realfile) != 0) {
|
||||
code = TAOS_SYSTEM_ERROR(errno);
|
||||
goto _OVER;
|
||||
}
|
||||
|
||||
code = 0;
|
||||
dInfo("succeed to write mnode file:%s, deloyed:%d", realfile, pOption->deploy);
|
||||
|
||||
_OVER:
|
||||
|
@ -188,8 +228,7 @@ _OVER:
|
|||
if (pFile != NULL) taosCloseFile(&pFile);
|
||||
|
||||
if (code != 0) {
|
||||
if (terrno == 0) terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
dError("failed to write mnode file:%s since %s, deloyed:%d", realfile, terrstr(), pOption->deploy);
|
||||
dError("failed to write mnode file:%s since %s, deloyed:%d", realfile, tstrerror(code), pOption->deploy);
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
|
|
@ -26,11 +26,12 @@ void mmGetMnodeLoads(SMnodeMgmt *pMgmt, SMonMloadInfo *pInfo) {
|
|||
}
|
||||
|
||||
int32_t mmProcessCreateReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg) {
|
||||
int32_t code = 0;
|
||||
const STraceId *trace = &pMsg->info.traceId;
|
||||
SDCreateMnodeReq createReq = {0};
|
||||
if (tDeserializeSDCreateMnodeReq(pMsg->pCont, pMsg->contLen, &createReq) != 0) {
|
||||
terrno = TSDB_CODE_INVALID_MSG;
|
||||
return -1;
|
||||
code = TSDB_CODE_INVALID_MSG;
|
||||
return code;
|
||||
}
|
||||
|
||||
SMnodeOpt option = {.deploy = true,
|
||||
|
@ -56,43 +57,45 @@ int32_t mmProcessCreateReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg) {
|
|||
}
|
||||
|
||||
if (option.selfIndex == -1) {
|
||||
terrno = TSDB_CODE_INVALID_OPTION;
|
||||
dGError("failed to create mnode since %s, selfIndex is -1", terrstr());
|
||||
return -1;
|
||||
code = TSDB_CODE_INVALID_OPTION;
|
||||
dGError("failed to create mnode since %s, selfIndex is -1", tstrerror(code));
|
||||
return code;
|
||||
}
|
||||
|
||||
if (mmWriteFile(pInput->path, &option) != 0) {
|
||||
dGError("failed to write mnode file since %s", terrstr());
|
||||
return -1;
|
||||
if ((code = mmWriteFile(pInput->path, &option)) != 0) {
|
||||
dGError("failed to write mnode file since %s", tstrerror(code));
|
||||
return code;
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t mmProcessDropReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg) {
|
||||
int32_t code = 0;
|
||||
|
||||
const STraceId *trace = &pMsg->info.traceId;
|
||||
SDDropMnodeReq dropReq = {0};
|
||||
if (tDeserializeSCreateDropMQSNodeReq(pMsg->pCont, pMsg->contLen, &dropReq) != 0) {
|
||||
terrno = TSDB_CODE_INVALID_MSG;
|
||||
return -1;
|
||||
code = TSDB_CODE_INVALID_MSG;
|
||||
return code;
|
||||
}
|
||||
|
||||
if (pInput->pData->dnodeId != 0 && dropReq.dnodeId != pInput->pData->dnodeId) {
|
||||
terrno = TSDB_CODE_INVALID_OPTION;
|
||||
dGError("failed to drop mnode since %s", terrstr());
|
||||
code = TSDB_CODE_INVALID_OPTION;
|
||||
dGError("failed to drop mnode since %s", tstrerror(code));
|
||||
tFreeSMCreateQnodeReq(&dropReq);
|
||||
return -1;
|
||||
return code;
|
||||
}
|
||||
|
||||
SMnodeOpt option = {.deploy = false};
|
||||
if (mmWriteFile(pInput->path, &option) != 0) {
|
||||
dGError("failed to write mnode file since %s", terrstr());
|
||||
if ((code = mmWriteFile(pInput->path, &option)) != 0) {
|
||||
dGError("failed to write mnode file since %s", tstrerror(code));
|
||||
tFreeSMCreateQnodeReq(&dropReq);
|
||||
return -1;
|
||||
return code;
|
||||
}
|
||||
|
||||
tFreeSMCreateQnodeReq(&dropReq);
|
||||
return 0;
|
||||
return code;
|
||||
}
|
||||
|
||||
SArray *mmGetMsgHandles() {
|
||||
|
|
|
@ -25,9 +25,10 @@ static bool mmDeployRequired(const SMgmtInputOpt *pInput) {
|
|||
}
|
||||
|
||||
static int32_t mmRequire(const SMgmtInputOpt *pInput, bool *required) {
|
||||
int32_t code = 0;
|
||||
SMnodeOpt option = {0};
|
||||
if (mmReadFile(pInput->path, &option) != 0) {
|
||||
return -1;
|
||||
if ((code = mmReadFile(pInput->path, &option)) != 0) {
|
||||
return code;
|
||||
}
|
||||
|
||||
if (!option.deploy) {
|
||||
|
@ -41,7 +42,7 @@ static int32_t mmRequire(const SMgmtInputOpt *pInput, bool *required) {
|
|||
dInfo("deploy mnode required. option deploy:%d", option.deploy);
|
||||
}
|
||||
|
||||
return 0;
|
||||
return code;
|
||||
}
|
||||
|
||||
static void mmBuildOptionForDeploy(SMnodeMgmt *pMgmt, const SMgmtInputOpt *pInput, SMnodeOpt *pOption) {
|
||||
|
@ -73,22 +74,31 @@ static void mmClose(SMnodeMgmt *pMgmt) {
|
|||
|
||||
taosMemoryFree(pMgmt);
|
||||
}
|
||||
|
||||
static int32_t mndOpenWrapper(const char *path, SMnodeOpt *opt, SMnode **pMnode) {
|
||||
int32_t code = 0;
|
||||
*pMnode = mndOpen(path, opt);
|
||||
if (*pMnode == NULL) {
|
||||
code = terrno;
|
||||
}
|
||||
///*pMnode = pNode;
|
||||
return code;
|
||||
}
|
||||
static int32_t mmOpen(SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) {
|
||||
if (walInit() != 0) {
|
||||
dError("failed to init wal since %s", terrstr());
|
||||
return -1;
|
||||
int32_t code = 0;
|
||||
if ((code = walInit()) != 0) {
|
||||
dError("failed to init wal since %s", tstrerror(code));
|
||||
return code;
|
||||
}
|
||||
|
||||
if (syncInit() != 0) {
|
||||
dError("failed to init sync since %s", terrstr());
|
||||
return -1;
|
||||
if ((code = syncInit()) != 0) {
|
||||
dError("failed to init sync since %s", tstrerror(code));
|
||||
return code;
|
||||
}
|
||||
|
||||
SMnodeMgmt *pMgmt = taosMemoryCalloc(1, sizeof(SMnodeMgmt));
|
||||
if (pMgmt == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return -1;
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return code;
|
||||
}
|
||||
|
||||
pMgmt->pData = pInput->pData;
|
||||
|
@ -100,10 +110,10 @@ static int32_t mmOpen(SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) {
|
|||
taosThreadRwlockInit(&pMgmt->lock, NULL);
|
||||
|
||||
SMnodeOpt option = {0};
|
||||
if (mmReadFile(pMgmt->path, &option) != 0) {
|
||||
dError("failed to read file since %s", terrstr());
|
||||
if ((code = mmReadFile(pMgmt->path, &option)) != 0) {
|
||||
dError("failed to read file since %s", tstrerror(code));
|
||||
mmClose(pMgmt);
|
||||
return -1;
|
||||
return code;
|
||||
}
|
||||
|
||||
if (!option.deploy) {
|
||||
|
@ -115,18 +125,18 @@ static int32_t mmOpen(SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) {
|
|||
mmBuildOptionForOpen(pMgmt, &option);
|
||||
}
|
||||
|
||||
pMgmt->pMnode = mndOpen(pMgmt->path, &option);
|
||||
if (pMgmt->pMnode == NULL) {
|
||||
dError("failed to open mnode since %s", terrstr());
|
||||
code = mndOpenWrapper(pMgmt->path, &option, &pMgmt->pMnode);
|
||||
if (code != 0) {
|
||||
dError("failed to open mnode since %s", tstrerror(code));
|
||||
mmClose(pMgmt);
|
||||
return -1;
|
||||
return code;
|
||||
}
|
||||
tmsgReportStartup("mnode-impl", "initialized");
|
||||
|
||||
if (mmStartWorker(pMgmt) != 0) {
|
||||
dError("failed to start mnode worker since %s", terrstr());
|
||||
if ((code = mmStartWorker(pMgmt)) != 0) {
|
||||
dError("failed to start mnode worker since %s", tstrerror(code));
|
||||
mmClose(pMgmt);
|
||||
return -1;
|
||||
return code;
|
||||
}
|
||||
tmsgReportStartup("mnode-worker", "initialized");
|
||||
|
||||
|
@ -134,9 +144,9 @@ static int32_t mmOpen(SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) {
|
|||
option.deploy = true;
|
||||
option.numOfReplicas = 0;
|
||||
option.numOfTotalReplicas = 0;
|
||||
if (mmWriteFile(pMgmt->path, &option) != 0) {
|
||||
dError("failed to write mnode file since %s", terrstr());
|
||||
return -1;
|
||||
if ((code = mmWriteFile(pMgmt->path, &option)) != 0) {
|
||||
dError("failed to write mnode file since %s", tstrerror(code));
|
||||
return code;
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -160,13 +170,9 @@ static void mmStop(SMnodeMgmt *pMgmt) {
|
|||
mndStop(pMgmt->pMnode);
|
||||
}
|
||||
|
||||
static int32_t mmSyncIsCatchUp(SMnodeMgmt *pMgmt) {
|
||||
return mndIsCatchUp(pMgmt->pMnode);
|
||||
}
|
||||
static int32_t mmSyncIsCatchUp(SMnodeMgmt *pMgmt) { return mndIsCatchUp(pMgmt->pMnode); }
|
||||
|
||||
static ESyncRole mmSyncGetRole(SMnodeMgmt *pMgmt) {
|
||||
return mndGetRole(pMgmt->pMnode);
|
||||
}
|
||||
static ESyncRole mmSyncGetRole(SMnodeMgmt *pMgmt) { return mndGetRole(pMgmt->pMnode); }
|
||||
|
||||
SMgmtFunc mmGetMgmtFunc() {
|
||||
SMgmtFunc mgmtFunc = {0};
|
||||
|
|
|
@ -186,7 +186,8 @@ int32_t mmPutMsgToQueue(SMnodeMgmt *pMgmt, EQueueType qtype, SRpcMsg *pRpc) {
|
|||
memcpy(pMsg, pRpc, sizeof(SRpcMsg));
|
||||
pRpc->pCont = NULL;
|
||||
|
||||
dTrace("msg:%p, is created and will put into %s queue, type:%s len:%d", pMsg, pWorker->name, TMSG_INFO(pRpc->msgType), pRpc->contLen);
|
||||
dTrace("msg:%p, is created and will put into %s queue, type:%s len:%d", pMsg, pWorker->name, TMSG_INFO(pRpc->msgType),
|
||||
pRpc->contLen);
|
||||
int32_t code = mmPutMsgToWorker(pMgmt, pWorker, pMsg);
|
||||
if (code != 0) {
|
||||
dTrace("msg:%p, is freed", pMsg);
|
||||
|
@ -197,6 +198,7 @@ int32_t mmPutMsgToQueue(SMnodeMgmt *pMgmt, EQueueType qtype, SRpcMsg *pRpc) {
|
|||
}
|
||||
|
||||
int32_t mmStartWorker(SMnodeMgmt *pMgmt) {
|
||||
int32_t code = 0;
|
||||
SSingleWorkerCfg qCfg = {
|
||||
.min = tsNumOfMnodeQueryThreads,
|
||||
.max = tsNumOfMnodeQueryThreads,
|
||||
|
@ -205,9 +207,9 @@ int32_t mmStartWorker(SMnodeMgmt *pMgmt) {
|
|||
.param = pMgmt,
|
||||
.poolType = QUERY_AUTO_QWORKER_POOL,
|
||||
};
|
||||
if (tSingleWorkerInit(&pMgmt->queryWorker, &qCfg) != 0) {
|
||||
dError("failed to start mnode-query worker since %s", terrstr());
|
||||
return -1;
|
||||
if ((code = tSingleWorkerInit(&pMgmt->queryWorker, &qCfg)) != 0) {
|
||||
dError("failed to start mnode-query worker since %s", tstrerror(code));
|
||||
return code;
|
||||
}
|
||||
|
||||
SSingleWorkerCfg fCfg = {
|
||||
|
@ -217,9 +219,9 @@ int32_t mmStartWorker(SMnodeMgmt *pMgmt) {
|
|||
.fp = (FItem)mmProcessRpcMsg,
|
||||
.param = pMgmt,
|
||||
};
|
||||
if (tSingleWorkerInit(&pMgmt->fetchWorker, &fCfg) != 0) {
|
||||
dError("failed to start mnode-fetch worker since %s", terrstr());
|
||||
return -1;
|
||||
if ((code = tSingleWorkerInit(&pMgmt->fetchWorker, &fCfg)) != 0) {
|
||||
dError("failed to start mnode-fetch worker since %s", tstrerror(code));
|
||||
return code;
|
||||
}
|
||||
|
||||
SSingleWorkerCfg rCfg = {
|
||||
|
@ -229,9 +231,9 @@ int32_t mmStartWorker(SMnodeMgmt *pMgmt) {
|
|||
.fp = (FItem)mmProcessRpcMsg,
|
||||
.param = pMgmt,
|
||||
};
|
||||
if (tSingleWorkerInit(&pMgmt->readWorker, &rCfg) != 0) {
|
||||
dError("failed to start mnode-read worker since %s", terrstr());
|
||||
return -1;
|
||||
if ((code = tSingleWorkerInit(&pMgmt->readWorker, &rCfg)) != 0) {
|
||||
dError("failed to start mnode-read worker since %s", tstrerror(code));
|
||||
return code;
|
||||
}
|
||||
|
||||
SSingleWorkerCfg wCfg = {
|
||||
|
@ -241,9 +243,9 @@ int32_t mmStartWorker(SMnodeMgmt *pMgmt) {
|
|||
.fp = (FItem)mmProcessRpcMsg,
|
||||
.param = pMgmt,
|
||||
};
|
||||
if (tSingleWorkerInit(&pMgmt->writeWorker, &wCfg) != 0) {
|
||||
dError("failed to start mnode-write worker since %s", terrstr());
|
||||
return -1;
|
||||
if ((code = tSingleWorkerInit(&pMgmt->writeWorker, &wCfg)) != 0) {
|
||||
dError("failed to start mnode-write worker since %s", tstrerror(code));
|
||||
return code;
|
||||
}
|
||||
|
||||
SSingleWorkerCfg sCfg = {
|
||||
|
@ -253,9 +255,9 @@ int32_t mmStartWorker(SMnodeMgmt *pMgmt) {
|
|||
.fp = (FItem)mmProcessSyncMsg,
|
||||
.param = pMgmt,
|
||||
};
|
||||
if (tSingleWorkerInit(&pMgmt->syncWorker, &sCfg) != 0) {
|
||||
dError("failed to start mnode mnode-sync worker since %s", terrstr());
|
||||
return -1;
|
||||
if ((code = tSingleWorkerInit(&pMgmt->syncWorker, &sCfg)) != 0) {
|
||||
dError("failed to start mnode mnode-sync worker since %s", tstrerror(code));
|
||||
return code;
|
||||
}
|
||||
|
||||
SSingleWorkerCfg scCfg = {
|
||||
|
@ -265,9 +267,9 @@ int32_t mmStartWorker(SMnodeMgmt *pMgmt) {
|
|||
.fp = (FItem)mmProcessSyncMsg,
|
||||
.param = pMgmt,
|
||||
};
|
||||
if (tSingleWorkerInit(&pMgmt->syncRdWorker, &scCfg) != 0) {
|
||||
dError("failed to start mnode mnode-sync-rd worker since %s", terrstr());
|
||||
return -1;
|
||||
if ((code = tSingleWorkerInit(&pMgmt->syncRdWorker, &scCfg)) != 0) {
|
||||
dError("failed to start mnode mnode-sync-rd worker since %s", tstrerror(code));
|
||||
return code;
|
||||
}
|
||||
|
||||
SSingleWorkerCfg arbCfg = {
|
||||
|
@ -277,13 +279,13 @@ int32_t mmStartWorker(SMnodeMgmt *pMgmt) {
|
|||
.fp = (FItem)mmProcessRpcMsg,
|
||||
.param = pMgmt,
|
||||
};
|
||||
if (tSingleWorkerInit(&pMgmt->arbWorker, &arbCfg) != 0) {
|
||||
dError("failed to start mnode mnode-arb worker since %s", terrstr());
|
||||
return -1;
|
||||
if ((code = tSingleWorkerInit(&pMgmt->arbWorker, &arbCfg)) != 0) {
|
||||
dError("failed to start mnode mnode-arb worker since %s", tstrerror(code));
|
||||
return code;
|
||||
}
|
||||
|
||||
dDebug("mnode workers are initialized");
|
||||
return 0;
|
||||
return code;
|
||||
}
|
||||
|
||||
void mmStopWorker(SMnodeMgmt *pMgmt) {
|
||||
|
|
|
@ -30,24 +30,25 @@ void qmGetQnodeLoads(SQnodeMgmt *pMgmt, SQnodeLoad *pInfo) {
|
|||
}
|
||||
|
||||
int32_t qmProcessCreateReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg) {
|
||||
int32_t code = 0;
|
||||
SDCreateQnodeReq createReq = {0};
|
||||
if (tDeserializeSCreateDropMQSNodeReq(pMsg->pCont, pMsg->contLen, &createReq) != 0) {
|
||||
terrno = TSDB_CODE_INVALID_MSG;
|
||||
return -1;
|
||||
code = TSDB_CODE_INVALID_MSG;
|
||||
return code;
|
||||
}
|
||||
|
||||
if (pInput->pData->dnodeId != 0 && createReq.dnodeId != pInput->pData->dnodeId) {
|
||||
terrno = TSDB_CODE_INVALID_OPTION;
|
||||
dError("failed to create qnode since %s", terrstr());
|
||||
code = TSDB_CODE_INVALID_OPTION;
|
||||
dError("failed to create qnode since %s", tstrerror(code));
|
||||
tFreeSMCreateQnodeReq(&createReq);
|
||||
return -1;
|
||||
return code;
|
||||
}
|
||||
|
||||
bool deployed = true;
|
||||
if (dmWriteFile(pInput->path, pInput->name, deployed) != 0) {
|
||||
dError("failed to write qnode file since %s", terrstr());
|
||||
if ((code = dmWriteFile(pInput->path, pInput->name, deployed)) != 0) {
|
||||
dError("failed to write qnode file since %s", tstrerror(code));
|
||||
tFreeSMCreateQnodeReq(&createReq);
|
||||
return -1;
|
||||
return code;
|
||||
}
|
||||
|
||||
tFreeSMCreateQnodeReq(&createReq);
|
||||
|
@ -55,24 +56,25 @@ int32_t qmProcessCreateReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg) {
|
|||
}
|
||||
|
||||
int32_t qmProcessDropReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg) {
|
||||
int32_t code = 0;
|
||||
SDDropQnodeReq dropReq = {0};
|
||||
if (tDeserializeSCreateDropMQSNodeReq(pMsg->pCont, pMsg->contLen, &dropReq) != 0) {
|
||||
terrno = TSDB_CODE_INVALID_MSG;
|
||||
return -1;
|
||||
code = TSDB_CODE_INVALID_MSG;
|
||||
return code;
|
||||
}
|
||||
|
||||
if (pInput->pData->dnodeId != 0 && dropReq.dnodeId != pInput->pData->dnodeId) {
|
||||
terrno = TSDB_CODE_INVALID_OPTION;
|
||||
dError("failed to drop qnode since %s", terrstr());
|
||||
code = TSDB_CODE_INVALID_OPTION;
|
||||
dError("failed to drop qnode since %s", tstrerror(code));
|
||||
tFreeSMCreateQnodeReq(&dropReq);
|
||||
return -1;
|
||||
return code;
|
||||
}
|
||||
|
||||
bool deployed = false;
|
||||
if (dmWriteFile(pInput->path, pInput->name, deployed) != 0) {
|
||||
dError("failed to write qnode file since %s", terrstr());
|
||||
if ((code = dmWriteFile(pInput->path, pInput->name, deployed)) != 0) {
|
||||
dError("failed to write qnode file since %s", tstrerror(code));
|
||||
tFreeSMCreateQnodeReq(&dropReq);
|
||||
return -1;
|
||||
return code;
|
||||
}
|
||||
|
||||
tFreeSMCreateQnodeReq(&dropReq);
|
||||
|
|
|
@ -33,11 +33,18 @@ static void qmClose(SQnodeMgmt *pMgmt) {
|
|||
taosMemoryFree(pMgmt);
|
||||
}
|
||||
|
||||
static int32_t qndOpenWrapper(SQnodeOpt *pOption, SQnode **pQnode) {
|
||||
*pQnode = qndOpen(pOption);
|
||||
if (*pQnode == NULL) {
|
||||
return terrno;
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
static int32_t qmOpen(SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) {
|
||||
int32_t code = 0;
|
||||
SQnodeMgmt *pMgmt = taosMemoryCalloc(1, sizeof(SQnodeMgmt));
|
||||
if (pMgmt == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return -1;
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
pMgmt->pData = pInput->pData;
|
||||
|
@ -50,29 +57,30 @@ static int32_t qmOpen(SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) {
|
|||
|
||||
SQnodeOpt option = {0};
|
||||
qmInitOption(pMgmt, &option);
|
||||
pMgmt->pQnode = qndOpen(&option);
|
||||
if (pMgmt->pQnode == NULL) {
|
||||
dError("failed to open qnode since %s", terrstr());
|
||||
|
||||
code = qndOpenWrapper(&option, &pMgmt->pQnode);
|
||||
if (code != 0) {
|
||||
dError("failed to open qnode since %s", tstrerror(code));
|
||||
qmClose(pMgmt);
|
||||
return -1;
|
||||
}
|
||||
tmsgReportStartup("qnode-impl", "initialized");
|
||||
|
||||
if (udfcOpen() != 0) {
|
||||
if ((code = udfcOpen()) != 0) {
|
||||
dError("qnode can not open udfc");
|
||||
qmClose(pMgmt);
|
||||
return -1;
|
||||
return code;
|
||||
}
|
||||
|
||||
if (qmStartWorker(pMgmt) != 0) {
|
||||
dError("failed to start qnode worker since %s", terrstr());
|
||||
if ((code = qmStartWorker(pMgmt)) != 0) {
|
||||
dError("failed to start qnode worker since %s", tstrerror(code));
|
||||
qmClose(pMgmt);
|
||||
return -1;
|
||||
return code;
|
||||
}
|
||||
tmsgReportStartup("qnode-worker", "initialized");
|
||||
|
||||
pOutput->pMgmt = pMgmt;
|
||||
return 0;
|
||||
return code;
|
||||
}
|
||||
|
||||
SMgmtFunc qmGetMgmtFunc() {
|
||||
|
|
|
@ -99,6 +99,8 @@ int32_t qmGetQueueSize(SQnodeMgmt *pMgmt, int32_t vgId, EQueueType qtype) {
|
|||
}
|
||||
|
||||
int32_t qmStartWorker(SQnodeMgmt *pMgmt) {
|
||||
int32_t code = 0;
|
||||
|
||||
SSingleWorkerCfg queryCfg = {
|
||||
.min = tsNumOfVnodeQueryThreads,
|
||||
.max = tsNumOfVnodeQueryThreads,
|
||||
|
@ -108,9 +110,9 @@ int32_t qmStartWorker(SQnodeMgmt *pMgmt) {
|
|||
.poolType = QUERY_AUTO_QWORKER_POOL,
|
||||
};
|
||||
|
||||
if (tSingleWorkerInit(&pMgmt->queryWorker, &queryCfg) != 0) {
|
||||
dError("failed to start qnode-query worker since %s", terrstr());
|
||||
return -1;
|
||||
if ((code = tSingleWorkerInit(&pMgmt->queryWorker, &queryCfg)) != 0) {
|
||||
dError("failed to start qnode-query worker since %s", tstrerror(code));
|
||||
return code;
|
||||
}
|
||||
|
||||
SSingleWorkerCfg fetchCfg = {
|
||||
|
@ -121,13 +123,13 @@ int32_t qmStartWorker(SQnodeMgmt *pMgmt) {
|
|||
.param = pMgmt,
|
||||
};
|
||||
|
||||
if (tSingleWorkerInit(&pMgmt->fetchWorker, &fetchCfg) != 0) {
|
||||
dError("failed to start qnode-fetch worker since %s", terrstr());
|
||||
return -1;
|
||||
if ((code = tSingleWorkerInit(&pMgmt->fetchWorker, &fetchCfg)) != 0) {
|
||||
dError("failed to start qnode-fetch worker since %s", tstrerror(code));
|
||||
return code;
|
||||
}
|
||||
|
||||
dDebug("qnode workers are initialized");
|
||||
return 0;
|
||||
return code;
|
||||
}
|
||||
|
||||
void qmStopWorker(SQnodeMgmt *pMgmt) {
|
||||
|
|
|
@ -19,24 +19,25 @@
|
|||
void smGetMonitorInfo(SSnodeMgmt *pMgmt, SMonSmInfo *smInfo) {}
|
||||
|
||||
int32_t smProcessCreateReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg) {
|
||||
int32_t code = 0;
|
||||
SDCreateSnodeReq createReq = {0};
|
||||
if (tDeserializeSCreateDropMQSNodeReq(pMsg->pCont, pMsg->contLen, &createReq) != 0) {
|
||||
terrno = TSDB_CODE_INVALID_MSG;
|
||||
return -1;
|
||||
code = TSDB_CODE_INVALID_MSG;
|
||||
return code;
|
||||
}
|
||||
|
||||
if (pInput->pData->dnodeId != 0 && createReq.dnodeId != pInput->pData->dnodeId) {
|
||||
terrno = TSDB_CODE_INVALID_OPTION;
|
||||
dError("failed to create snode since %s", terrstr());
|
||||
code = TSDB_CODE_INVALID_OPTION;
|
||||
dError("failed to create snode since %s", tstrerror(code));
|
||||
tFreeSMCreateQnodeReq(&createReq);
|
||||
return -1;
|
||||
return code;
|
||||
}
|
||||
|
||||
bool deployed = true;
|
||||
if (dmWriteFile(pInput->path, pInput->name, deployed) != 0) {
|
||||
dError("failed to write snode file since %s", terrstr());
|
||||
if ((code = dmWriteFile(pInput->path, pInput->name, deployed)) != 0) {
|
||||
dError("failed to write snode file since %s", tstrerror(code));
|
||||
tFreeSMCreateQnodeReq(&createReq);
|
||||
return -1;
|
||||
return code;
|
||||
}
|
||||
|
||||
tFreeSMCreateQnodeReq(&createReq);
|
||||
|
@ -44,24 +45,26 @@ int32_t smProcessCreateReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg) {
|
|||
}
|
||||
|
||||
int32_t smProcessDropReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg) {
|
||||
int32_t code = 0;
|
||||
SDDropSnodeReq dropReq = {0};
|
||||
if (tDeserializeSCreateDropMQSNodeReq(pMsg->pCont, pMsg->contLen, &dropReq) != 0) {
|
||||
terrno = TSDB_CODE_INVALID_MSG;
|
||||
return -1;
|
||||
code = TSDB_CODE_INVALID_MSG;
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
if (pInput->pData->dnodeId != 0 && dropReq.dnodeId != pInput->pData->dnodeId) {
|
||||
terrno = TSDB_CODE_INVALID_OPTION;
|
||||
dError("failed to drop snode since %s", terrstr());
|
||||
code = TSDB_CODE_INVALID_OPTION;
|
||||
dError("failed to drop snode since %s", tstrerror(code));
|
||||
tFreeSMCreateQnodeReq(&dropReq);
|
||||
return -1;
|
||||
return code;
|
||||
}
|
||||
|
||||
bool deployed = false;
|
||||
if (dmWriteFile(pInput->path, pInput->name, deployed) != 0) {
|
||||
dError("failed to write snode file since %s", terrstr());
|
||||
if ((code = dmWriteFile(pInput->path, pInput->name, deployed)) != 0) {
|
||||
dError("failed to write snode file since %s", tstrerror(code));
|
||||
tFreeSMCreateQnodeReq(&dropReq);
|
||||
return -1;
|
||||
return code;
|
||||
}
|
||||
|
||||
tFreeSMCreateQnodeReq(&dropReq);
|
||||
|
|
|
@ -33,12 +33,19 @@ static void smClose(SSnodeMgmt *pMgmt) {
|
|||
|
||||
taosMemoryFree(pMgmt);
|
||||
}
|
||||
|
||||
int32_t sndOpenWrapper(const char *path, SSnodeOpt *pOption, SSnode **pNode) {
|
||||
*pNode = sndOpen(path, pOption);
|
||||
if (*pNode == NULL) {
|
||||
return terrno;
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
int32_t smOpen(SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) {
|
||||
int32_t code = 0;
|
||||
SSnodeMgmt *pMgmt = taosMemoryCalloc(1, sizeof(SSnodeMgmt));
|
||||
if (pMgmt == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return -1;
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return code;
|
||||
}
|
||||
|
||||
pMgmt->pData = pInput->pData;
|
||||
|
@ -50,35 +57,34 @@ int32_t smOpen(SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) {
|
|||
|
||||
SSnodeOpt option = {0};
|
||||
smInitOption(pMgmt, &option);
|
||||
pMgmt->pSnode = sndOpen(pMgmt->path, &option);
|
||||
if (pMgmt->pSnode == NULL) {
|
||||
dError("failed to open snode since %s", terrstr());
|
||||
|
||||
code = sndOpenWrapper(pMgmt->path, &option, &pMgmt->pSnode);
|
||||
if (code != 0) {
|
||||
dError("failed to open snode since %s", tstrerror(code));
|
||||
smClose(pMgmt);
|
||||
return -1;
|
||||
return code;
|
||||
}
|
||||
|
||||
tmsgReportStartup("snode-impl", "initialized");
|
||||
|
||||
if (smStartWorker(pMgmt) != 0) {
|
||||
dError("failed to start snode worker since %s", terrstr());
|
||||
if ((code = smStartWorker(pMgmt)) != 0) {
|
||||
dError("failed to start snode worker since %s", tstrerror(code));
|
||||
smClose(pMgmt);
|
||||
return -1;
|
||||
return code;
|
||||
}
|
||||
tmsgReportStartup("snode-worker", "initialized");
|
||||
|
||||
if (udfcOpen() != 0) {
|
||||
dError("failed to open udfc in snode");
|
||||
if ((code = udfcOpen()) != 0) {
|
||||
dError("failed to open udfc in snode since:%s", tstrerror(code));
|
||||
smClose(pMgmt);
|
||||
return -1;
|
||||
return code;
|
||||
}
|
||||
|
||||
pOutput->pMgmt = pMgmt;
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int32_t smStartSnodes(SSnodeMgmt *pMgmt) {
|
||||
return sndInit(pMgmt->pSnode);
|
||||
}
|
||||
static int32_t smStartSnodes(SSnodeMgmt *pMgmt) { return sndInit(pMgmt->pSnode); }
|
||||
|
||||
SMgmtFunc smGetMgmtFunc() {
|
||||
SMgmtFunc mgmtFunc = {0};
|
||||
|
|
|
@ -68,17 +68,18 @@ static void smProcessStreamQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) {
|
|||
}
|
||||
|
||||
int32_t smStartWorker(SSnodeMgmt *pMgmt) {
|
||||
int32_t code = 0;
|
||||
pMgmt->writeWroker = taosArrayInit(0, sizeof(SMultiWorker *));
|
||||
if (pMgmt->writeWroker == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return -1;
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return code;
|
||||
}
|
||||
|
||||
for (int32_t i = 0; i < tsNumOfSnodeWriteThreads; i++) {
|
||||
SMultiWorker *pWriteWorker = taosMemoryMalloc(sizeof(SMultiWorker));
|
||||
if (pWriteWorker == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return -1;
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return code;
|
||||
}
|
||||
|
||||
SMultiWorkerCfg cfg = {
|
||||
|
@ -87,13 +88,13 @@ int32_t smStartWorker(SSnodeMgmt *pMgmt) {
|
|||
.fp = smProcessWriteQueue,
|
||||
.param = pMgmt,
|
||||
};
|
||||
if (tMultiWorkerInit(pWriteWorker, &cfg) != 0) {
|
||||
dError("failed to start snode-unique worker since %s", terrstr());
|
||||
return -1;
|
||||
if ((code = tMultiWorkerInit(pWriteWorker, &cfg)) != 0) {
|
||||
dError("failed to start snode-unique worker since %s", tstrerror(code));
|
||||
return code;
|
||||
}
|
||||
if (taosArrayPush(pMgmt->writeWroker, &pWriteWorker) == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return -1;
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return code;
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -105,13 +106,13 @@ int32_t smStartWorker(SSnodeMgmt *pMgmt) {
|
|||
.param = pMgmt,
|
||||
};
|
||||
|
||||
if (tSingleWorkerInit(&pMgmt->streamWorker, &cfg)) {
|
||||
dError("failed to start snode shared-worker since %s", terrstr());
|
||||
return -1;
|
||||
if ((code = tSingleWorkerInit(&pMgmt->streamWorker, &cfg)) != 0) {
|
||||
dError("failed to start snode shared-worker since %s", tstrerror(code));
|
||||
return code;
|
||||
}
|
||||
|
||||
dDebug("snode workers are initialized");
|
||||
return 0;
|
||||
return code;
|
||||
}
|
||||
|
||||
void smStopWorker(SSnodeMgmt *pMgmt) {
|
||||
|
@ -126,21 +127,23 @@ void smStopWorker(SSnodeMgmt *pMgmt) {
|
|||
}
|
||||
|
||||
int32_t smPutMsgToQueue(SSnodeMgmt *pMgmt, EQueueType qtype, SRpcMsg *pRpc) {
|
||||
int32_t code = 0;
|
||||
SRpcMsg *pMsg = taosAllocateQitem(sizeof(SRpcMsg), RPC_QITEM, pRpc->contLen);
|
||||
if (pMsg == NULL) {
|
||||
rpcFreeCont(pRpc->pCont);
|
||||
pRpc->pCont = NULL;
|
||||
return -1;
|
||||
return code = terrno;
|
||||
}
|
||||
|
||||
SSnode *pSnode = pMgmt->pSnode;
|
||||
if (pSnode == NULL) {
|
||||
dError("msg:%p failed to put into snode queue since %s, type:%s qtype:%d len:%d", pMsg, terrstr(),
|
||||
code = terrno;
|
||||
dError("msg:%p failed to put into snode queue since %s, type:%s qtype:%d len:%d", pMsg, tstrerror(code),
|
||||
TMSG_INFO(pMsg->msgType), qtype, pRpc->contLen);
|
||||
taosFreeQitem(pMsg);
|
||||
rpcFreeCont(pRpc->pCont);
|
||||
pRpc->pCont = NULL;
|
||||
return -1;
|
||||
return code;
|
||||
}
|
||||
|
||||
SMsgHead *pHead = pRpc->pCont;
|
||||
|
@ -151,48 +154,44 @@ int32_t smPutMsgToQueue(SSnodeMgmt *pMgmt, EQueueType qtype, SRpcMsg *pRpc) {
|
|||
|
||||
switch (qtype) {
|
||||
case STREAM_QUEUE:
|
||||
smPutNodeMsgToStreamQueue(pMgmt, pMsg);
|
||||
code = smPutNodeMsgToStreamQueue(pMgmt, pMsg);
|
||||
break;
|
||||
case WRITE_QUEUE:
|
||||
smPutNodeMsgToWriteQueue(pMgmt, pMsg);
|
||||
code = smPutNodeMsgToWriteQueue(pMgmt, pMsg);
|
||||
break;
|
||||
default:
|
||||
terrno = TSDB_CODE_INVALID_PARA;
|
||||
code = TSDB_CODE_INVALID_PARA;
|
||||
rpcFreeCont(pMsg->pCont);
|
||||
taosFreeQitem(pMsg);
|
||||
return -1;
|
||||
return code;
|
||||
}
|
||||
return 0;
|
||||
return code;
|
||||
}
|
||||
|
||||
int32_t smPutNodeMsgToMgmtQueue(SSnodeMgmt *pMgmt, SRpcMsg *pMsg) {
|
||||
int32_t code = 0;
|
||||
SMultiWorker *pWorker = taosArrayGetP(pMgmt->writeWroker, 0);
|
||||
if (pWorker == NULL) {
|
||||
terrno = TSDB_CODE_INVALID_MSG;
|
||||
return -1;
|
||||
return TSDB_CODE_INVALID_MSG;
|
||||
}
|
||||
|
||||
dTrace("msg:%p, put into worker %s", pMsg, pWorker->name);
|
||||
taosWriteQitem(pWorker->queue, pMsg);
|
||||
return 0;
|
||||
return taosWriteQitem(pWorker->queue, pMsg);
|
||||
}
|
||||
|
||||
int32_t smPutNodeMsgToWriteQueue(SSnodeMgmt *pMgmt, SRpcMsg *pMsg) {
|
||||
SMultiWorker *pWorker = taosArrayGetP(pMgmt->writeWroker, 0);
|
||||
if (pWorker == NULL) {
|
||||
terrno = TSDB_CODE_INVALID_MSG;
|
||||
return -1;
|
||||
return TSDB_CODE_INVALID_MSG;
|
||||
}
|
||||
|
||||
dTrace("msg:%p, put into worker %s", pMsg, pWorker->name);
|
||||
taosWriteQitem(pWorker->queue, pMsg);
|
||||
return 0;
|
||||
return taosWriteQitem(pWorker->queue, pMsg);
|
||||
}
|
||||
|
||||
int32_t smPutNodeMsgToStreamQueue(SSnodeMgmt *pMgmt, SRpcMsg *pMsg) {
|
||||
SSingleWorker *pWorker = &pMgmt->streamWorker;
|
||||
|
||||
dTrace("msg:%p, put into worker %s", pMsg, pWorker->name);
|
||||
taosWriteQitem(pWorker->queue, pMsg);
|
||||
return 0;
|
||||
return taosWriteQitem(pWorker->queue, pMsg);
|
||||
}
|
||||
|
|
|
@ -54,24 +54,24 @@ typedef struct {
|
|||
} SWrapperCfg;
|
||||
|
||||
typedef struct {
|
||||
int32_t vgId;
|
||||
int32_t vgVersion;
|
||||
int32_t refCount;
|
||||
int8_t dropped;
|
||||
int8_t failed;
|
||||
int8_t disable;
|
||||
int32_t diskPrimary;
|
||||
int32_t toVgId;
|
||||
char *path;
|
||||
SVnode *pImpl;
|
||||
SMultiWorker pWriteW;
|
||||
SMultiWorker pSyncW;
|
||||
SMultiWorker pSyncRdW;
|
||||
SMultiWorker pApplyW;
|
||||
STaosQueue *pQueryQ;
|
||||
STaosQueue *pStreamQ;
|
||||
STaosQueue *pFetchQ;
|
||||
STaosQueue *pMultiMgmQ;
|
||||
int32_t vgId;
|
||||
int32_t vgVersion;
|
||||
int32_t refCount;
|
||||
int8_t dropped;
|
||||
int8_t failed;
|
||||
int8_t disable;
|
||||
int32_t diskPrimary;
|
||||
int32_t toVgId;
|
||||
char *path;
|
||||
SVnode *pImpl;
|
||||
SMultiWorker pWriteW;
|
||||
SMultiWorker pSyncW;
|
||||
SMultiWorker pSyncRdW;
|
||||
SMultiWorker pApplyW;
|
||||
STaosQueue *pQueryQ;
|
||||
STaosQueue *pStreamQ;
|
||||
STaosQueue *pFetchQ;
|
||||
STaosQueue *pMultiMgmQ;
|
||||
} SVnodeObj;
|
||||
|
||||
typedef struct {
|
||||
|
@ -107,9 +107,9 @@ int32_t vmProcessCheckLearnCatchupReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg);
|
|||
int32_t vmProcessArbHeartBeatReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg);
|
||||
|
||||
// vmFile.c
|
||||
int32_t vmGetVnodeListFromFile(SVnodeMgmt *pMgmt, SWrapperCfg **ppCfgs, int32_t *numOfVnodes);
|
||||
int32_t vmWriteVnodeListToFile(SVnodeMgmt *pMgmt);
|
||||
SVnodeObj **vmGetVnodeListFromHash(SVnodeMgmt *pMgmt, int32_t *numOfVnodes);
|
||||
int32_t vmGetVnodeListFromFile(SVnodeMgmt *pMgmt, SWrapperCfg **ppCfgs, int32_t *numOfVnodes);
|
||||
int32_t vmWriteVnodeListToFile(SVnodeMgmt *pMgmt);
|
||||
int32_t vmGetVnodeListFromHash(SVnodeMgmt *pMgmt, int32_t *numOfVnodes, SVnodeObj ***ppVnodes);
|
||||
|
||||
// vmWorker.c
|
||||
int32_t vmStartWorker(SVnodeMgmt *pMgmt);
|
||||
|
|
|
@ -19,12 +19,16 @@
|
|||
|
||||
#define MAX_CONTENT_LEN 2 * 1024 * 1024
|
||||
|
||||
SVnodeObj **vmGetVnodeListFromHash(SVnodeMgmt *pMgmt, int32_t *numOfVnodes) {
|
||||
int32_t vmGetVnodeListFromHash(SVnodeMgmt *pMgmt, int32_t *numOfVnodes, SVnodeObj ***ppVnodes) {
|
||||
taosThreadRwlockRdlock(&pMgmt->lock);
|
||||
|
||||
int32_t num = 0;
|
||||
int32_t size = taosHashGetSize(pMgmt->hash);
|
||||
SVnodeObj **pVnodes = taosMemoryCalloc(size, sizeof(SVnodeObj *));
|
||||
if (pVnodes == NULL) {
|
||||
taosThreadRwlockUnlock(&pMgmt->lock);
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
void *pIter = taosHashIterate(pMgmt->hash, NULL);
|
||||
while (pIter) {
|
||||
|
@ -42,8 +46,9 @@ SVnodeObj **vmGetVnodeListFromHash(SVnodeMgmt *pMgmt, int32_t *numOfVnodes) {
|
|||
|
||||
taosThreadRwlockUnlock(&pMgmt->lock);
|
||||
*numOfVnodes = num;
|
||||
*ppVnodes = pVnodes;
|
||||
|
||||
return pVnodes;
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int32_t vmDecodeVnodeList(SJson *pJson, SVnodeMgmt *pMgmt, SWrapperCfg **ppCfgs, int32_t *numOfVnodes) {
|
||||
|
@ -52,29 +57,32 @@ static int32_t vmDecodeVnodeList(SJson *pJson, SVnodeMgmt *pMgmt, SWrapperCfg **
|
|||
*ppCfgs = NULL;
|
||||
|
||||
SJson *vnodes = tjsonGetObjectItem(pJson, "vnodes");
|
||||
if (vnodes == NULL) return -1;
|
||||
if (vnodes == NULL) return TSDB_CODE_INVALID_JSON_FORMAT;
|
||||
|
||||
int32_t vnodesNum = cJSON_GetArraySize(vnodes);
|
||||
if (vnodesNum > 0) {
|
||||
pCfgs = taosMemoryCalloc(vnodesNum, sizeof(SWrapperCfg));
|
||||
if (pCfgs == NULL) return -1;
|
||||
if (pCfgs == NULL) return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
for (int32_t i = 0; i < vnodesNum; ++i) {
|
||||
SJson *vnode = tjsonGetArrayItem(vnodes, i);
|
||||
if (vnode == NULL) goto _OVER;
|
||||
if (vnode == NULL) {
|
||||
code = TSDB_CODE_INVALID_JSON_FORMAT;
|
||||
goto _OVER;
|
||||
}
|
||||
|
||||
SWrapperCfg *pCfg = &pCfgs[i];
|
||||
tjsonGetInt32ValueFromDouble(vnode, "vgId", pCfg->vgId, code);
|
||||
if (code < 0) goto _OVER;
|
||||
if (code != 0) goto _OVER;
|
||||
tjsonGetInt32ValueFromDouble(vnode, "dropped", pCfg->dropped, code);
|
||||
if (code < 0) goto _OVER;
|
||||
if (code != 0) goto _OVER;
|
||||
tjsonGetInt32ValueFromDouble(vnode, "vgVersion", pCfg->vgVersion, code);
|
||||
if (code < 0) goto _OVER;
|
||||
if (code != 0) goto _OVER;
|
||||
tjsonGetInt32ValueFromDouble(vnode, "diskPrimary", pCfg->diskPrimary, code);
|
||||
if (code < 0) goto _OVER;
|
||||
if (code != 0) goto _OVER;
|
||||
tjsonGetInt32ValueFromDouble(vnode, "toVgId", pCfg->toVgId, code);
|
||||
if (code < 0) goto _OVER;
|
||||
if (code != 0) goto _OVER;
|
||||
|
||||
snprintf(pCfg->path, sizeof(pCfg->path), "%s%svnode%d", pMgmt->path, TD_DIRSEP, pCfg->vgId);
|
||||
}
|
||||
|
@ -98,33 +106,35 @@ int32_t vmGetVnodeListFromFile(SVnodeMgmt *pMgmt, SWrapperCfg **ppCfgs, int32_t
|
|||
snprintf(file, sizeof(file), "%s%svnodes.json", pMgmt->path, TD_DIRSEP);
|
||||
|
||||
if (taosStatFile(file, NULL, NULL, NULL) < 0) {
|
||||
dInfo("vnode file:%s not exist", file);
|
||||
return 0;
|
||||
code = TAOS_SYSTEM_ERROR(errno);
|
||||
dInfo("vnode file:%s not exist, reason:%s", file, tstrerror(code));
|
||||
code = 0;
|
||||
return code;
|
||||
}
|
||||
|
||||
pFile = taosOpenFile(file, TD_FILE_READ);
|
||||
if (pFile == NULL) {
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
dError("failed to open vnode file:%s since %s", file, terrstr());
|
||||
code = TAOS_SYSTEM_ERROR(errno);
|
||||
dError("failed to open vnode file:%s since %s", file, tstrerror(code));
|
||||
goto _OVER;
|
||||
}
|
||||
|
||||
int64_t size = 0;
|
||||
if (taosFStatFile(pFile, &size, NULL) < 0) {
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
dError("failed to fstat mnode file:%s since %s", file, terrstr());
|
||||
code = TAOS_SYSTEM_ERROR(errno);
|
||||
dError("failed to fstat mnode file:%s since %s", file, tstrerror(code));
|
||||
goto _OVER;
|
||||
}
|
||||
|
||||
pData = taosMemoryMalloc(size + 1);
|
||||
if (pData == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _OVER;
|
||||
}
|
||||
|
||||
if (taosReadFile(pFile, pData, size) != size) {
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
dError("failed to read vnode file:%s since %s", file, terrstr());
|
||||
code = TAOS_SYSTEM_ERROR(errno);
|
||||
dError("failed to read vnode file:%s since %s", file, tstrerror(code));
|
||||
goto _OVER;
|
||||
}
|
||||
|
||||
|
@ -132,12 +142,12 @@ int32_t vmGetVnodeListFromFile(SVnodeMgmt *pMgmt, SWrapperCfg **ppCfgs, int32_t
|
|||
|
||||
pJson = tjsonParse(pData);
|
||||
if (pJson == NULL) {
|
||||
terrno = TSDB_CODE_INVALID_JSON_FORMAT;
|
||||
code = TSDB_CODE_INVALID_JSON_FORMAT;
|
||||
goto _OVER;
|
||||
}
|
||||
|
||||
if (vmDecodeVnodeList(pJson, pMgmt, ppCfgs, numOfVnodes) < 0) {
|
||||
terrno = TSDB_CODE_INVALID_JSON_FORMAT;
|
||||
code = TSDB_CODE_INVALID_JSON_FORMAT;
|
||||
goto _OVER;
|
||||
}
|
||||
|
||||
|
@ -150,28 +160,36 @@ _OVER:
|
|||
if (pFile != NULL) taosCloseFile(&pFile);
|
||||
|
||||
if (code != 0) {
|
||||
dError("failed to read vnode file:%s since %s", file, terrstr());
|
||||
dError("failed to read vnode file:%s since %s", file, tstrerror(code));
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t vmEncodeVnodeList(SJson *pJson, SVnodeObj **ppVnodes, int32_t numOfVnodes) {
|
||||
SJson *vnodes = tjsonCreateArray();
|
||||
if (vnodes == NULL) return -1;
|
||||
if (tjsonAddItemToObject(pJson, "vnodes", vnodes) < 0) return -1;
|
||||
int32_t code = 0;
|
||||
SJson *vnodes = tjsonCreateArray();
|
||||
if (vnodes == NULL) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
if ((code = tjsonAddItemToObject(pJson, "vnodes", vnodes)) < 0) {
|
||||
tjsonDelete(vnodes);
|
||||
return code;
|
||||
};
|
||||
|
||||
for (int32_t i = 0; i < numOfVnodes; ++i) {
|
||||
SVnodeObj *pVnode = ppVnodes[i];
|
||||
if (pVnode == NULL) continue;
|
||||
|
||||
SJson *vnode = tjsonCreateObject();
|
||||
if (vnode == NULL) return -1;
|
||||
if (tjsonAddDoubleToObject(vnode, "vgId", pVnode->vgId) < 0) return -1;
|
||||
if (tjsonAddDoubleToObject(vnode, "dropped", pVnode->dropped) < 0) return -1;
|
||||
if (tjsonAddDoubleToObject(vnode, "vgVersion", pVnode->vgVersion) < 0) return -1;
|
||||
if (tjsonAddDoubleToObject(vnode, "diskPrimary", pVnode->diskPrimary) < 0) return -1;
|
||||
if (pVnode->toVgId && tjsonAddDoubleToObject(vnode, "toVgId", pVnode->toVgId) < 0) return -1;
|
||||
if (tjsonAddItemToArray(vnodes, vnode) < 0) return -1;
|
||||
if (vnode == NULL) return TSDB_CODE_OUT_OF_MEMORY;
|
||||
if ((code = tjsonAddDoubleToObject(vnode, "vgId", pVnode->vgId)) < 0) return code;
|
||||
if ((code = tjsonAddDoubleToObject(vnode, "dropped", pVnode->dropped)) < 0) return code;
|
||||
if ((code = tjsonAddDoubleToObject(vnode, "vgVersion", pVnode->vgVersion)) < 0) return code;
|
||||
if ((code = tjsonAddDoubleToObject(vnode, "diskPrimary", pVnode->diskPrimary)) < 0) return code;
|
||||
if (pVnode->toVgId) {
|
||||
if ((code = tjsonAddDoubleToObject(vnode, "toVgId", pVnode->toVgId)) < 0) return code;
|
||||
}
|
||||
if ((code = tjsonAddItemToArray(vnodes, vnode)) < 0) return code;
|
||||
}
|
||||
|
||||
return 0;
|
||||
|
@ -185,30 +203,60 @@ int32_t vmWriteVnodeListToFile(SVnodeMgmt *pMgmt) {
|
|||
SVnodeObj **ppVnodes = NULL;
|
||||
char file[PATH_MAX] = {0};
|
||||
char realfile[PATH_MAX] = {0};
|
||||
snprintf(file, sizeof(file), "%s%svnodes_tmp.json", pMgmt->path, TD_DIRSEP);
|
||||
snprintf(realfile, sizeof(realfile), "%s%svnodes.json", pMgmt->path, TD_DIRSEP);
|
||||
|
||||
int32_t nBytes = snprintf(file, sizeof(file), "%s%svnodes_tmp.json", pMgmt->path, TD_DIRSEP);
|
||||
if (nBytes <= 0 || nBytes >= sizeof(file)) {
|
||||
return TSDB_CODE_OUT_OF_RANGE;
|
||||
}
|
||||
|
||||
nBytes = snprintf(realfile, sizeof(realfile), "%s%svnodes.json", pMgmt->path, TD_DIRSEP);
|
||||
if (nBytes <= 0 || nBytes >= sizeof(realfile)) {
|
||||
return TSDB_CODE_OUT_OF_RANGE;
|
||||
}
|
||||
|
||||
int32_t numOfVnodes = 0;
|
||||
ppVnodes = vmGetVnodeListFromHash(pMgmt, &numOfVnodes);
|
||||
if (ppVnodes == NULL) goto _OVER;
|
||||
code = vmGetVnodeListFromHash(pMgmt, &numOfVnodes, &ppVnodes);
|
||||
if (code) goto _OVER;
|
||||
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
// terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
pJson = tjsonCreateObject();
|
||||
if (pJson == NULL) goto _OVER;
|
||||
if (vmEncodeVnodeList(pJson, ppVnodes, numOfVnodes) != 0) goto _OVER;
|
||||
if (pJson == NULL) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _OVER;
|
||||
}
|
||||
if ((code = vmEncodeVnodeList(pJson, ppVnodes, numOfVnodes)) != 0) goto _OVER;
|
||||
|
||||
buffer = tjsonToString(pJson);
|
||||
if (buffer == NULL) goto _OVER;
|
||||
terrno = 0;
|
||||
if (buffer == NULL) {
|
||||
code = TSDB_CODE_INVALID_JSON_FORMAT;
|
||||
goto _OVER;
|
||||
}
|
||||
|
||||
pFile = taosOpenFile(file, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC | TD_FILE_WRITE_THROUGH);
|
||||
if (pFile == NULL) goto _OVER;
|
||||
if (pFile == NULL) {
|
||||
code = TAOS_SYSTEM_ERROR(errno);
|
||||
goto _OVER;
|
||||
}
|
||||
|
||||
int32_t len = strlen(buffer);
|
||||
if (taosWriteFile(pFile, buffer, len) <= 0) goto _OVER;
|
||||
if (taosFsyncFile(pFile) < 0) goto _OVER;
|
||||
if (taosWriteFile(pFile, buffer, len) <= 0) {
|
||||
code = TAOS_SYSTEM_ERROR(errno);
|
||||
goto _OVER;
|
||||
}
|
||||
if (taosFsyncFile(pFile) < 0) {
|
||||
code = TAOS_SYSTEM_ERROR(errno);
|
||||
goto _OVER;
|
||||
}
|
||||
|
||||
taosCloseFile(&pFile);
|
||||
if (taosRenameFile(file, realfile) != 0) goto _OVER;
|
||||
code = taosCloseFile(&pFile);
|
||||
if (code != 0) {
|
||||
code = TAOS_SYSTEM_ERROR(errno);
|
||||
goto _OVER;
|
||||
}
|
||||
if (taosRenameFile(file, realfile) != 0) {
|
||||
code = TAOS_SYSTEM_ERROR(errno);
|
||||
goto _OVER;
|
||||
}
|
||||
|
||||
code = 0;
|
||||
dInfo("succeed to write vnodes file:%s, vnodes:%d", realfile, numOfVnodes);
|
||||
|
@ -228,8 +276,7 @@ _OVER:
|
|||
}
|
||||
|
||||
if (code != 0) {
|
||||
if (terrno == 0) terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
dError("failed to write vnodes file:%s since %s, vnodes:%d", realfile, terrstr(), numOfVnodes);
|
||||
dError("failed to write vnodes file:%s since %s, vnodes:%d", realfile, tstrerror(code), numOfVnodes);
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
|
|
@ -256,8 +256,7 @@ int32_t vmProcessCreateVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
|
|||
char path[TSDB_FILENAME_LEN] = {0};
|
||||
|
||||
if (tDeserializeSCreateVnodeReq(pMsg->pCont, pMsg->contLen, &req) != 0) {
|
||||
terrno = TSDB_CODE_INVALID_MSG;
|
||||
return -1;
|
||||
return TSDB_CODE_INVALID_MSG;
|
||||
}
|
||||
|
||||
if (req.learnerReplica == 0) {
|
||||
|
@ -298,25 +297,24 @@ int32_t vmProcessCreateVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
|
|||
}
|
||||
if (pReplica->id != pMgmt->pData->dnodeId || pReplica->port != tsServerPort ||
|
||||
strcmp(pReplica->fqdn, tsLocalFqdn) != 0) {
|
||||
terrno = TSDB_CODE_INVALID_MSG;
|
||||
dError("vgId:%d, dnodeId:%d ep:%s:%u not matched with local dnode", req.vgId, pReplica->id, pReplica->fqdn,
|
||||
pReplica->port);
|
||||
return -1;
|
||||
code = TSDB_CODE_INVALID_MSG;
|
||||
dError("vgId:%d, dnodeId:%d ep:%s:%u not matched with local dnode, reason:%s", req.vgId, pReplica->id,
|
||||
pReplica->fqdn, pReplica->port, tstrerror(code));
|
||||
return code;
|
||||
}
|
||||
|
||||
if (req.encryptAlgorithm == DND_CA_SM4) {
|
||||
if (strlen(tsEncryptKey) == 0) {
|
||||
terrno = TSDB_CODE_DNODE_INVALID_ENCRYPTKEY;
|
||||
dError("vgId:%d, failed to create vnode since encrypt key is empty", req.vgId);
|
||||
return -1;
|
||||
code = TSDB_CODE_DNODE_INVALID_ENCRYPTKEY;
|
||||
dError("vgId:%d, failed to create vnode since encrypt key is empty, reason:%s", req.vgId, tstrerror(code));
|
||||
return code;
|
||||
}
|
||||
}
|
||||
|
||||
vmGenerateVnodeCfg(&req, &vnodeCfg);
|
||||
|
||||
if (vmTsmaAdjustDays(&vnodeCfg, &req) < 0) {
|
||||
dError("vgId:%d, failed to adjust tsma days since %s", req.vgId, terrstr());
|
||||
code = terrno != 0 ? terrno : -1;
|
||||
if ((code = vmTsmaAdjustDays(&vnodeCfg, &req)) < 0) {
|
||||
dError("vgId:%d, failed to adjust tsma days since %s", req.vgId, tstrerror(code));
|
||||
goto _OVER;
|
||||
}
|
||||
|
||||
|
@ -327,8 +325,7 @@ int32_t vmProcessCreateVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
|
|||
dError("vgId:%d, already exist", req.vgId);
|
||||
tFreeSCreateVnodeReq(&req);
|
||||
vmReleaseVnode(pMgmt, pVnode);
|
||||
terrno = TSDB_CODE_VND_ALREADY_EXIST;
|
||||
code = terrno;
|
||||
code = TSDB_CODE_VND_ALREADY_EXIST;
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
|
|
@ -33,6 +33,7 @@ int32_t vmGetPrimaryDisk(SVnodeMgmt *pMgmt, int32_t vgId) {
|
|||
}
|
||||
|
||||
int32_t vmAllocPrimaryDisk(SVnodeMgmt *pMgmt, int32_t vgId) {
|
||||
int32_t code = 0;
|
||||
STfs *pTfs = pMgmt->pTfs;
|
||||
int32_t diskId = 0;
|
||||
if (!pTfs) {
|
||||
|
@ -59,7 +60,12 @@ int32_t vmAllocPrimaryDisk(SVnodeMgmt *pMgmt, int32_t vgId) {
|
|||
// alloc
|
||||
int32_t disks[TFS_MAX_DISKS_PER_TIER] = {0};
|
||||
int32_t numOfVnodes = 0;
|
||||
SVnodeObj **ppVnodes = vmGetVnodeListFromHash(pMgmt, &numOfVnodes);
|
||||
SVnodeObj **ppVnodes = NULL;
|
||||
|
||||
code = vmGetVnodeListFromHash(pMgmt, &numOfVnodes, &ppVnodes);
|
||||
if (code != 0) {
|
||||
return code;
|
||||
}
|
||||
for (int32_t v = 0; v < numOfVnodes; v++) {
|
||||
SVnodeObj *pVnode = ppVnodes[v];
|
||||
disks[pVnode->diskPrimary] += 1;
|
||||
|
@ -436,6 +442,7 @@ static void *vmCloseVnodeInThread(void *param) {
|
|||
}
|
||||
|
||||
static void vmCloseVnodes(SVnodeMgmt *pMgmt) {
|
||||
int32_t code = 0;
|
||||
dInfo("start to close all vnodes");
|
||||
tSingleWorkerCleanup(&pMgmt->mgmtWorker);
|
||||
dInfo("vnodes mgmt worker is stopped");
|
||||
|
@ -443,7 +450,12 @@ static void vmCloseVnodes(SVnodeMgmt *pMgmt) {
|
|||
dInfo("vnodes multiple mgmt worker is stopped");
|
||||
|
||||
int32_t numOfVnodes = 0;
|
||||
SVnodeObj **ppVnodes = vmGetVnodeListFromHash(pMgmt, &numOfVnodes);
|
||||
SVnodeObj **ppVnodes = NULL;
|
||||
code = vmGetVnodeListFromHash(pMgmt, &numOfVnodes, &ppVnodes);
|
||||
if (code != 0) {
|
||||
dError("failed to get vnode list since %s", tstrerror(code));
|
||||
return;
|
||||
}
|
||||
|
||||
int32_t threadNum = tsNumOfCores / 2;
|
||||
if (threadNum < 1) threadNum = 1;
|
||||
|
@ -513,8 +525,14 @@ static void vmCleanup(SVnodeMgmt *pMgmt) {
|
|||
}
|
||||
|
||||
static void vmCheckSyncTimeout(SVnodeMgmt *pMgmt) {
|
||||
int32_t code = 0;
|
||||
int32_t numOfVnodes = 0;
|
||||
SVnodeObj **ppVnodes = vmGetVnodeListFromHash(pMgmt, &numOfVnodes);
|
||||
SVnodeObj **ppVnodes = NULL;
|
||||
code = vmGetVnodeListFromHash(pMgmt, &numOfVnodes, &ppVnodes);
|
||||
if (code != 0) {
|
||||
dError("failed to get vnode list since %s", tstrerror(code));
|
||||
return;
|
||||
}
|
||||
|
||||
if (ppVnodes != NULL) {
|
||||
for (int32_t i = 0; i < numOfVnodes; ++i) {
|
||||
|
@ -549,12 +567,14 @@ static void *vmThreadFp(void *param) {
|
|||
}
|
||||
|
||||
static int32_t vmInitTimer(SVnodeMgmt *pMgmt) {
|
||||
int32_t code = 0;
|
||||
TdThreadAttr thAttr;
|
||||
taosThreadAttrInit(&thAttr);
|
||||
taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
|
||||
if (taosThreadCreate(&pMgmt->thread, &thAttr, vmThreadFp, pMgmt) != 0) {
|
||||
dError("failed to create vnode timer thread since %s", strerror(errno));
|
||||
return -1;
|
||||
code = TAOS_SYSTEM_ERROR(errno);
|
||||
dError("failed to create vnode timer thread since %s", tstrerror(code));
|
||||
return code;
|
||||
}
|
||||
|
||||
taosThreadAttrDestroy(&thAttr);
|
||||
|
@ -573,7 +593,10 @@ static int32_t vmInit(SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) {
|
|||
int32_t code = -1;
|
||||
|
||||
SVnodeMgmt *pMgmt = taosMemoryCalloc(1, sizeof(SVnodeMgmt));
|
||||
if (pMgmt == NULL) goto _OVER;
|
||||
if (pMgmt == NULL) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _OVER;
|
||||
}
|
||||
|
||||
pMgmt->pData = pInput->pData;
|
||||
pMgmt->path = pInput->path;
|
||||
|
@ -582,8 +605,18 @@ static int32_t vmInit(SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) {
|
|||
pMgmt->msgCb.putToQueueFp = (PutToQueueFp)vmPutRpcMsgToQueue;
|
||||
pMgmt->msgCb.qsizeFp = (GetQueueSizeFp)vmGetQueueSize;
|
||||
pMgmt->msgCb.mgmt = pMgmt;
|
||||
taosThreadRwlockInit(&pMgmt->lock, NULL);
|
||||
taosThreadMutexInit(&pMgmt->createLock, NULL);
|
||||
|
||||
code = taosThreadRwlockInit(&pMgmt->lock, NULL);
|
||||
if (code != 0) {
|
||||
code = TAOS_SYSTEM_ERROR(errno);
|
||||
goto _OVER;
|
||||
}
|
||||
|
||||
code = taosThreadMutexInit(&pMgmt->createLock, NULL);
|
||||
if (code != 0) {
|
||||
code = TAOS_SYSTEM_ERROR(errno);
|
||||
goto _OVER;
|
||||
}
|
||||
|
||||
pMgmt->pTfs = pInput->pTfs;
|
||||
if (pMgmt->pTfs == NULL) {
|
||||
|
@ -592,38 +625,39 @@ static int32_t vmInit(SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) {
|
|||
}
|
||||
tmsgReportStartup("vnode-tfs", "initialized");
|
||||
|
||||
if (walInit() != 0) {
|
||||
dError("failed to init wal since %s", terrstr());
|
||||
if ((code = walInit()) != 0) {
|
||||
dError("failed to init wal since %s", tstrerror(code));
|
||||
goto _OVER;
|
||||
}
|
||||
|
||||
tmsgReportStartup("vnode-wal", "initialized");
|
||||
|
||||
if (syncInit() != 0) {
|
||||
dError("failed to open sync since %s", terrstr());
|
||||
if ((code = syncInit()) != 0) {
|
||||
dError("failed to open sync since %s", tstrerror(code));
|
||||
goto _OVER;
|
||||
}
|
||||
tmsgReportStartup("vnode-sync", "initialized");
|
||||
|
||||
if (vnodeInit(tsNumOfCommitThreads) != 0) {
|
||||
dError("failed to init vnode since %s", terrstr());
|
||||
if ((code = vnodeInit(tsNumOfCommitThreads)) != 0) {
|
||||
dError("failed to init vnode since %s", tstrerror(code));
|
||||
goto _OVER;
|
||||
}
|
||||
tmsgReportStartup("vnode-commit", "initialized");
|
||||
|
||||
if (vmStartWorker(pMgmt) != 0) {
|
||||
dError("failed to init workers since %s", terrstr());
|
||||
if ((code = vmStartWorker(pMgmt)) != 0) {
|
||||
dError("failed to init workers since %s", tstrerror(code));
|
||||
goto _OVER;
|
||||
}
|
||||
tmsgReportStartup("vnode-worker", "initialized");
|
||||
|
||||
if (vmOpenVnodes(pMgmt) != 0) {
|
||||
dError("failed to open all vnodes since %s", terrstr());
|
||||
if ((code = vmOpenVnodes(pMgmt)) != 0) {
|
||||
dError("failed to open all vnodes since %s", tstrerror(code));
|
||||
goto _OVER;
|
||||
}
|
||||
tmsgReportStartup("vnode-vnodes", "initialized");
|
||||
|
||||
if (udfcOpen() != 0) {
|
||||
dError("failed to open udfc in vnode");
|
||||
if ((code = udfcOpen()) != 0) {
|
||||
dError("failed to open udfc in vnode since %s", tstrerror(code));
|
||||
goto _OVER;
|
||||
}
|
||||
|
||||
|
@ -633,7 +667,7 @@ _OVER:
|
|||
if (code == 0) {
|
||||
pOutput->pMgmt = pMgmt;
|
||||
} else {
|
||||
dError("failed to init vnodes-mgmt since %s", terrstr());
|
||||
dError("failed to init vnodes-mgmt since %s", tstrerror(code));
|
||||
vmCleanup(pMgmt);
|
||||
}
|
||||
|
||||
|
@ -683,18 +717,32 @@ static void *vmRestoreVnodeInThread(void *param) {
|
|||
}
|
||||
|
||||
static int32_t vmStartVnodes(SVnodeMgmt *pMgmt) {
|
||||
int32_t code = 0;
|
||||
int32_t numOfVnodes = 0;
|
||||
SVnodeObj **ppVnodes = vmGetVnodeListFromHash(pMgmt, &numOfVnodes);
|
||||
SVnodeObj **ppVnodes = NULL;
|
||||
code = vmGetVnodeListFromHash(pMgmt, &numOfVnodes, &ppVnodes);
|
||||
if (code != 0) {
|
||||
dError("failed to get vnode list since %s", tstrerror(code));
|
||||
return code;
|
||||
}
|
||||
|
||||
int32_t threadNum = tsNumOfCores / 2;
|
||||
if (threadNum < 1) threadNum = 1;
|
||||
int32_t vnodesPerThread = numOfVnodes / threadNum + 1;
|
||||
|
||||
SVnodeThread *threads = taosMemoryCalloc(threadNum, sizeof(SVnodeThread));
|
||||
if (threads == NULL) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
for (int32_t t = 0; t < threadNum; ++t) {
|
||||
threads[t].threadIndex = t;
|
||||
threads[t].pMgmt = pMgmt;
|
||||
threads[t].ppVnodes = taosMemoryCalloc(vnodesPerThread, sizeof(SVnode *));
|
||||
if (threads[t].ppVnodes == NULL) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
for (int32_t v = 0; v < numOfVnodes; ++v) {
|
||||
|
@ -717,6 +765,7 @@ static int32_t vmStartVnodes(SVnodeMgmt *pMgmt) {
|
|||
taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
|
||||
if (taosThreadCreate(&pThread->thread, &thAttr, vmRestoreVnodeInThread, pThread) != 0) {
|
||||
dError("thread:%d, failed to create thread to restore vnode since %s", pThread->threadIndex, strerror(errno));
|
||||
ASSERT(errno == 0);
|
||||
}
|
||||
|
||||
taosThreadAttrDestroy(&thAttr);
|
||||
|
@ -742,6 +791,14 @@ static int32_t vmStartVnodes(SVnodeMgmt *pMgmt) {
|
|||
}
|
||||
|
||||
return vmInitTimer(pMgmt);
|
||||
|
||||
_exit:
|
||||
for (int32_t t = 0; t < threadNum; ++t) {
|
||||
SVnodeThread *pThread = &threads[t];
|
||||
taosMemoryFree(pThread->ppVnodes);
|
||||
}
|
||||
taosMemoryFree(threads);
|
||||
return code;
|
||||
}
|
||||
|
||||
static void vmStop(SVnodeMgmt *pMgmt) { vmCleanupTimer(pMgmt); }
|
||||
|
|
|
@ -200,26 +200,33 @@ static bool vmDataSpaceSufficient(SVnodeObj *pVnode) {
|
|||
}
|
||||
}
|
||||
|
||||
static int32_t vmAcquireVnodeWrapper(SVnodeMgmt *pMgt, int32_t vgId, SVnodeObj **pNode) {
|
||||
*pNode = vmAcquireVnode(pMgt, vgId);
|
||||
if (*pNode == NULL) {
|
||||
return terrno;
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
static int32_t vmPutMsgToQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg, EQueueType qtype) {
|
||||
int32_t code = 0;
|
||||
const STraceId *trace = &pMsg->info.traceId;
|
||||
if (pMsg->contLen < sizeof(SMsgHead)) {
|
||||
dGError("invalid rpc msg with no msg head at pCont. pMsg:%p, type:%s, contLen:%d", pMsg, TMSG_INFO(pMsg->msgType),
|
||||
pMsg->contLen);
|
||||
return -1;
|
||||
return TSDB_CODE_INVALID_MSG;
|
||||
}
|
||||
|
||||
SMsgHead *pHead = pMsg->pCont;
|
||||
int32_t code = 0;
|
||||
|
||||
pHead->contLen = ntohl(pHead->contLen);
|
||||
pHead->vgId = ntohl(pHead->vgId);
|
||||
|
||||
SVnodeObj *pVnode = vmAcquireVnode(pMgmt, pHead->vgId);
|
||||
if (pVnode == NULL) {
|
||||
SVnodeObj *pVnode = NULL;
|
||||
code = vmAcquireVnodeWrapper(pMgmt, pHead->vgId, &pVnode);
|
||||
if (code != 0) {
|
||||
dGDebug("vgId:%d, msg:%p failed to put into vnode queue since %s, type:%s qtype:%d contLen:%d", pHead->vgId, pMsg,
|
||||
terrstr(), TMSG_INFO(pMsg->msgType), qtype, pHead->contLen);
|
||||
terrno = (terrno != 0) ? terrno : -1;
|
||||
return terrno;
|
||||
tstrerror(code), TMSG_INFO(pMsg->msgType), qtype, pHead->contLen);
|
||||
return code;
|
||||
}
|
||||
|
||||
switch (qtype) {
|
||||
|
@ -234,49 +241,45 @@ static int32_t vmPutMsgToQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg, EQueueType qtyp
|
|||
break;
|
||||
case STREAM_QUEUE:
|
||||
dGTrace("vgId:%d, msg:%p put into vnode-stream queue", pVnode->vgId, pMsg);
|
||||
taosWriteQitem(pVnode->pStreamQ, pMsg);
|
||||
code = taosWriteQitem(pVnode->pStreamQ, pMsg);
|
||||
break;
|
||||
case FETCH_QUEUE:
|
||||
dGTrace("vgId:%d, msg:%p put into vnode-fetch queue", pVnode->vgId, pMsg);
|
||||
taosWriteQitem(pVnode->pFetchQ, pMsg);
|
||||
code = taosWriteQitem(pVnode->pFetchQ, pMsg);
|
||||
break;
|
||||
case WRITE_QUEUE:
|
||||
if (!vmDataSpaceSufficient(pVnode)) {
|
||||
terrno = TSDB_CODE_NO_ENOUGH_DISKSPACE;
|
||||
code = terrno;
|
||||
code = TSDB_CODE_NO_ENOUGH_DISKSPACE;
|
||||
dError("vgId:%d, msg:%p put into vnode-write queue failed since %s", pVnode->vgId, pMsg, tstrerror(code));
|
||||
break;
|
||||
}
|
||||
if (pMsg->msgType == TDMT_VND_SUBMIT && (grantCheck(TSDB_GRANT_STORAGE) != TSDB_CODE_SUCCESS)) {
|
||||
terrno = TSDB_CODE_VND_NO_WRITE_AUTH;
|
||||
code = terrno;
|
||||
code = TSDB_CODE_VND_NO_WRITE_AUTH;
|
||||
dDebug("vgId:%d, msg:%p put into vnode-write queue failed since %s", pVnode->vgId, pMsg, tstrerror(code));
|
||||
break;
|
||||
}
|
||||
if (pMsg->msgType != TDMT_VND_ALTER_CONFIRM && pVnode->disable) {
|
||||
dDebug("vgId:%d, msg:%p put into vnode-write queue failed since its disable", pVnode->vgId, pMsg);
|
||||
terrno = TSDB_CODE_VND_STOPPED;
|
||||
code = terrno;
|
||||
code = TSDB_CODE_VND_STOPPED;
|
||||
break;
|
||||
}
|
||||
dGTrace("vgId:%d, msg:%p put into vnode-write queue", pVnode->vgId, pMsg);
|
||||
taosWriteQitem(pVnode->pWriteW.queue, pMsg);
|
||||
code = taosWriteQitem(pVnode->pWriteW.queue, pMsg);
|
||||
break;
|
||||
case SYNC_QUEUE:
|
||||
dGTrace("vgId:%d, msg:%p put into vnode-sync queue", pVnode->vgId, pMsg);
|
||||
taosWriteQitem(pVnode->pSyncW.queue, pMsg);
|
||||
code = taosWriteQitem(pVnode->pSyncW.queue, pMsg);
|
||||
break;
|
||||
case SYNC_RD_QUEUE:
|
||||
dGTrace("vgId:%d, msg:%p put into vnode-sync-rd queue", pVnode->vgId, pMsg);
|
||||
taosWriteQitem(pVnode->pSyncRdW.queue, pMsg);
|
||||
code = taosWriteQitem(pVnode->pSyncRdW.queue, pMsg);
|
||||
break;
|
||||
case APPLY_QUEUE:
|
||||
dGTrace("vgId:%d, msg:%p put into vnode-apply queue", pVnode->vgId, pMsg);
|
||||
taosWriteQitem(pVnode->pApplyW.queue, pMsg);
|
||||
code = taosWriteQitem(pVnode->pApplyW.queue, pMsg);
|
||||
break;
|
||||
default:
|
||||
code = -1;
|
||||
terrno = TSDB_CODE_INVALID_PARA;
|
||||
code = TSDB_CODE_INVALID_MSG;
|
||||
break;
|
||||
}
|
||||
|
||||
|
@ -299,32 +302,32 @@ int32_t vmPutMsgToStreamQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { return vmPutMs
|
|||
int32_t vmPutMsgToMultiMgmtQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
|
||||
const STraceId *trace = &pMsg->info.traceId;
|
||||
dGTrace("msg:%p, put into vnode-multi-mgmt queue", pMsg);
|
||||
taosWriteQitem(pMgmt->mgmtMultiWorker.queue, pMsg);
|
||||
return 0;
|
||||
return taosWriteQitem(pMgmt->mgmtMultiWorker.queue, pMsg);
|
||||
}
|
||||
|
||||
int32_t vmPutMsgToMgmtQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
|
||||
const STraceId *trace = &pMsg->info.traceId;
|
||||
dGTrace("msg:%p, put into vnode-mgmt queue", pMsg);
|
||||
taosWriteQitem(pMgmt->mgmtWorker.queue, pMsg);
|
||||
return 0;
|
||||
return taosWriteQitem(pMgmt->mgmtWorker.queue, pMsg);
|
||||
}
|
||||
|
||||
int32_t vmPutRpcMsgToQueue(SVnodeMgmt *pMgmt, EQueueType qtype, SRpcMsg *pRpc) {
|
||||
int32_t code = 0;
|
||||
if (pRpc->contLen < sizeof(SMsgHead)) {
|
||||
dError("invalid rpc msg with no msg head at pCont. pRpc:%p, type:%s, len:%d", pRpc, TMSG_INFO(pRpc->msgType),
|
||||
pRpc->contLen);
|
||||
rpcFreeCont(pRpc->pCont);
|
||||
pRpc->pCont = NULL;
|
||||
return -1;
|
||||
return TSDB_CODE_INVALID_MSG;
|
||||
}
|
||||
|
||||
EQItype itype = APPLY_QUEUE == qtype ? DEF_QITEM : RPC_QITEM;
|
||||
SRpcMsg *pMsg = taosAllocateQitem(sizeof(SRpcMsg), itype, pRpc->contLen);
|
||||
if (pMsg == NULL) {
|
||||
SRpcMsg *pMsg = NULL;
|
||||
code = taosAllocateQitemWrapper(sizeof(SRpcMsg), itype, pRpc->contLen, (void **)&pMsg);
|
||||
if (code != 0) {
|
||||
rpcFreeCont(pRpc->pCont);
|
||||
pRpc->pCont = NULL;
|
||||
return -1;
|
||||
return code;
|
||||
}
|
||||
|
||||
SMsgHead *pHead = pRpc->pCont;
|
||||
|
@ -335,7 +338,7 @@ int32_t vmPutRpcMsgToQueue(SVnodeMgmt *pMgmt, EQueueType qtype, SRpcMsg *pRpc) {
|
|||
memcpy(pMsg, pRpc, sizeof(SRpcMsg));
|
||||
pRpc->pCont = NULL;
|
||||
|
||||
int32_t code = vmPutMsgToQueue(pMgmt, pMsg, qtype);
|
||||
code = vmPutMsgToQueue(pMgmt, pMsg, qtype);
|
||||
if (code != 0) {
|
||||
dTrace("msg:%p, is freed", pMsg);
|
||||
rpcFreeCont(pMsg->pCont);
|
||||
|
@ -381,6 +384,7 @@ int32_t vmGetQueueSize(SVnodeMgmt *pMgmt, int32_t vgId, EQueueType qtype) {
|
|||
}
|
||||
|
||||
int32_t vmAllocQueue(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) {
|
||||
int32_t code = 0;
|
||||
SMultiWorkerCfg wcfg = {.max = 1, .name = "vnode-write", .fp = (FItems)vnodeProposeWriteMsg, .param = pVnode->pImpl};
|
||||
SMultiWorkerCfg scfg = {.max = 1, .name = "vnode-sync", .fp = (FItems)vmProcessSyncQueue, .param = pVnode};
|
||||
SMultiWorkerCfg sccfg = {.max = 1, .name = "vnode-sync-rd", .fp = (FItems)vmProcessSyncQueue, .param = pVnode};
|
||||
|
@ -396,8 +400,7 @@ int32_t vmAllocQueue(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) {
|
|||
|
||||
if (pVnode->pWriteW.queue == NULL || pVnode->pSyncW.queue == NULL || pVnode->pSyncRdW.queue == NULL ||
|
||||
pVnode->pApplyW.queue == NULL || pVnode->pQueryQ == NULL || pVnode->pStreamQ == NULL || pVnode->pFetchQ == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return -1;
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
dInfo("vgId:%d, write-queue:%p is alloced, thread:%08" PRId64, pVnode->vgId, pVnode->pWriteW.queue,
|
||||
|
@ -426,26 +429,27 @@ void vmFreeQueue(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) {
|
|||
}
|
||||
|
||||
int32_t vmStartWorker(SVnodeMgmt *pMgmt) {
|
||||
int32_t code = 0;
|
||||
SQueryAutoQWorkerPool *pQPool = &pMgmt->queryPool;
|
||||
pQPool->name = "vnode-query";
|
||||
pQPool->min = tsNumOfVnodeQueryThreads;
|
||||
pQPool->max = tsNumOfVnodeQueryThreads;
|
||||
if (tQueryAutoQWorkerInit(pQPool) != 0) return -1;
|
||||
if ((code = tQueryAutoQWorkerInit(pQPool)) != 0) return code;
|
||||
|
||||
SAutoQWorkerPool *pStreamPool = &pMgmt->streamPool;
|
||||
pStreamPool->name = "vnode-stream";
|
||||
pStreamPool->ratio = tsRatioOfVnodeStreamThreads;
|
||||
if (tAutoQWorkerInit(pStreamPool) != 0) return -1;
|
||||
if ((code = tAutoQWorkerInit(pStreamPool)) != 0) return code;
|
||||
|
||||
SWWorkerPool *pFPool = &pMgmt->fetchPool;
|
||||
pFPool->name = "vnode-fetch";
|
||||
pFPool->max = tsNumOfVnodeFetchThreads;
|
||||
if (tWWorkerInit(pFPool) != 0) return -1;
|
||||
if ((code = tWWorkerInit(pFPool)) != 0) return code;
|
||||
|
||||
SSingleWorkerCfg mgmtCfg = {
|
||||
.min = 1, .max = 1, .name = "vnode-mgmt", .fp = (FItem)vmProcessMgmtQueue, .param = pMgmt};
|
||||
|
||||
if (tSingleWorkerInit(&pMgmt->mgmtWorker, &mgmtCfg) != 0) return -1;
|
||||
if ((code = tSingleWorkerInit(&pMgmt->mgmtWorker, &mgmtCfg)) != 0) return code;
|
||||
|
||||
int32_t threadNum = 0;
|
||||
if (tsNumOfCores == 1) {
|
||||
|
@ -459,7 +463,7 @@ int32_t vmStartWorker(SVnodeMgmt *pMgmt) {
|
|||
.fp = (FItem)vmProcessMultiMgmtQueue,
|
||||
.param = pMgmt};
|
||||
|
||||
if (tSingleWorkerInit(&pMgmt->mgmtMultiWorker, &multiMgmtCfg) != 0) return -1;
|
||||
if ((code = tSingleWorkerInit(&pMgmt->mgmtMultiWorker, &multiMgmtCfg)) != 0) return code;
|
||||
|
||||
dDebug("vnode workers are initialized");
|
||||
return 0;
|
||||
|
|
|
@ -14,11 +14,11 @@
|
|||
*/
|
||||
|
||||
#define _DEFAULT_SOURCE
|
||||
#include "dmMgmt.h"
|
||||
#include "audit.h"
|
||||
#include "dmMgmt.h"
|
||||
#include "libs/function/tudf.h"
|
||||
#include "tgrant.h"
|
||||
#include "tcompare.h"
|
||||
#include "tgrant.h"
|
||||
|
||||
#define DM_INIT_AUDIT() \
|
||||
do { \
|
||||
|
@ -256,8 +256,8 @@ static int32_t dmProcessCreateNodeReq(EDndNodeType ntype, SRpcMsg *pMsg) {
|
|||
default:
|
||||
code = TSDB_CODE_APP_ERROR;
|
||||
}
|
||||
dError("failed to create node since %s", terrstr());
|
||||
return terrno = code;
|
||||
dError("failed to create node since %s", tstrerror(code));
|
||||
return code;
|
||||
}
|
||||
|
||||
dInfo("start to process create-node-request");
|
||||
|
@ -267,7 +267,7 @@ static int32_t dmProcessCreateNodeReq(EDndNodeType ntype, SRpcMsg *pMsg) {
|
|||
dmReleaseWrapper(pWrapper);
|
||||
code = TAOS_SYSTEM_ERROR(errno);
|
||||
dError("failed to create dir:%s since %s", pWrapper->path, tstrerror(code));
|
||||
return terrno = code;
|
||||
return code;
|
||||
}
|
||||
|
||||
taosThreadMutexLock(&pDnode->mutex);
|
||||
|
@ -298,7 +298,7 @@ static int32_t dmProcessAlterNodeTypeReq(EDndNodeType ntype, SRpcMsg *pMsg) {
|
|||
SMgmtWrapper *pWrapper = dmAcquireWrapper(pDnode, ntype);
|
||||
if (pWrapper == NULL) {
|
||||
dError("fail to process alter node type since node not exist");
|
||||
return -1;
|
||||
return TSDB_CODE_INVALID_MSG;
|
||||
}
|
||||
dmReleaseWrapper(pWrapper);
|
||||
|
||||
|
@ -312,7 +312,7 @@ static int32_t dmProcessAlterNodeTypeReq(EDndNodeType ntype, SRpcMsg *pMsg) {
|
|||
if (role == TAOS_SYNC_ROLE_VOTER) {
|
||||
dError("node:%s, failed to alter node type since node already is role:%d", pWrapper->name, role);
|
||||
code = TSDB_CODE_MNODE_ALREADY_IS_VOTER;
|
||||
return terrno = code;
|
||||
return code;
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -320,7 +320,7 @@ static int32_t dmProcessAlterNodeTypeReq(EDndNodeType ntype, SRpcMsg *pMsg) {
|
|||
dInfo("node:%s, checking node catch up", pWrapper->name);
|
||||
if ((*pWrapper->func.isCatchUpFp)(pWrapper->pMgmt) != 1) {
|
||||
code = TSDB_CODE_MNODE_NOT_CATCH_UP;
|
||||
return terrno = code;
|
||||
return code;
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -338,7 +338,7 @@ static int32_t dmProcessAlterNodeTypeReq(EDndNodeType ntype, SRpcMsg *pMsg) {
|
|||
taosThreadMutexUnlock(&pDnode->mutex);
|
||||
code = TAOS_SYSTEM_ERROR(errno);
|
||||
dError("failed to create dir:%s since %s", pWrapper->path, tstrerror(code));
|
||||
return terrno = code;
|
||||
return code;
|
||||
}
|
||||
|
||||
SMgmtInputOpt input = dmBuildMgmtInputOpt(pWrapper);
|
||||
|
@ -381,7 +381,7 @@ static int32_t dmProcessDropNodeReq(EDndNodeType ntype, SRpcMsg *pMsg) {
|
|||
code = TSDB_CODE_APP_ERROR;
|
||||
}
|
||||
|
||||
dError("failed to drop node since %s", terrstr());
|
||||
dError("failed to drop node since %s", tstrerror(code));
|
||||
return terrno = code;
|
||||
}
|
||||
|
||||
|
@ -439,6 +439,4 @@ void dmReportStartup(const char *pName, const char *pDesc) {
|
|||
|
||||
int64_t dmGetClusterId() { return globalDnode.data.clusterId; }
|
||||
|
||||
bool dmReadyForTest() {
|
||||
return dmInstance()->data.dnodeVer > 0;
|
||||
}
|
||||
bool dmReadyForTest() { return dmInstance()->data.dnodeVer > 0; }
|
||||
|
|
|
@ -305,7 +305,7 @@ int32_t dmMarkWrapper(SMgmtWrapper *pWrapper) {
|
|||
}
|
||||
taosThreadRwlockUnlock(&pWrapper->lock);
|
||||
|
||||
return terrno = code;
|
||||
return code;
|
||||
}
|
||||
|
||||
void dmReleaseWrapper(SMgmtWrapper *pWrapper) {
|
||||
|
|
|
@ -17,12 +17,13 @@
|
|||
#include "dmMgmt.h"
|
||||
|
||||
int32_t dmOpenNode(SMgmtWrapper *pWrapper) {
|
||||
int32_t code = 0;
|
||||
SDnode *pDnode = pWrapper->pDnode;
|
||||
|
||||
if (taosMkDir(pWrapper->path) != 0) {
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
dError("node:%s, failed to create dir:%s since %s", pWrapper->name, pWrapper->path, terrstr());
|
||||
return -1;
|
||||
code = TAOS_SYSTEM_ERROR(errno);
|
||||
dError("node:%s, failed to create dir:%s since %s", pWrapper->name, pWrapper->path, tstrerror(code));
|
||||
return terrno = code;
|
||||
}
|
||||
|
||||
SMgmtOutputOpt output = {0};
|
||||
|
@ -30,7 +31,7 @@ int32_t dmOpenNode(SMgmtWrapper *pWrapper) {
|
|||
|
||||
dInfo("node:%s, start to open", pWrapper->name);
|
||||
tmsgSetDefault(&input.msgCb);
|
||||
if ((*pWrapper->func.openFp)(&input, &output) != 0) {
|
||||
if ((code = (*pWrapper->func.openFp)(&input, &output)) != 0) {
|
||||
dError("node:%s, failed to open since %s", pWrapper->name, terrstr());
|
||||
return -1;
|
||||
}
|
||||
|
|
|
@ -39,9 +39,9 @@ int32_t dmProcessNodeMsg(SMgmtWrapper *pWrapper, SRpcMsg *pMsg) {
|
|||
|
||||
NodeMsgFp msgFp = pWrapper->msgFps[TMSG_INDEX(pMsg->msgType)];
|
||||
if (msgFp == NULL) {
|
||||
terrno = TSDB_CODE_MSG_NOT_PROCESSED;
|
||||
// terrno = TSDB_CODE_MSG_NOT_PROCESSED;
|
||||
dGError("msg:%p, not processed since no handler, type:%s", pMsg, TMSG_INFO(pMsg->msgType));
|
||||
return -1;
|
||||
return TSDB_CODE_MSG_NOT_PROCESSED;
|
||||
}
|
||||
|
||||
dGTrace("msg:%p, will be processed by %s", pMsg, pWrapper->name);
|
||||
|
@ -54,14 +54,15 @@ static bool dmFailFastFp(tmsg_t msgType) {
|
|||
return msgType == TDMT_SYNC_HEARTBEAT || msgType == TDMT_SYNC_APPEND_ENTRIES;
|
||||
}
|
||||
|
||||
static void dmConvertErrCode(tmsg_t msgType) {
|
||||
if (terrno != TSDB_CODE_APP_IS_STOPPING) {
|
||||
return;
|
||||
static int32_t dmConvertErrCode(tmsg_t msgType, int32_t code) {
|
||||
if (code != TSDB_CODE_APP_IS_STOPPING) {
|
||||
return code;
|
||||
}
|
||||
if ((msgType > TDMT_VND_MSG_MIN && msgType < TDMT_VND_MSG_MAX) ||
|
||||
(msgType > TDMT_SCH_MSG_MIN && msgType < TDMT_SCH_MSG_MAX)) {
|
||||
terrno = TSDB_CODE_VND_STOPPED;
|
||||
code = TSDB_CODE_VND_STOPPED;
|
||||
}
|
||||
return code;
|
||||
}
|
||||
static void dmUpdateRpcIpWhite(SDnodeData *pData, void *pTrans, SRpcMsg *pRpc) {
|
||||
SUpdateIpWhite ipWhite = {0}; // aosMemoryCalloc(1, sizeof(SUpdateIpWhite));
|
||||
|
@ -99,14 +100,14 @@ static void dmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pRpc, SEpSet *pEpSet) {
|
|||
|
||||
int32_t svrVer = 0;
|
||||
taosVersionStrToInt(version, &svrVer);
|
||||
if (0 != taosCheckVersionCompatible(pRpc->info.cliVer, svrVer, 3)) {
|
||||
if ((code = taosCheckVersionCompatible(pRpc->info.cliVer, svrVer, 3)) != 0) {
|
||||
dError("Version not compatible, cli ver: %d, svr ver: %d", pRpc->info.cliVer, svrVer);
|
||||
goto _OVER;
|
||||
}
|
||||
|
||||
bool isForbidden = dmIsForbiddenIp(pRpc->info.forbiddenIp, pRpc->info.conn.user, pRpc->info.conn.clientIp);
|
||||
if (isForbidden) {
|
||||
terrno = TSDB_CODE_IP_NOT_IN_WHITE_LIST;
|
||||
code = TSDB_CODE_IP_NOT_IN_WHITE_LIST;
|
||||
goto _OVER;
|
||||
}
|
||||
|
||||
|
@ -119,7 +120,7 @@ static void dmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pRpc, SEpSet *pEpSet) {
|
|||
case TDMT_SCH_FETCH_RSP:
|
||||
case TDMT_SCH_MERGE_FETCH_RSP:
|
||||
case TDMT_VND_SUBMIT_RSP:
|
||||
qWorkerProcessRspMsg(NULL, NULL, pRpc, 0);
|
||||
code = qWorkerProcessRspMsg(NULL, NULL, pRpc, 0);
|
||||
return;
|
||||
case TDMT_MND_STATUS_RSP:
|
||||
if (pEpSet != NULL) {
|
||||
|
@ -148,32 +149,32 @@ static void dmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pRpc, SEpSet *pEpSet) {
|
|||
return;
|
||||
} else {
|
||||
if (pDnode->status == DND_STAT_INIT) {
|
||||
terrno = TSDB_CODE_APP_IS_STARTING;
|
||||
code = TSDB_CODE_APP_IS_STARTING;
|
||||
} else {
|
||||
terrno = TSDB_CODE_APP_IS_STOPPING;
|
||||
code = TSDB_CODE_APP_IS_STOPPING;
|
||||
}
|
||||
goto _OVER;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
terrno = TSDB_CODE_APP_IS_STARTING;
|
||||
code = TSDB_CODE_APP_IS_STARTING;
|
||||
goto _OVER;
|
||||
}
|
||||
|
||||
if (pRpc->pCont == NULL && (IsReq(pRpc) || pRpc->contLen != 0)) {
|
||||
dGError("msg:%p, type:%s pCont is NULL", pRpc, TMSG_INFO(pRpc->msgType));
|
||||
terrno = TSDB_CODE_INVALID_MSG_LEN;
|
||||
code = TSDB_CODE_INVALID_MSG_LEN;
|
||||
goto _OVER;
|
||||
} else if ((pRpc->code == TSDB_CODE_RPC_NETWORK_UNAVAIL || pRpc->code == TSDB_CODE_RPC_BROKEN_LINK) &&
|
||||
(!IsReq(pRpc)) && (pRpc->pCont == NULL)) {
|
||||
dGError("msg:%p, type:%s pCont is NULL, err: %s", pRpc, TMSG_INFO(pRpc->msgType), tstrerror(pRpc->code));
|
||||
terrno = pRpc->code;
|
||||
code = pRpc->code;
|
||||
goto _OVER;
|
||||
}
|
||||
|
||||
if (pHandle->defaultNtype == NODE_END) {
|
||||
dGError("msg:%p, type:%s not processed since no handle", pRpc, TMSG_INFO(pRpc->msgType));
|
||||
terrno = TSDB_CODE_MSG_NOT_PROCESSED;
|
||||
code = TSDB_CODE_MSG_NOT_PROCESSED;
|
||||
goto _OVER;
|
||||
}
|
||||
|
||||
|
@ -197,12 +198,12 @@ static void dmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pRpc, SEpSet *pEpSet) {
|
|||
}
|
||||
} else {
|
||||
dGError("msg:%p, type:%s contLen is 0", pRpc, TMSG_INFO(pRpc->msgType));
|
||||
terrno = TSDB_CODE_INVALID_MSG_LEN;
|
||||
code = TSDB_CODE_INVALID_MSG_LEN;
|
||||
goto _OVER;
|
||||
}
|
||||
}
|
||||
|
||||
if (dmMarkWrapper(pWrapper) != 0) {
|
||||
if ((code = dmMarkWrapper(pWrapper)) != 0) {
|
||||
pWrapper = NULL;
|
||||
goto _OVER;
|
||||
}
|
||||
|
@ -210,8 +211,10 @@ static void dmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pRpc, SEpSet *pEpSet) {
|
|||
pRpc->info.wrapper = pWrapper;
|
||||
|
||||
EQItype itype = IsReq(pRpc) ? RPC_QITEM : DEF_QITEM; // rsp msg is not restricted by tsRpcQueueMemoryUsed
|
||||
pMsg = taosAllocateQitem(sizeof(SRpcMsg), itype, pRpc->contLen);
|
||||
if (pMsg == NULL) goto _OVER;
|
||||
code = taosAllocateQitemWrapper(sizeof(SRpcMsg), itype, pRpc->contLen, (void **)&pMsg);
|
||||
if (code != 0) {
|
||||
goto _OVER;
|
||||
}
|
||||
|
||||
memcpy(pMsg, pRpc, sizeof(SRpcMsg));
|
||||
dGTrace("msg:%p, is created, type:%s handle:%p len:%d", pMsg, TMSG_INFO(pRpc->msgType), pMsg->info.handle,
|
||||
|
@ -221,10 +224,9 @@ static void dmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pRpc, SEpSet *pEpSet) {
|
|||
|
||||
_OVER:
|
||||
if (code != 0) {
|
||||
dmConvertErrCode(pRpc->msgType);
|
||||
if (terrno != 0) code = terrno;
|
||||
code = dmConvertErrCode(pRpc->msgType, code);
|
||||
if (pMsg) {
|
||||
dGTrace("msg:%p, failed to process %s since %s", pMsg, TMSG_INFO(pMsg->msgType), terrstr());
|
||||
dGTrace("msg:%p, failed to process %s since %s", pMsg, TMSG_INFO(pMsg->msgType), tstrerror(code));
|
||||
} else {
|
||||
dGTrace("msg:%p, failed to process empty msg since %s", pMsg, terrstr());
|
||||
}
|
||||
|
@ -280,17 +282,19 @@ int32_t dmInitMsgHandle(SDnode *pDnode) {
|
|||
}
|
||||
|
||||
static inline int32_t dmSendReq(const SEpSet *pEpSet, SRpcMsg *pMsg) {
|
||||
int32_t code = 0;
|
||||
SDnode *pDnode = dmInstance();
|
||||
if (pDnode->status != DND_STAT_RUNNING && pMsg->msgType < TDMT_SYNC_MSG_MIN) {
|
||||
rpcFreeCont(pMsg->pCont);
|
||||
pMsg->pCont = NULL;
|
||||
if (pDnode->status == DND_STAT_INIT) {
|
||||
terrno = TSDB_CODE_APP_IS_STARTING;
|
||||
code = TSDB_CODE_APP_IS_STARTING;
|
||||
} else {
|
||||
terrno = TSDB_CODE_APP_IS_STOPPING;
|
||||
code = TSDB_CODE_APP_IS_STOPPING;
|
||||
}
|
||||
dError("failed to send rpc msg:%s since %s, handle:%p", TMSG_INFO(pMsg->msgType), terrstr(), pMsg->info.handle);
|
||||
return -1;
|
||||
dError("failed to send rpc msg:%s since %s, handle:%p", TMSG_INFO(pMsg->msgType), tstrerror(code),
|
||||
pMsg->info.handle);
|
||||
return code;
|
||||
} else {
|
||||
pMsg->info.handle = 0;
|
||||
rpcSendRequest(pDnode->trans.clientRpc, pEpSet, pMsg, NULL);
|
||||
|
@ -521,7 +525,7 @@ int32_t dmInitServer(SDnode *pDnode) {
|
|||
pTrans->serverRpc = rpcOpen(&rpcInit);
|
||||
if (pTrans->serverRpc == NULL) {
|
||||
dError("failed to init dnode rpc server");
|
||||
return -1;
|
||||
return terrno;
|
||||
}
|
||||
|
||||
dDebug("dnode rpc server is initialized");
|
||||
|
|
|
@ -464,26 +464,23 @@ static int32_t mndCreateDir(SMnode *pMnode, const char *path) {
|
|||
static int32_t mndInitWal(SMnode *pMnode) {
|
||||
char path[PATH_MAX + 20] = {0};
|
||||
snprintf(path, sizeof(path), "%s%swal", pMnode->path, TD_DIRSEP);
|
||||
SWalCfg cfg = {
|
||||
.vgId = 1,
|
||||
.fsyncPeriod = 0,
|
||||
.rollPeriod = -1,
|
||||
.segSize = -1,
|
||||
.retentionPeriod = 0,
|
||||
.retentionSize = 0,
|
||||
.level = TAOS_WAL_FSYNC,
|
||||
.encryptAlgorithm = 0,
|
||||
.encryptKey = {0}
|
||||
};
|
||||
SWalCfg cfg = {.vgId = 1,
|
||||
.fsyncPeriod = 0,
|
||||
.rollPeriod = -1,
|
||||
.segSize = -1,
|
||||
.retentionPeriod = 0,
|
||||
.retentionSize = 0,
|
||||
.level = TAOS_WAL_FSYNC,
|
||||
.encryptAlgorithm = 0,
|
||||
.encryptKey = {0}};
|
||||
|
||||
#if defined(TD_ENTERPRISE)
|
||||
if(tsiEncryptAlgorithm == DND_CA_SM4 && (tsiEncryptScope & DND_CS_MNODE_WAL) == DND_CS_MNODE_WAL){
|
||||
cfg.encryptAlgorithm = (tsiEncryptScope & DND_CS_MNODE_WAL)? tsiEncryptAlgorithm : 0;
|
||||
if(tsEncryptKey[0] == '\0'){
|
||||
if (tsiEncryptAlgorithm == DND_CA_SM4 && (tsiEncryptScope & DND_CS_MNODE_WAL) == DND_CS_MNODE_WAL) {
|
||||
cfg.encryptAlgorithm = (tsiEncryptScope & DND_CS_MNODE_WAL) ? tsiEncryptAlgorithm : 0;
|
||||
if (tsEncryptKey[0] == '\0') {
|
||||
terrno = TSDB_CODE_DNODE_INVALID_ENCRYPTKEY;
|
||||
return -1;
|
||||
}
|
||||
else{
|
||||
} else {
|
||||
strncpy(cfg.encryptKey, tsEncryptKey, ENCRYPT_KEY_LEN);
|
||||
}
|
||||
}
|
||||
|
@ -856,7 +853,7 @@ _OVER:
|
|||
return -1;
|
||||
}
|
||||
|
||||
int32_t mndProcessRpcMsg(SRpcMsg *pMsg, SQueueInfo* pQueueInfo) {
|
||||
int32_t mndProcessRpcMsg(SRpcMsg *pMsg, SQueueInfo *pQueueInfo) {
|
||||
SMnode *pMnode = pMsg->info.node;
|
||||
const STraceId *trace = &pMsg->info.traceId;
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
|
|
|
@ -146,6 +146,15 @@ int64_t taosQueueMemorySize(STaosQueue *queue) {
|
|||
return memOfItems;
|
||||
}
|
||||
|
||||
int32_t taosAllocateQitemWrapper(int32_t size, EQItype itype, int64_t dataSize, void **pItem) {
|
||||
int32_t code = 0;
|
||||
*pItem = taosAllocateQitem(size, itype, dataSize);
|
||||
if (*pItem == NULL) {
|
||||
return terrno;
|
||||
}
|
||||
|
||||
return code;
|
||||
}
|
||||
void *taosAllocateQitem(int32_t size, EQItype itype, int64_t dataSize) {
|
||||
STaosQnode *pNode = taosMemoryCalloc(1, sizeof(STaosQnode) + size);
|
||||
if (pNode == NULL) {
|
||||
|
@ -533,9 +542,7 @@ int64_t taosQallUnAccessedMemSize(STaosQall *qall) { return qall->unAccessMemOfI
|
|||
void taosResetQitems(STaosQall *qall) { qall->current = qall->start; }
|
||||
int32_t taosGetQueueNumber(STaosQset *qset) { return qset->numOfQueues; }
|
||||
|
||||
void taosQueueSetThreadId(STaosQueue* pQueue, int64_t threadId) {
|
||||
pQueue->threadId = threadId;
|
||||
}
|
||||
void taosQueueSetThreadId(STaosQueue *pQueue, int64_t threadId) { pQueue->threadId = threadId; }
|
||||
|
||||
int64_t taosQueueGetThreadId(STaosQueue *pQueue) { return pQueue->threadId; }
|
||||
|
||||
|
|
|
@ -83,15 +83,13 @@ int32_t taosCheckVersionCompatible(int32_t clientVer, int32_t serverVer, int32_t
|
|||
serverVer /= 1000000;
|
||||
break;
|
||||
default:
|
||||
terrno = TSDB_CODE_INVALID_VERSION_NUMBER;
|
||||
return -1;
|
||||
return TSDB_CODE_INVALID_VERSION_NUMBER;
|
||||
}
|
||||
|
||||
if (clientVer == serverVer) {
|
||||
return 0;
|
||||
} else {
|
||||
terrno = TSDB_CODE_VERSION_NOT_COMPATIBLE;
|
||||
return -1;
|
||||
return TSDB_CODE_VERSION_NOT_COMPATIBLE;
|
||||
}
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue