commit
c684094f7a
|
@ -3,6 +3,8 @@ PROJECT(TDengine)
|
||||||
|
|
||||||
SET(TD_CLUSTER FALSE)
|
SET(TD_CLUSTER FALSE)
|
||||||
SET(TD_ACCOUNT FALSE)
|
SET(TD_ACCOUNT FALSE)
|
||||||
|
SET(TD_VPEER FALSE)
|
||||||
|
SET(TD_MPEER FALSE)
|
||||||
SET(TD_GRANT FALSE)
|
SET(TD_GRANT FALSE)
|
||||||
SET(TD_COVER FALSE)
|
SET(TD_COVER FALSE)
|
||||||
SET(TD_PAGMODE_LITE FALSE)
|
SET(TD_PAGMODE_LITE FALSE)
|
||||||
|
|
|
@ -1835,7 +1835,7 @@ int tscProcessTableMetaRsp(SSqlObj *pSql) {
|
||||||
}
|
}
|
||||||
|
|
||||||
for (int i = 0; i < TSDB_VNODES_SUPPORT; ++i) {
|
for (int i = 0; i < TSDB_VNODES_SUPPORT; ++i) {
|
||||||
pMetaMsg->vpeerDesc[i].vnode = htonl(pMetaMsg->vpeerDesc[i].vnode);
|
pMetaMsg->vpeerDesc[i].vgId = htonl(pMetaMsg->vpeerDesc[i].vgId);
|
||||||
}
|
}
|
||||||
|
|
||||||
SSchema* pSchema = pMetaMsg->schema;
|
SSchema* pSchema = pMetaMsg->schema;
|
||||||
|
|
|
@ -1454,7 +1454,7 @@ void tscRetrieveDataRes(void *param, TAOS_RES *tres, int code) {
|
||||||
SSqlObj *pNew = tscCreateSqlObjForSubquery(pParentSql, trsupport, pSql);
|
SSqlObj *pNew = tscCreateSqlObjForSubquery(pParentSql, trsupport, pSql);
|
||||||
if (pNew == NULL) {
|
if (pNew == NULL) {
|
||||||
tscError("%p sub:%p failed to create new subquery due to out of memory, abort retry, vid:%d, orderOfSub:%d",
|
tscError("%p sub:%p failed to create new subquery due to out of memory, abort retry, vid:%d, orderOfSub:%d",
|
||||||
trsupport->pParentSqlObj, pSql, pSvd != NULL ? pSvd->vnode : -1, trsupport->subqueryIndex);
|
trsupport->pParentSqlObj, pSql, pSvd != NULL ? pSvd->vgId : -1, trsupport->subqueryIndex);
|
||||||
|
|
||||||
pState->code = -TSDB_CODE_CLI_OUT_OF_MEMORY;
|
pState->code = -TSDB_CODE_CLI_OUT_OF_MEMORY;
|
||||||
trsupport->numOfRetry = MAX_NUM_OF_SUBQUERY_RETRY;
|
trsupport->numOfRetry = MAX_NUM_OF_SUBQUERY_RETRY;
|
||||||
|
@ -1470,7 +1470,7 @@ void tscRetrieveDataRes(void *param, TAOS_RES *tres, int code) {
|
||||||
if (pState->code != TSDB_CODE_SUCCESS) { // failed, abort
|
if (pState->code != TSDB_CODE_SUCCESS) { // failed, abort
|
||||||
if (vnodeInfo != NULL) {
|
if (vnodeInfo != NULL) {
|
||||||
tscTrace("%p sub:%p query failed,ip:%u,vid:%d,orderOfSub:%d,global code:%d", pParentSql, pSql,
|
tscTrace("%p sub:%p query failed,ip:%u,vid:%d,orderOfSub:%d,global code:%d", pParentSql, pSql,
|
||||||
vnodeInfo->vpeerDesc[vnodeInfo->index].ip, vnodeInfo->vpeerDesc[vnodeInfo->index].vnode,
|
vnodeInfo->vpeerDesc[vnodeInfo->index].ip, vnodeInfo->vpeerDesc[vnodeInfo->index].vgId,
|
||||||
trsupport->subqueryIndex, pState->code);
|
trsupport->subqueryIndex, pState->code);
|
||||||
} else {
|
} else {
|
||||||
tscTrace("%p sub:%p query failed,orderOfSub:%d,global code:%d", pParentSql, pSql,
|
tscTrace("%p sub:%p query failed,orderOfSub:%d,global code:%d", pParentSql, pSql,
|
||||||
|
@ -1481,7 +1481,7 @@ void tscRetrieveDataRes(void *param, TAOS_RES *tres, int code) {
|
||||||
} else { // success, proceed to retrieve data from dnode
|
} else { // success, proceed to retrieve data from dnode
|
||||||
if (vnodeInfo != NULL) {
|
if (vnodeInfo != NULL) {
|
||||||
tscTrace("%p sub:%p query complete,ip:%u,vid:%d,orderOfSub:%d,retrieve data", trsupport->pParentSqlObj, pSql,
|
tscTrace("%p sub:%p query complete,ip:%u,vid:%d,orderOfSub:%d,retrieve data", trsupport->pParentSqlObj, pSql,
|
||||||
vnodeInfo->vpeerDesc[vnodeInfo->index].ip, vnodeInfo->vpeerDesc[vnodeInfo->index].vnode,
|
vnodeInfo->vpeerDesc[vnodeInfo->index].ip, vnodeInfo->vpeerDesc[vnodeInfo->index].vgId,
|
||||||
trsupport->subqueryIndex);
|
trsupport->subqueryIndex);
|
||||||
} else {
|
} else {
|
||||||
tscTrace("%p sub:%p query complete, orderOfSub:%d,retrieve data", trsupport->pParentSqlObj, pSql,
|
tscTrace("%p sub:%p query complete, orderOfSub:%d,retrieve data", trsupport->pParentSqlObj, pSql,
|
||||||
|
|
|
@ -26,6 +26,10 @@ IF ((TD_LINUX_64) OR (TD_LINUX_32 AND TD_ARM))
|
||||||
IF (TD_CLUSTER)
|
IF (TD_CLUSTER)
|
||||||
TARGET_LINK_LIBRARIES(taosd cluster)
|
TARGET_LINK_LIBRARIES(taosd cluster)
|
||||||
ENDIF ()
|
ENDIF ()
|
||||||
|
|
||||||
|
IF (TD_VPEER)
|
||||||
|
TARGET_LINK_LIBRARIES(taosd balance)
|
||||||
|
ENDIF ()
|
||||||
|
|
||||||
SET(PREPARE_ENV_CMD "prepare_env_cmd")
|
SET(PREPARE_ENV_CMD "prepare_env_cmd")
|
||||||
SET(PREPARE_ENV_TARGET "prepare_env_target")
|
SET(PREPARE_ENV_TARGET "prepare_env_target")
|
||||||
|
|
|
@ -32,6 +32,7 @@
|
||||||
|
|
||||||
static int32_t dnodeInitSystem();
|
static int32_t dnodeInitSystem();
|
||||||
static int32_t dnodeInitStorage();
|
static int32_t dnodeInitStorage();
|
||||||
|
extern void grantParseParameter();
|
||||||
static void dnodeCleanupStorage();
|
static void dnodeCleanupStorage();
|
||||||
static void dnodeCleanUpSystem();
|
static void dnodeCleanUpSystem();
|
||||||
static void dnodeSetRunStatus(SDnodeRunStatus status);
|
static void dnodeSetRunStatus(SDnodeRunStatus status);
|
||||||
|
|
|
@ -64,7 +64,7 @@ int32_t dnodeInitMgmt() {
|
||||||
dError("failed to init dnode timer");
|
dError("failed to init dnode timer");
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t code = dnodeOpenVnodes();
|
int32_t code = dnodeOpenVnodes();
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
return -1;
|
return -1;
|
||||||
|
@ -104,7 +104,7 @@ void dnodeMgmt(SRpcMsg *pMsg) {
|
||||||
rpcFreeCont(pMsg->pCont);
|
rpcFreeCont(pMsg->pCont);
|
||||||
}
|
}
|
||||||
|
|
||||||
static int dnodeGetVnodeList(int32_t vnodeList[]) {
|
static int32_t dnodeGetVnodeList(int32_t vnodeList[]) {
|
||||||
DIR *dir = opendir(tsVnodeDir);
|
DIR *dir = opendir(tsVnodeDir);
|
||||||
if (dir == NULL) {
|
if (dir == NULL) {
|
||||||
return TSDB_CODE_NO_WRITE_ACCESS;
|
return TSDB_CODE_NO_WRITE_ACCESS;
|
||||||
|
@ -129,47 +129,59 @@ static int dnodeGetVnodeList(int32_t vnodeList[]) {
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t dnodeOpenVnodes() {
|
static int32_t dnodeOpenVnodes() {
|
||||||
char vnodeDir[TSDB_FILENAME_LEN * 3];
|
char vnodeDir[TSDB_FILENAME_LEN * 3];
|
||||||
int failed = 0;
|
int32_t failed = 0;
|
||||||
|
|
||||||
int32_t *vnodeList = (int32_t *) malloc(sizeof(int32_t) * 10000);
|
int32_t *vnodeList = (int32_t *)malloc(sizeof(int32_t) * 10000);
|
||||||
int numOfVnodes = dnodeGetVnodeList(vnodeList);
|
int32_t numOfVnodes = dnodeGetVnodeList(vnodeList);
|
||||||
|
|
||||||
for (int i=0; i<numOfVnodes; ++i) {
|
|
||||||
snprintf(vnodeDir, TSDB_FILENAME_LEN * 3, "%s/vnode%d", tsVnodeDir, vnodeList[i]);
|
|
||||||
if (vnodeOpen(vnodeList[i], vnodeDir) <0) failed++;
|
|
||||||
}
|
|
||||||
|
|
||||||
free(vnodeList);
|
for (int32_t i = 0; i < numOfVnodes; ++i) {
|
||||||
|
snprintf(vnodeDir, TSDB_FILENAME_LEN * 3, "%s/vnode%d", tsVnodeDir, vnodeList[i]);
|
||||||
|
if (vnodeOpen(vnodeList[i], vnodeDir) < 0) failed++;
|
||||||
|
}
|
||||||
|
|
||||||
dPrint("there are total vnodes:%d, failed to open:%d", numOfVnodes, failed);
|
free(vnodeList);
|
||||||
return TSDB_CODE_SUCCESS;
|
|
||||||
|
dPrint("there are total vnodes:%d, failed to open:%d", numOfVnodes, failed);
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void dnodeCloseVnodes() {
|
static void dnodeCloseVnodes() {
|
||||||
int32_t *vnodeList = (int32_t *) malloc(sizeof(int32_t) * 10000);
|
int32_t *vnodeList = (int32_t *)malloc(sizeof(int32_t) * 10000);
|
||||||
int numOfVnodes = dnodeGetVnodeList(vnodeList);
|
int32_t numOfVnodes = dnodeGetVnodeList(vnodeList);
|
||||||
|
|
||||||
for (int i=0; i<numOfVnodes; ++i)
|
|
||||||
vnodeClose(vnodeList[i]);
|
|
||||||
|
|
||||||
free(vnodeList);
|
for (int32_t i = 0; i < numOfVnodes; ++i) {
|
||||||
dPrint("total vnodes:%d are all closed", numOfVnodes);
|
vnodeClose(vnodeList[i]);
|
||||||
|
}
|
||||||
|
|
||||||
|
free(vnodeList);
|
||||||
|
dPrint("total vnodes:%d are all closed", numOfVnodes);
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t dnodeProcessCreateVnodeMsg(SRpcMsg *rpcMsg) {
|
static int32_t dnodeProcessCreateVnodeMsg(SRpcMsg *rpcMsg) {
|
||||||
|
|
||||||
SMDCreateVnodeMsg *pCreate = rpcMsg->pCont;
|
SMDCreateVnodeMsg *pCreate = rpcMsg->pCont;
|
||||||
pCreate->cfg.vgId = htonl(pCreate->cfg.vgId);
|
pCreate->cfg.vgId = htonl(pCreate->cfg.vgId);
|
||||||
pCreate->cfg.maxSessions = htonl(pCreate->cfg.maxSessions);
|
pCreate->cfg.maxSessions = htonl(pCreate->cfg.maxSessions);
|
||||||
pCreate->cfg.daysPerFile = htonl(pCreate->cfg.daysPerFile);
|
pCreate->cfg.cacheBlockSize = htonl(pCreate->cfg.cacheBlockSize);
|
||||||
pCreate->cfg.commitLog = pCreate->cfg.commitLog;
|
pCreate->cfg.daysPerFile = htonl(pCreate->cfg.daysPerFile);
|
||||||
|
pCreate->cfg.daysToKeep1 = htonl(pCreate->cfg.daysToKeep1);
|
||||||
|
pCreate->cfg.daysToKeep2 = htonl(pCreate->cfg.daysToKeep2);
|
||||||
|
pCreate->cfg.daysToKeep = htonl(pCreate->cfg.daysToKeep);
|
||||||
|
pCreate->cfg.commitTime = htonl(pCreate->cfg.commitTime);
|
||||||
|
pCreate->cfg.rowsInFileBlock = htonl(pCreate->cfg.rowsInFileBlock);
|
||||||
|
pCreate->cfg.blocksPerTable = htons(pCreate->cfg.blocksPerTable);
|
||||||
|
pCreate->cfg.cacheNumOfBlocks.totalBlocks = htonl(pCreate->cfg.cacheNumOfBlocks.totalBlocks);
|
||||||
|
|
||||||
|
for (int32_t j = 0; j < pCreate->cfg.replications; ++j) {
|
||||||
|
pCreate->vpeerDesc[j].vgId = htonl(pCreate->vpeerDesc[j].vgId);
|
||||||
|
pCreate->vpeerDesc[j].dnodeId = htonl(pCreate->vpeerDesc[j].dnodeId);
|
||||||
|
pCreate->vpeerDesc[j].ip = htonl(pCreate->vpeerDesc[j].ip);
|
||||||
|
}
|
||||||
|
|
||||||
return vnodeCreate(pCreate);
|
return vnodeCreate(pCreate);
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t dnodeProcessDropVnodeMsg(SRpcMsg *rpcMsg) {
|
static int32_t dnodeProcessDropVnodeMsg(SRpcMsg *rpcMsg) {
|
||||||
|
|
||||||
SMDDropVnodeMsg *pDrop = rpcMsg->pCont;
|
SMDDropVnodeMsg *pDrop = rpcMsg->pCont;
|
||||||
pDrop->vgId = htonl(pDrop->vgId);
|
pDrop->vgId = htonl(pDrop->vgId);
|
||||||
|
|
||||||
|
@ -177,7 +189,6 @@ static int32_t dnodeProcessDropVnodeMsg(SRpcMsg *rpcMsg) {
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t dnodeProcessAlterVnodeMsg(SRpcMsg *rpcMsg) {
|
static int32_t dnodeProcessAlterVnodeMsg(SRpcMsg *rpcMsg) {
|
||||||
|
|
||||||
SMDCreateVnodeMsg *pCreate = rpcMsg->pCont;
|
SMDCreateVnodeMsg *pCreate = rpcMsg->pCont;
|
||||||
pCreate->cfg.vgId = htonl(pCreate->cfg.vgId);
|
pCreate->cfg.vgId = htonl(pCreate->cfg.vgId);
|
||||||
pCreate->cfg.maxSessions = htonl(pCreate->cfg.maxSessions);
|
pCreate->cfg.maxSessions = htonl(pCreate->cfg.maxSessions);
|
||||||
|
@ -206,7 +217,7 @@ static int32_t dnodeProcessConfigDnodeMsg(SRpcMsg *pMsg) {
|
||||||
|
|
||||||
static void dnodeSendStatusMsg(void *handle, void *tmrId) {
|
static void dnodeSendStatusMsg(void *handle, void *tmrId) {
|
||||||
if (tsDnodeTmr == NULL) {
|
if (tsDnodeTmr == NULL) {
|
||||||
dError("dnode timer is already released");
|
dError("dnode timer is already released");
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -192,6 +192,7 @@ static void dnodeHandleIdleReadWorker() {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
UNUSED_FUNC
|
||||||
static void dnodeProcessReadResult(void *pVnode, SReadMsg *pRead) {
|
static void dnodeProcessReadResult(void *pVnode, SReadMsg *pRead) {
|
||||||
SRpcContext *pRpcContext = pRead->pRpcContext;
|
SRpcContext *pRpcContext = pRead->pRpcContext;
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
|
|
|
@ -37,7 +37,7 @@ int32_t dnodeInitShell() {
|
||||||
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_QUERY] = dnodeRead;
|
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_QUERY] = dnodeRead;
|
||||||
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_RETRIEVE] = dnodeRead;
|
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_RETRIEVE] = dnodeRead;
|
||||||
|
|
||||||
int numOfThreads = tsNumOfCores * tsNumOfThreadsPerCore;
|
int32_t numOfThreads = tsNumOfCores * tsNumOfThreadsPerCore;
|
||||||
numOfThreads = (int32_t) ((1.0 - tsRatioOfQueryThreads) * numOfThreads / 2.0);
|
numOfThreads = (int32_t) ((1.0 - tsRatioOfQueryThreads) * numOfThreads / 2.0);
|
||||||
if (numOfThreads < 1) {
|
if (numOfThreads < 1) {
|
||||||
numOfThreads = 1;
|
numOfThreads = 1;
|
||||||
|
|
|
@ -52,7 +52,6 @@ static void dnodeHandleIdleWorker(SWriteWorker *pWorker);
|
||||||
SWriteWorkerPool wWorkerPool;
|
SWriteWorkerPool wWorkerPool;
|
||||||
|
|
||||||
int32_t dnodeInitWrite() {
|
int32_t dnodeInitWrite() {
|
||||||
|
|
||||||
wWorkerPool.max = tsNumOfCores;
|
wWorkerPool.max = tsNumOfCores;
|
||||||
wWorkerPool.writeWorker = (SWriteWorker *)calloc(sizeof(SWriteWorker), wWorkerPool.max);
|
wWorkerPool.writeWorker = (SWriteWorker *)calloc(sizeof(SWriteWorker), wWorkerPool.max);
|
||||||
if (wWorkerPool.writeWorker == NULL) return -1;
|
if (wWorkerPool.writeWorker == NULL) return -1;
|
||||||
|
@ -71,7 +70,7 @@ void dnodeCleanupWrite() {
|
||||||
}
|
}
|
||||||
|
|
||||||
void dnodeWrite(SRpcMsg *pMsg) {
|
void dnodeWrite(SRpcMsg *pMsg) {
|
||||||
char *pCont = (char *) pMsg->pCont;
|
char *pCont = (char *)pMsg->pCont;
|
||||||
|
|
||||||
if (pMsg->msgType == TSDB_MSG_TYPE_SUBMIT || pMsg->msgType == TSDB_MSG_TYPE_MD_DROP_STABLE) {
|
if (pMsg->msgType == TSDB_MSG_TYPE_SUBMIT || pMsg->msgType == TSDB_MSG_TYPE_MD_DROP_STABLE) {
|
||||||
SMsgDesc *pDesc = (SMsgDesc *)pCont;
|
SMsgDesc *pDesc = (SMsgDesc *)pCont;
|
||||||
|
@ -80,16 +79,16 @@ void dnodeWrite(SRpcMsg *pMsg) {
|
||||||
}
|
}
|
||||||
|
|
||||||
SMsgHead *pHead = (SMsgHead *) pCont;
|
SMsgHead *pHead = (SMsgHead *) pCont;
|
||||||
pHead->vgId = htonl(pHead->vgId);
|
pHead->vgId = htonl(pHead->vgId);
|
||||||
pHead->contLen = htonl(pHead->contLen);
|
pHead->contLen = htonl(pHead->contLen);
|
||||||
|
|
||||||
taos_queue queue = vnodeGetWqueue(pHead->vgId);
|
taos_queue queue = vnodeGetWqueue(pHead->vgId);
|
||||||
if (queue) {
|
if (queue) {
|
||||||
// put message into queue
|
// put message into queue
|
||||||
SWriteMsg *pWrite = (SWriteMsg *)taosAllocateQitem(sizeof(SWriteMsg));
|
SWriteMsg *pWrite = (SWriteMsg *)taosAllocateQitem(sizeof(SWriteMsg));
|
||||||
pWrite->rpcMsg = *pMsg;
|
pWrite->rpcMsg = *pMsg;
|
||||||
pWrite->pCont = pCont;
|
pWrite->pCont = pCont;
|
||||||
pWrite->contLen = pHead->contLen;
|
pWrite->contLen = pHead->contLen;
|
||||||
|
|
||||||
taosWriteQitem(queue, TAOS_QTYPE_RPC, pWrite);
|
taosWriteQitem(queue, TAOS_QTYPE_RPC, pWrite);
|
||||||
} else {
|
} else {
|
||||||
|
@ -227,4 +226,3 @@ static void dnodeHandleIdleWorker(SWriteWorker *pWorker) {
|
||||||
pthread_exit(NULL);
|
pthread_exit(NULL);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -98,7 +98,6 @@ typedef struct {
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
int32_t dnodeId;
|
int32_t dnodeId;
|
||||||
int32_t vnode;
|
|
||||||
uint32_t privateIp;
|
uint32_t privateIp;
|
||||||
uint32_t publicIp;
|
uint32_t publicIp;
|
||||||
} SVnodeGid;
|
} SVnodeGid;
|
||||||
|
|
|
@ -156,28 +156,9 @@ TAOS_DEFINE_ERROR(TSDB_CODE_QUERY_CANCELLED, 0, 110, "query cancelled
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_IE, 0, 111, "invalid ie")
|
TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_IE, 0, 111, "invalid ie")
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_VALUE, 0, 112, "invalid value")
|
TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_VALUE, 0, 112, "invalid value")
|
||||||
|
|
||||||
// TAOS_DEFINE_ERROR(TSDB_CODE_SYNC_REQUIRED, 0, 99, "sync required")
|
// others
|
||||||
// TAOS_DEFINE_ERROR(TSDB_CODE_UNSYNCED, 0, 100, "unsyned")
|
TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_FILE_FORMAT, 0, 120, "invalid file format")
|
||||||
// TAOS_DEFINE_ERROR(TSDB_CODE_DATA_ALREADY_IMPORTED, 0, 75, "data already imported")
|
|
||||||
// TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_COMMIT_LOG, 0, 109, "invalid commit log") // commit log init failed
|
|
||||||
// TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_VNODE_STATUS, 0, 116, "invalid vnode status")
|
|
||||||
// TAOS_DEFINE_ERROR(TSDB_CODE_TIMESTAMP_OUT_OF_RANGE, 0, 105, "timestamp out of range")
|
|
||||||
// TAOS_DEFINE_ERROR(TSDB_CODE_DUPLICATE_TAGS, 0, 112, "duplicate tags") // tags value for join not unique
|
|
||||||
// TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_SUBMIT_MSG, 0, 113, "invalid submit message")
|
|
||||||
// TAOS_DEFINE_ERROR(TSDB_CODE_FAILED_TO_LOCK_RESOURCES, 0, 117, "failed to lock resources")
|
|
||||||
// TAOS_DEFINE_ERROR(TSDB_CODE_FILE_BLOCK_TS_DISORDERED, 0, 108, "file block ts disordered") // time stamp in file block is disordered
|
|
||||||
// TAOS_DEFINE_ERROR(TSDB_CODE_BATCH_SIZE_TOO_BIG, 0, 104, "batch size too big")
|
|
||||||
// TAOS_DEFINE_ERROR(TSDB_CODE_WRONG_SCHEMA, 0, 53, "wrong schema")
|
|
||||||
// TAOS_DEFINE_ERROR(TSDB_CODE_NO_QSUMMARY, 0, 68, "no qsummery")
|
|
||||||
// TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_METER_ID, 0, 27, "invalid meter id")
|
|
||||||
// TAOS_DEFINE_ERROR(TSDB_CODE_METRICMETA_EXPIRED, 0, 63, "metricmeta expired") // local cached metric-meta expired causes error in metric query
|
|
||||||
// TAOS_DEFINE_ERROR(TSDB_CODE_SESSION_ALREADY_EXIST, 0, 67, "session already exist")
|
|
||||||
// TAOS_DEFINE_ERROR(TSDB_CODE_SESSION_NOT_READY, 0, 103, "session not ready") // table NOT in ready state
|
|
||||||
// TAOS_DEFINE_ERROR(TSDB_CODE_DATA_OVERFLOW, 0, 82, "data overflow")
|
|
||||||
// TAOS_DEFINE_ERROR(TSDB_CODE_ACTION_TRANS_NOT_FINISHED, 0, 17, "action transaction not finished")
|
|
||||||
// TAOS_DEFINE_ERROR(TSDB_CODE_ACTION_NOT_ONLINE, 0, 18, "action not online")
|
|
||||||
// TAOS_DEFINE_ERROR(TSDB_CODE_ACTION_SEND_FAILD, 0, 19, "action send failed")
|
|
||||||
// TAOS_DEFINE_ERROR(TSDB_CODE_NOT_ACTIVE_SESSION, 0, 20, "not active session")
|
|
||||||
|
|
||||||
#ifdef TAOS_ERROR_C
|
#ifdef TAOS_ERROR_C
|
||||||
};
|
};
|
||||||
|
|
|
@ -241,7 +241,8 @@ typedef struct SSchema {
|
||||||
} SSchema;
|
} SSchema;
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
int32_t vnode; // the index of vnode
|
int32_t vgId;
|
||||||
|
int32_t dnodeId;
|
||||||
uint32_t ip;
|
uint32_t ip;
|
||||||
} SVnodeDesc;
|
} SVnodeDesc;
|
||||||
|
|
||||||
|
|
|
@ -23,7 +23,7 @@ extern "C" {
|
||||||
|
|
||||||
int32_t mgmtInitBalance();
|
int32_t mgmtInitBalance();
|
||||||
void mgmtCleanupBalance();
|
void mgmtCleanupBalance();
|
||||||
void mgmtStartBalanceTimer(int32_t afterMs) ;
|
void mgmtBalanceNotify() ;
|
||||||
int32_t mgmtAllocVnodes(SVgObj *pVgroup);
|
int32_t mgmtAllocVnodes(SVgObj *pVgroup);
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
|
|
|
@ -18,11 +18,35 @@
|
||||||
#include "mgmtBalance.h"
|
#include "mgmtBalance.h"
|
||||||
#include "mgmtDnode.h"
|
#include "mgmtDnode.h"
|
||||||
|
|
||||||
int32_t mgmtInitBalance() { return 0; }
|
extern int32_t balanceInit();
|
||||||
void mgmtCleanupBalance() {}
|
extern void balanceCleanUp();
|
||||||
void mgmtStartBalanceTimer(int32_t afterMs) {}
|
extern void balanceNotify();
|
||||||
|
extern int32_t balanceAllocVnodes(SVgObj *pVgroup);
|
||||||
|
|
||||||
|
int32_t mgmtInitBalance() {
|
||||||
|
#ifdef _VPEER
|
||||||
|
return balanceInit();
|
||||||
|
#else
|
||||||
|
return 0;
|
||||||
|
#endif
|
||||||
|
}
|
||||||
|
|
||||||
|
void mgmtCleanupBalance() {
|
||||||
|
#ifdef _VPEER
|
||||||
|
balanceCleanUp();
|
||||||
|
#endif
|
||||||
|
}
|
||||||
|
|
||||||
|
void mgmtBalanceNotify() {
|
||||||
|
#ifdef _VPEER
|
||||||
|
balanceNotify();
|
||||||
|
#endif
|
||||||
|
}
|
||||||
|
|
||||||
int32_t mgmtAllocVnodes(SVgObj *pVgroup) {
|
int32_t mgmtAllocVnodes(SVgObj *pVgroup) {
|
||||||
|
#ifdef _VPEER
|
||||||
|
return balanceAllocVnodes(pVgroup);
|
||||||
|
#else
|
||||||
void * pNode = NULL;
|
void * pNode = NULL;
|
||||||
SDnodeObj *pDnode = NULL;
|
SDnodeObj *pDnode = NULL;
|
||||||
SDnodeObj *pSelDnode = NULL;
|
SDnodeObj *pSelDnode = NULL;
|
||||||
|
@ -53,4 +77,5 @@ int32_t mgmtAllocVnodes(SVgObj *pVgroup) {
|
||||||
|
|
||||||
mTrace("dnode:%d, alloc one vnode to vgroup, openVnodes:%d", pSelDnode->dnodeId, pSelDnode->openVnodes);
|
mTrace("dnode:%d, alloc one vnode to vgroup, openVnodes:%d", pSelDnode->dnodeId, pSelDnode->openVnodes);
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
|
#endif
|
||||||
}
|
}
|
||||||
|
|
|
@ -251,7 +251,7 @@ void mgmtProcessDnodeStatusMsg(SRpcMsg *rpcMsg) {
|
||||||
if (pDnode->status != TSDB_DN_STATUS_READY) {
|
if (pDnode->status != TSDB_DN_STATUS_READY) {
|
||||||
mTrace("dnode:%d, from offline to online", pDnode->dnodeId);
|
mTrace("dnode:%d, from offline to online", pDnode->dnodeId);
|
||||||
pDnode->status = TSDB_DN_STATUS_READY;
|
pDnode->status = TSDB_DN_STATUS_READY;
|
||||||
mgmtStartBalanceTimer(200);
|
mgmtBalanceNotify();
|
||||||
}
|
}
|
||||||
|
|
||||||
mgmtDecDnodeRef(pDnode);
|
mgmtDecDnodeRef(pDnode);
|
||||||
|
|
|
@ -1512,7 +1512,8 @@ static int32_t mgmtDoGetChildTableMeta(SQueuedMsg *pMsg, STableMetaMsg *pMeta) {
|
||||||
} else {
|
} else {
|
||||||
pMeta->vpeerDesc[i].ip = pVgroup->vnodeGid[i].privateIp;
|
pMeta->vpeerDesc[i].ip = pVgroup->vnodeGid[i].privateIp;
|
||||||
}
|
}
|
||||||
pMeta->vpeerDesc[i].vnode = htonl(pVgroup->vnodeGid[i].vnode);
|
pMeta->vpeerDesc[i].vgId = htonl(pVgroup->vgId);
|
||||||
|
pMeta->vpeerDesc[i].dnodeId = htonl(pVgroup->vnodeGid[i].dnodeId);
|
||||||
}
|
}
|
||||||
pMeta->numOfVpeers = pVgroup->numOfVnodes;
|
pMeta->numOfVpeers = pVgroup->numOfVnodes;
|
||||||
|
|
||||||
|
|
|
@ -32,7 +32,7 @@
|
||||||
#include "mgmtVgroup.h"
|
#include "mgmtVgroup.h"
|
||||||
|
|
||||||
void *tsVgroupSdb = NULL;
|
void *tsVgroupSdb = NULL;
|
||||||
static int32_t tsVgUpdateSize = 0;
|
int32_t tsVgUpdateSize = 0;
|
||||||
|
|
||||||
static int32_t mgmtGetVgroupMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn);
|
static int32_t mgmtGetVgroupMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn);
|
||||||
static int32_t mgmtRetrieveVgroups(SShowObj *pShow, char *data, int32_t rows, void *pConn);
|
static int32_t mgmtRetrieveVgroups(SShowObj *pShow, char *data, int32_t rows, void *pConn);
|
||||||
|
@ -93,11 +93,12 @@ static int32_t mgmtVgroupActionInsert(SSdbOperDesc *pOper) {
|
||||||
|
|
||||||
for (int32_t i = 0; i < pVgroup->numOfVnodes; ++i) {
|
for (int32_t i = 0; i < pVgroup->numOfVnodes; ++i) {
|
||||||
SDnodeObj *pDnode = mgmtGetDnode(pVgroup->vnodeGid[i].dnodeId);
|
SDnodeObj *pDnode = mgmtGetDnode(pVgroup->vnodeGid[i].dnodeId);
|
||||||
pVgroup->vnodeGid[i].privateIp = pDnode->privateIp;
|
if (pDnode != NULL) {
|
||||||
pVgroup->vnodeGid[i].publicIp = pDnode->publicIp;
|
pVgroup->vnodeGid[i].privateIp = pDnode->privateIp;
|
||||||
pVgroup->vnodeGid[i].vnode = pVgroup->vgId;
|
pVgroup->vnodeGid[i].publicIp = pDnode->publicIp;
|
||||||
atomic_add_fetch_32(&pDnode->openVnodes, 1);
|
atomic_add_fetch_32(&pDnode->openVnodes, 1);
|
||||||
mgmtDecDnodeRef(pDnode);
|
mgmtDecDnodeRef(pDnode);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
mgmtAddVgroupIntoDb(pVgroup);
|
mgmtAddVgroupIntoDb(pVgroup);
|
||||||
|
@ -236,7 +237,7 @@ void mgmtCreateVgroup(SQueuedMsg *pMsg, SDbObj *pDb) {
|
||||||
|
|
||||||
mPrint("vgroup:%d, is created in mnode, db:%s replica:%d", pVgroup->vgId, pDb->name, pVgroup->numOfVnodes);
|
mPrint("vgroup:%d, is created in mnode, db:%s replica:%d", pVgroup->vgId, pDb->name, pVgroup->numOfVnodes);
|
||||||
for (int32_t i = 0; i < pVgroup->numOfVnodes; ++i) {
|
for (int32_t i = 0; i < pVgroup->numOfVnodes; ++i) {
|
||||||
mPrint("vgroup:%d, index:%d, dnode:%d vnode:%d", pVgroup->vgId, i, pVgroup->vnodeGid[i].dnodeId, pVgroup->vnodeGid[i].vnode);
|
mPrint("vgroup:%d, index:%d, dnode:%d", pVgroup->vgId, i, pVgroup->vnodeGid[i].dnodeId);
|
||||||
}
|
}
|
||||||
|
|
||||||
pMsg->ahandle = pVgroup;
|
pMsg->ahandle = pVgroup;
|
||||||
|
@ -312,27 +313,21 @@ int32_t mgmtGetVgroupMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn) {
|
||||||
}
|
}
|
||||||
|
|
||||||
for (int32_t i = 0; i < maxReplica; ++i) {
|
for (int32_t i = 0; i < maxReplica; ++i) {
|
||||||
|
pShow->bytes[cols] = 2;
|
||||||
|
pSchema[cols].type = TSDB_DATA_TYPE_SMALLINT;
|
||||||
|
strcpy(pSchema[cols].name, "dnode");
|
||||||
|
pSchema[cols].bytes = htons(pShow->bytes[cols]);
|
||||||
|
cols++;
|
||||||
|
|
||||||
pShow->bytes[cols] = 16;
|
pShow->bytes[cols] = 16;
|
||||||
pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
|
pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
|
||||||
strcpy(pSchema[cols].name, "ip");
|
strcpy(pSchema[cols].name, "ip");
|
||||||
pSchema[cols].bytes = htons(pShow->bytes[cols]);
|
pSchema[cols].bytes = htons(pShow->bytes[cols]);
|
||||||
cols++;
|
cols++;
|
||||||
|
|
||||||
pShow->bytes[cols] = 2;
|
|
||||||
pSchema[cols].type = TSDB_DATA_TYPE_SMALLINT;
|
|
||||||
strcpy(pSchema[cols].name, "vnode");
|
|
||||||
pSchema[cols].bytes = htons(pShow->bytes[cols]);
|
|
||||||
cols++;
|
|
||||||
|
|
||||||
pShow->bytes[cols] = 9;
|
pShow->bytes[cols] = 9;
|
||||||
pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
|
pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
|
||||||
strcpy(pSchema[cols].name, "vnode status");
|
strcpy(pSchema[cols].name, "vstatus");
|
||||||
pSchema[cols].bytes = htons(pShow->bytes[cols]);
|
|
||||||
cols++;
|
|
||||||
|
|
||||||
pShow->bytes[cols] = 16;
|
|
||||||
pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
|
|
||||||
strcpy(pSchema[cols].name, "public ip");
|
|
||||||
pSchema[cols].bytes = htons(pShow->bytes[cols]);
|
pSchema[cols].bytes = htons(pShow->bytes[cols]);
|
||||||
cols++;
|
cols++;
|
||||||
}
|
}
|
||||||
|
@ -416,15 +411,15 @@ int32_t mgmtRetrieveVgroups(SShowObj *pShow, char *data, int32_t rows, void *pCo
|
||||||
cols++;
|
cols++;
|
||||||
|
|
||||||
for (int32_t i = 0; i < maxReplica; ++i) {
|
for (int32_t i = 0; i < maxReplica; ++i) {
|
||||||
|
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
|
||||||
|
*(int16_t *) pWrite = pVgroup->vnodeGid[i].dnodeId;
|
||||||
|
cols++;
|
||||||
|
|
||||||
tinet_ntoa(ipstr, pVgroup->vnodeGid[i].privateIp);
|
tinet_ntoa(ipstr, pVgroup->vnodeGid[i].privateIp);
|
||||||
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
|
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
|
||||||
strcpy(pWrite, ipstr);
|
strcpy(pWrite, ipstr);
|
||||||
cols++;
|
cols++;
|
||||||
|
|
||||||
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
|
|
||||||
*(int16_t *) pWrite = pVgroup->vnodeGid[i].vnode;
|
|
||||||
cols++;
|
|
||||||
|
|
||||||
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
|
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
|
||||||
if (pVgroup->vnodeGid[i].dnodeId != 0) {
|
if (pVgroup->vnodeGid[i].dnodeId != 0) {
|
||||||
char *vnodeStatus = mgmtGetVnodeStatus(pVgroup, pVgroup->vnodeGid + i);
|
char *vnodeStatus = mgmtGetVnodeStatus(pVgroup, pVgroup->vnodeGid + i);
|
||||||
|
@ -433,11 +428,6 @@ int32_t mgmtRetrieveVgroups(SShowObj *pShow, char *data, int32_t rows, void *pCo
|
||||||
strcpy(pWrite, "null");
|
strcpy(pWrite, "null");
|
||||||
}
|
}
|
||||||
cols++;
|
cols++;
|
||||||
|
|
||||||
tinet_ntoa(ipstr, pVgroup->vnodeGid[i].publicIp);
|
|
||||||
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
|
|
||||||
strcpy(pWrite, ipstr);
|
|
||||||
cols++;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
numOfRows++;
|
numOfRows++;
|
||||||
|
@ -490,15 +480,15 @@ SMDCreateVnodeMsg *mgmtBuildCreateVnodeMsg(SVgObj *pVgroup) {
|
||||||
pCfg->daysToKeep2 = htonl(pCfg->daysToKeep2);
|
pCfg->daysToKeep2 = htonl(pCfg->daysToKeep2);
|
||||||
pCfg->daysToKeep = htonl(pCfg->daysToKeep);
|
pCfg->daysToKeep = htonl(pCfg->daysToKeep);
|
||||||
pCfg->commitTime = htonl(pCfg->commitTime);
|
pCfg->commitTime = htonl(pCfg->commitTime);
|
||||||
pCfg->commitLog = pCfg->commitLog;
|
|
||||||
pCfg->blocksPerTable = htons(pCfg->blocksPerTable);
|
|
||||||
pCfg->replications = (char) pVgroup->numOfVnodes;
|
|
||||||
pCfg->rowsInFileBlock = htonl(pCfg->rowsInFileBlock);
|
pCfg->rowsInFileBlock = htonl(pCfg->rowsInFileBlock);
|
||||||
|
pCfg->blocksPerTable = htons(pCfg->blocksPerTable);
|
||||||
|
pCfg->replications = (int8_t) pVgroup->numOfVnodes;
|
||||||
|
|
||||||
SVnodeDesc *vpeerDesc = pVnode->vpeerDesc;
|
SVnodeDesc *vpeerDesc = pVnode->vpeerDesc;
|
||||||
for (int32_t j = 0; j < pVgroup->numOfVnodes; ++j) {
|
for (int32_t j = 0; j < pVgroup->numOfVnodes; ++j) {
|
||||||
vpeerDesc[j].vnode = htonl(pVgroup->vnodeGid[j].vnode);
|
vpeerDesc[j].vgId = htonl(pVgroup->vgId);
|
||||||
vpeerDesc[j].ip = htonl(pVgroup->vnodeGid[j].privateIp);
|
vpeerDesc[j].dnodeId = htonl(pVgroup->vnodeGid[j].dnodeId);
|
||||||
|
vpeerDesc[j].ip = htonl(pVgroup->vnodeGid[j].privateIp);
|
||||||
}
|
}
|
||||||
|
|
||||||
return pVnode;
|
return pVnode;
|
||||||
|
|
|
@ -41,6 +41,8 @@ typedef struct {
|
||||||
void *sync;
|
void *sync;
|
||||||
void *events;
|
void *events;
|
||||||
void *cq; // continuous query
|
void *cq; // continuous query
|
||||||
|
int32_t replicas;
|
||||||
|
SVnodeDesc vpeers[TSDB_MAX_MPEERS];
|
||||||
} SVnodeObj;
|
} SVnodeObj;
|
||||||
|
|
||||||
int vnodeWriteToQueue(void *param, SWalHead *pHead, int type);
|
int vnodeWriteToQueue(void *param, SWalHead *pHead, int type);
|
||||||
|
|
|
@ -29,16 +29,17 @@
|
||||||
#include "vnode.h"
|
#include "vnode.h"
|
||||||
#include "vnodeInt.h"
|
#include "vnodeInt.h"
|
||||||
|
|
||||||
static void *tsDnodeVnodesHash;
|
static void *tsDnodeVnodesHash;
|
||||||
static void vnodeCleanUp(SVnodeObj *pVnode);
|
static void vnodeCleanUp(SVnodeObj *pVnode);
|
||||||
static void vnodeBuildVloadMsg(char *pNode, void * param);
|
static void vnodeBuildVloadMsg(char *pNode, void * param);
|
||||||
static int vnodeWALCallback(void *arg);
|
static int vnodeWALCallback(void *arg);
|
||||||
|
static int32_t vnodeSaveCfg(SMDCreateVnodeMsg *pVnodeCfg);
|
||||||
|
static int32_t vnodeReadCfg(SVnodeObj *pVnode);
|
||||||
|
|
||||||
static int tsOpennedVnodes;
|
static int tsOpennedVnodes;
|
||||||
static pthread_once_t vnodeModuleInit = PTHREAD_ONCE_INIT;
|
static pthread_once_t vnodeModuleInit = PTHREAD_ONCE_INIT;
|
||||||
|
|
||||||
static void vnodeInit() {
|
static void vnodeInit() {
|
||||||
|
|
||||||
vnodeInitWriteFp();
|
vnodeInitWriteFp();
|
||||||
|
|
||||||
tsDnodeVnodesHash = taosInitIntHash(TSDB_MAX_VNODES, sizeof(SVnodeObj *), taosHashInt);
|
tsDnodeVnodesHash = taosInitIntHash(TSDB_MAX_VNODES, sizeof(SVnodeObj *), taosHashInt);
|
||||||
|
@ -51,12 +52,12 @@ int32_t vnodeCreate(SMDCreateVnodeMsg *pVnodeCfg) {
|
||||||
int32_t code;
|
int32_t code;
|
||||||
pthread_once(&vnodeModuleInit, vnodeInit);
|
pthread_once(&vnodeModuleInit, vnodeInit);
|
||||||
|
|
||||||
SVnodeObj *pTemp = (SVnodeObj *) taosGetIntHashData(tsDnodeVnodesHash, pVnodeCfg->cfg.vgId);
|
SVnodeObj *pTemp = (SVnodeObj *)taosGetIntHashData(tsDnodeVnodesHash, pVnodeCfg->cfg.vgId);
|
||||||
|
|
||||||
if (pTemp != NULL) {
|
if (pTemp != NULL) {
|
||||||
dPrint("vgId:%d, vnode already exist, pVnode:%p", pVnodeCfg->cfg.vgId, pTemp);
|
dPrint("vgId:%d, vnode already exist, pVnode:%p", pVnodeCfg->cfg.vgId, pTemp);
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
STsdbCfg tsdbCfg = {0};
|
STsdbCfg tsdbCfg = {0};
|
||||||
tsdbCfg.precision = pVnodeCfg->cfg.precision;
|
tsdbCfg.precision = pVnodeCfg->cfg.precision;
|
||||||
|
@ -81,12 +82,18 @@ int32_t vnodeCreate(SMDCreateVnodeMsg *pVnodeCfg) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
code = vnodeSaveCfg(pVnodeCfg);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
dError("vgId:%d, failed to save vnode cfg, reason:%s", pVnodeCfg->cfg.vgId, tstrerror(code));
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
char tsdbDir[TSDB_FILENAME_LEN] = {0};
|
char tsdbDir[TSDB_FILENAME_LEN] = {0};
|
||||||
sprintf(tsdbDir, "%s/vnode%d/tsdb", tsVnodeDir, pVnodeCfg->cfg.vgId);
|
sprintf(tsdbDir, "%s/vnode%d/tsdb", tsVnodeDir, pVnodeCfg->cfg.vgId);
|
||||||
code = tsdbCreateRepo(tsdbDir, &tsdbCfg, NULL);
|
code = tsdbCreateRepo(tsdbDir, &tsdbCfg, NULL);
|
||||||
if (code <0) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
dError("vgId:%d, failed to create tsdb in vnode, reason:%s", pVnodeCfg->cfg.vgId, tstrerror(terrno));
|
dError("vgId:%d, failed to create tsdb in vnode, reason:%s", pVnodeCfg->cfg.vgId, tstrerror(terrno));
|
||||||
return code;
|
return terrno;
|
||||||
}
|
}
|
||||||
|
|
||||||
dPrint("vgId:%d, vnode is created, clog:%d", pVnodeCfg->cfg.vgId, pVnodeCfg->cfg.commitLog);
|
dPrint("vgId:%d, vnode is created, clog:%d", pVnodeCfg->cfg.vgId, pVnodeCfg->cfg.commitLog);
|
||||||
|
@ -121,6 +128,13 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) {
|
||||||
pVnode->version = 0;
|
pVnode->version = 0;
|
||||||
taosAddIntHash(tsDnodeVnodesHash, pVnode->vgId, (char *)(&pVnode));
|
taosAddIntHash(tsDnodeVnodesHash, pVnode->vgId, (char *)(&pVnode));
|
||||||
|
|
||||||
|
int32_t code = vnodeReadCfg(pVnode);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
dError("pVnode:%p vgId:%d, failed to read cfg file", pVnode, pVnode->vgId);
|
||||||
|
taosDeleteIntHash(tsDnodeVnodesHash, pVnode->vgId);
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
pVnode->wqueue = dnodeAllocateWqueue(pVnode);
|
pVnode->wqueue = dnodeAllocateWqueue(pVnode);
|
||||||
pVnode->rqueue = dnodeAllocateRqueue(pVnode);
|
pVnode->rqueue = dnodeAllocateRqueue(pVnode);
|
||||||
|
|
||||||
|
@ -258,7 +272,63 @@ static void vnodeCleanUp(SVnodeObj *pVnode) {
|
||||||
vnodeRelease(pVnode);
|
vnodeRelease(pVnode);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TODO: this is a simple implement
|
||||||
static int vnodeWALCallback(void *arg) {
|
static int vnodeWALCallback(void *arg) {
|
||||||
SVnodeObj *pVnode = arg;
|
SVnodeObj *pVnode = arg;
|
||||||
return walRenew(pVnode->wal);
|
return walRenew(pVnode->wal);
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t vnodeSaveCfg(SMDCreateVnodeMsg *pVnodeCfg) {
|
||||||
|
char cfgFile[TSDB_FILENAME_LEN * 2] = {0};
|
||||||
|
sprintf(cfgFile, "%s/vnode%d/config", tsVnodeDir, pVnodeCfg->cfg.vgId);
|
||||||
|
|
||||||
|
FILE *fp = fopen(cfgFile, "w");
|
||||||
|
if (!fp) return errno;
|
||||||
|
|
||||||
|
fprintf(fp, "replicas %d\n", pVnodeCfg->cfg.replications);
|
||||||
|
for (int32_t i = 0; i < pVnodeCfg->cfg.replications; i++) {
|
||||||
|
fprintf(fp, "index%d dnode %d ip %u\n", i, pVnodeCfg->vpeerDesc[i].dnodeId, pVnodeCfg->vpeerDesc[i].ip);
|
||||||
|
}
|
||||||
|
|
||||||
|
fclose(fp);
|
||||||
|
dTrace("vgId:%d, save vnode cfg successed", pVnodeCfg, pVnodeCfg->cfg.vgId);
|
||||||
|
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
// TODO: this is a simple implement
|
||||||
|
static int32_t vnodeReadCfg(SVnodeObj *pVnode) {
|
||||||
|
char cfgFile[TSDB_FILENAME_LEN * 2] = {0};
|
||||||
|
sprintf(cfgFile, "%s/vnode%d/config", tsVnodeDir, pVnode->vgId);
|
||||||
|
|
||||||
|
FILE *fp = fopen(cfgFile, "r");
|
||||||
|
if (!fp) return errno;
|
||||||
|
|
||||||
|
char option[3][32] = {0};
|
||||||
|
int32_t replicas = 0;
|
||||||
|
int32_t num = fscanf(fp, "%s %d", option[0], &replicas);
|
||||||
|
if (num != 2) return TSDB_CODE_INVALID_FILE_FORMAT;
|
||||||
|
if (strcmp(option[0], "replicas") != 0) return TSDB_CODE_INVALID_FILE_FORMAT;
|
||||||
|
if (replicas == 0) return TSDB_CODE_INVALID_FILE_FORMAT;
|
||||||
|
pVnode->replicas = replicas;
|
||||||
|
|
||||||
|
for (int32_t i = 0; i < replicas; ++i) {
|
||||||
|
int32_t dnodeId = 0;
|
||||||
|
uint32_t dnodeIp = 0;
|
||||||
|
num = fscanf(fp, "%s %s %d %s %u", option[0], option[1], &dnodeId, option[2], &dnodeIp);
|
||||||
|
if (num != 5) return TSDB_CODE_INVALID_FILE_FORMAT;
|
||||||
|
if (strcmp(option[1], "dnode") != 0) return TSDB_CODE_INVALID_FILE_FORMAT;
|
||||||
|
if (strcmp(option[2], "ip") != 0) return TSDB_CODE_INVALID_FILE_FORMAT;
|
||||||
|
if (dnodeId == 0) return TSDB_CODE_INVALID_FILE_FORMAT;
|
||||||
|
if (dnodeIp == 0) return TSDB_CODE_INVALID_FILE_FORMAT;
|
||||||
|
|
||||||
|
pVnode->vpeers[i].dnodeId = dnodeId;
|
||||||
|
pVnode->vpeers[i].ip = dnodeIp;
|
||||||
|
pVnode->vpeers[i].vgId = pVnode->vgId;
|
||||||
|
}
|
||||||
|
|
||||||
|
fclose(fp);
|
||||||
|
dTrace("pVnode:%p vgId:%d, read vnode cfg successed", pVnode, pVnode->vgId);
|
||||||
|
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
Loading…
Reference in New Issue