minor changes
This commit is contained in:
parent
41290f9a82
commit
d84f44c149
|
@ -233,11 +233,11 @@ int32_t* taosGetErrno();
|
|||
#define TSDB_CODE_VND_NO_SUCH_FILE_OR_DIR TAOS_DEF_ERROR_CODE(0, 0x0507) //"Missing data file")
|
||||
#define TSDB_CODE_VND_OUT_OF_MEMORY TAOS_DEF_ERROR_CODE(0, 0x0508) //"Out of memory")
|
||||
#define TSDB_CODE_VND_APP_ERROR TAOS_DEF_ERROR_CODE(0, 0x0509) //"Unexpected generic error in vnode")
|
||||
#define TSDB_CODE_VND_INVALID_VRESION_FILE TAOS_DEF_ERROR_CODE(0, 0x050A) //"Invalid version file")
|
||||
#define TSDB_CODE_VND_IS_FULL TAOS_DEF_ERROR_CODE(0, 0x050B) //"Database memory is full for commit failed")
|
||||
#define TSDB_CODE_VND_IS_FLOWCTRL TAOS_DEF_ERROR_CODE(0, 0x050C) //"Database memory is full for waiting commit")
|
||||
#define TSDB_CODE_VND_INVALID_CFG_FILE TAOS_DEF_ERROR_CODE(0, 0x050A) //"Invalid config file)
|
||||
#define TSDB_CODE_VND_INVALID_TERM_FILE TAOS_DEF_ERROR_CODE(0, 0x050B) //"Invalid term file")
|
||||
#define TSDB_CODE_VND_IS_FLOWCTRL TAOS_DEF_ERROR_CODE(0, 0x050C) //"Database memory is full")
|
||||
#define TSDB_CODE_VND_IS_DROPPING TAOS_DEF_ERROR_CODE(0, 0x050D) //"Database is dropping")
|
||||
#define TSDB_CODE_VND_IS_BALANCING TAOS_DEF_ERROR_CODE(0, 0x050E) //"Database is balancing")
|
||||
#define TSDB_CODE_VND_IS_UPDATING TAOS_DEF_ERROR_CODE(0, 0x050E) //"Database is updating")
|
||||
#define TSDB_CODE_VND_IS_CLOSING TAOS_DEF_ERROR_CODE(0, 0x0510) //"Database is closing")
|
||||
#define TSDB_CODE_VND_NOT_SYNCED TAOS_DEF_ERROR_CODE(0, 0x0511) //"Database suspended")
|
||||
#define TSDB_CODE_VND_NO_WRITE_AUTH TAOS_DEF_ERROR_CODE(0, 0x0512) //"Database write operation denied")
|
||||
|
|
|
@ -77,8 +77,6 @@ typedef struct SVnodeCfg {
|
|||
SSyncCfg sync;
|
||||
} SVnodeCfg;
|
||||
|
||||
|
||||
|
||||
typedef struct {
|
||||
int32_t vgId; // global vnode group ID
|
||||
int32_t refCount; // reference count
|
||||
|
|
|
@ -280,90 +280,77 @@ int32_t vnodeWriteCfg(int32_t vgId, SVnodeCfg *pCfg) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t vnodeReadTerm(int32_t vgId, SSyncServerState *pState){
|
||||
#if 0
|
||||
int32_t vnodeReadTerm(int32_t vgId, SSyncServerState *pState) {
|
||||
int32_t ret = TSDB_CODE_VND_APP_ERROR;
|
||||
int32_t len = 0;
|
||||
int32_t maxLen = 100;
|
||||
char * content = calloc(1, maxLen + 1);
|
||||
cJSON * root = NULL;
|
||||
FILE * fp = NULL;
|
||||
char *content = calloc(1, maxLen + 1);
|
||||
cJSON *root = NULL;
|
||||
FILE *fp = NULL;
|
||||
|
||||
terrno = TSDB_CODE_VND_INVALID_VRESION_FILE;
|
||||
char file[TSDB_FILENAME_LEN + 30] = {0};
|
||||
sprintf(file, "%s/vnode%d/version.json", tsVnodeDir, pVnode->vgId);
|
||||
|
||||
fp = fopen(file, "r");
|
||||
if (!fp) {
|
||||
if (errno != ENOENT) {
|
||||
vError("vgId:%d, failed to read %s, error:%s", pVnode->vgId, file, strerror(errno));
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
} else {
|
||||
terrno = TSDB_CODE_SUCCESS;
|
||||
}
|
||||
goto PARSE_VER_ERROR;
|
||||
}
|
||||
char file[PATH_MAX + 30] = {0};
|
||||
sprintf(file, "%s/vnode%d/term.json", tsVnodeDir, vgId);
|
||||
|
||||
len = (int32_t)fread(content, 1, maxLen, fp);
|
||||
if (len <= 0) {
|
||||
vError("vgId:%d, failed to read %s, content is null", pVnode->vgId, file);
|
||||
goto PARSE_VER_ERROR;
|
||||
vError("vgId:%d, failed to read %s since content is null", vgId, file);
|
||||
goto PARSE_TERM_ERROR;
|
||||
}
|
||||
|
||||
root = cJSON_Parse(content);
|
||||
if (root == NULL) {
|
||||
vError("vgId:%d, failed to read %s, invalid json format", pVnode->vgId, file);
|
||||
goto PARSE_VER_ERROR;
|
||||
vError("vgId:%d, failed to read %s since invalid json format", vgId, file);
|
||||
goto PARSE_TERM_ERROR;
|
||||
}
|
||||
|
||||
cJSON *ver = cJSON_GetObjectItem(root, "version");
|
||||
if (!ver || ver->type != cJSON_Number) {
|
||||
vError("vgId:%d, failed to read %s, version not found", pVnode->vgId, file);
|
||||
goto PARSE_VER_ERROR;
|
||||
cJSON *term = cJSON_GetObjectItem(root, "term");
|
||||
if (!term || term->type != cJSON_Number) {
|
||||
vError("vgId:%d, failed to read %s since term not found", vgId, file);
|
||||
goto PARSE_TERM_ERROR;
|
||||
}
|
||||
#if 0
|
||||
pVnode->version = (uint64_t)ver->valueint;
|
||||
pState->term = (uint64_t)term->valueint;
|
||||
|
||||
terrno = TSDB_CODE_SUCCESS;
|
||||
vInfo("vgId:%d, read %s successfully, fver:%" PRIu64, pVnode->vgId, file, pVnode->version);
|
||||
#endif
|
||||
cJSON *voteFor = cJSON_GetObjectItem(root, "voteFor");
|
||||
if (!voteFor || voteFor->type != cJSON_Number) {
|
||||
vError("vgId:%d, failed to read %s since voteFor not found", vgId, file);
|
||||
goto PARSE_TERM_ERROR;
|
||||
}
|
||||
pState->voteFor = (int64_t)voteFor->valueint;
|
||||
|
||||
PARSE_VER_ERROR:
|
||||
vInfo("vgId:%d, read %s success, voteFor:%" PRIu64 ", term:%" PRIu64, vgId, file, pState->voteFor, pState->term);
|
||||
|
||||
PARSE_TERM_ERROR:
|
||||
if (content != NULL) free(content);
|
||||
if (root != NULL) cJSON_Delete(root);
|
||||
if (fp != NULL) fclose(fp);
|
||||
|
||||
return terrno;
|
||||
#endif
|
||||
return 0;
|
||||
return ret;
|
||||
}
|
||||
|
||||
int32_t vnodeWriteTerm(int32_t vgid, SSyncServerState *pState) {
|
||||
#if 0
|
||||
char file[TSDB_FILENAME_LEN + 30] = {0};
|
||||
sprintf(file, "%s/vnode%d/version.json", tsVnodeDir, pVnode->vgId);
|
||||
int32_t vnodeWriteTerm(int32_t vgId, SSyncServerState *pState) {
|
||||
char file[PATH_MAX + 30] = {0};
|
||||
sprintf(file, "%s/vnode%d/term.json", tsVnodeDir, vgId);
|
||||
|
||||
FILE *fp = fopen(file, "w");
|
||||
if (!fp) {
|
||||
vError("vgId:%d, failed to write %s, reason:%s", pVnode->vgId, file, strerror(errno));
|
||||
vError("vgId:%d, failed to write %s since %s", vgId, file, strerror(errno));
|
||||
return -1;
|
||||
}
|
||||
|
||||
int32_t len = 0;
|
||||
int32_t maxLen = 100;
|
||||
char * content = calloc(1, maxLen + 1);
|
||||
char *content = calloc(1, maxLen + 1);
|
||||
|
||||
#if 0
|
||||
len += snprintf(content + len, maxLen - len, "{\n");
|
||||
len += snprintf(content + len, maxLen - len, " \"version\": %" PRIu64 "\n", pVnode->fversion);
|
||||
len += snprintf(content + len, maxLen - len, " \"term\": %" PRIu64 "\n", pState->term);
|
||||
len += snprintf(content + len, maxLen - len, " \"voteFor\": %" PRIu64 "\n", pState->voteFor);
|
||||
len += snprintf(content + len, maxLen - len, "}\n");
|
||||
#endif
|
||||
|
||||
fwrite(content, 1, len, fp);
|
||||
taosFsyncFile(fileno(fp));
|
||||
fclose(fp);
|
||||
free(content);
|
||||
terrno = 0;
|
||||
|
||||
// vInfo("vgId:%d, successed to write %s, fver:%" PRIu64, pVnode->vgId, file, pVnode->fversion);
|
||||
#endif
|
||||
vInfo("vgId:%d, write %s success, voteFor:%" PRIu64 ", term:%" PRIu64, vgId, file, pState->voteFor, pState->term);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
|
@ -24,6 +24,7 @@
|
|||
static struct {
|
||||
SSteps *steps;
|
||||
SVnodeFp fp;
|
||||
void (*msgFp[TSDB_MSG_TYPE_MAX])(SRpcMsg *);
|
||||
} tsVint;
|
||||
|
||||
void vnodeGetDnodeEp(int32_t dnodeId, char *ep, char *fqdn, uint16_t *port) {
|
||||
|
@ -36,7 +37,41 @@ void vnodeSendMsgToDnode(struct SRpcEpSet *epSet, struct SRpcMsg *rpcMsg) {
|
|||
|
||||
void vnodeSendMsgToMnode(struct SRpcMsg *rpcMsg) { return (*tsVint.fp.SendMsgToMnode)(rpcMsg); }
|
||||
|
||||
void vnodeProcessMsg(SRpcMsg *pMsg) {
|
||||
if (tsVint.msgFp[pMsg->msgType]) {
|
||||
(*tsVint.msgFp[pMsg->msgType])(pMsg);
|
||||
} else {
|
||||
assert(0);
|
||||
}
|
||||
}
|
||||
|
||||
static void vnodeInitMsgFp() {
|
||||
tsVint.msgFp[TSDB_MSG_TYPE_MD_CREATE_VNODE] = vnodeProcessMgmtMsg;
|
||||
tsVint.msgFp[TSDB_MSG_TYPE_MD_ALTER_VNODE] = vnodeProcessMgmtMsg;
|
||||
tsVint.msgFp[TSDB_MSG_TYPE_MD_SYNC_VNODE] = vnodeProcessMgmtMsg;
|
||||
tsVint.msgFp[TSDB_MSG_TYPE_MD_COMPACT_VNODE] = vnodeProcessMgmtMsg;
|
||||
tsVint.msgFp[TSDB_MSG_TYPE_MD_DROP_VNODE] = vnodeProcessMgmtMsg;
|
||||
tsVint.msgFp[TSDB_MSG_TYPE_MD_ALTER_STREAM] = vnodeProcessMgmtMsg;
|
||||
tsVint.msgFp[TSDB_MSG_TYPE_MD_CREATE_TABLE] = vnodeProcessWriteMsg;
|
||||
tsVint.msgFp[TSDB_MSG_TYPE_MD_DROP_TABLE] = vnodeProcessWriteMsg;
|
||||
tsVint.msgFp[TSDB_MSG_TYPE_MD_ALTER_TABLE] = vnodeProcessWriteMsg;
|
||||
tsVint.msgFp[TSDB_MSG_TYPE_MD_DROP_STABLE] = vnodeProcessWriteMsg;
|
||||
tsVint.msgFp[TSDB_MSG_TYPE_SUBMIT] = vnodeProcessWriteMsg;
|
||||
tsVint.msgFp[TSDB_MSG_TYPE_UPDATE_TAG_VAL] = vnodeProcessWriteMsg;
|
||||
// mq related
|
||||
tsVint.msgFp[TSDB_MSG_TYPE_MQ_CONNECT] = vnodeProcessWriteMsg;
|
||||
tsVint.msgFp[TSDB_MSG_TYPE_MQ_DISCONNECT] = vnodeProcessWriteMsg;
|
||||
tsVint.msgFp[TSDB_MSG_TYPE_MQ_ACK] = vnodeProcessWriteMsg;
|
||||
tsVint.msgFp[TSDB_MSG_TYPE_MQ_RESET] = vnodeProcessWriteMsg;
|
||||
tsVint.msgFp[TSDB_MSG_TYPE_MQ_QUERY] = vnodeProcessReadMsg;
|
||||
tsVint.msgFp[TSDB_MSG_TYPE_MQ_CONSUME] = vnodeProcessReadMsg;
|
||||
// mq related end
|
||||
tsVint.msgFp[TSDB_MSG_TYPE_QUERY] = vnodeProcessReadMsg;
|
||||
tsVint.msgFp[TSDB_MSG_TYPE_FETCH] = vnodeProcessReadMsg;
|
||||
}
|
||||
|
||||
int32_t vnodeInit(SVnodePara para) {
|
||||
vnodeInitMsgFp();
|
||||
tsVint.fp = para.fp;
|
||||
|
||||
struct SSteps *steps = taosStepInit(8, NULL);
|
||||
|
|
|
@ -44,7 +44,6 @@ static struct {
|
|||
SHashObj *hash;
|
||||
int32_t openVnodes;
|
||||
int32_t totalVnodes;
|
||||
void (*msgFp[TSDB_MSG_TYPE_MAX])(SRpcMsg *);
|
||||
} tsVnode;
|
||||
|
||||
static bool vnodeSetInitStatus(SVnode *pVnode) {
|
||||
|
@ -566,34 +565,7 @@ void vnodeRelease(SVnode *pVnode) {
|
|||
}
|
||||
}
|
||||
|
||||
static void vnodeInitMsgFp() {
|
||||
tsVnode.msgFp[TSDB_MSG_TYPE_MD_CREATE_VNODE] = vnodeProcessMgmtMsg;
|
||||
tsVnode.msgFp[TSDB_MSG_TYPE_MD_ALTER_VNODE] = vnodeProcessMgmtMsg;
|
||||
tsVnode.msgFp[TSDB_MSG_TYPE_MD_SYNC_VNODE] = vnodeProcessMgmtMsg;
|
||||
tsVnode.msgFp[TSDB_MSG_TYPE_MD_COMPACT_VNODE] = vnodeProcessMgmtMsg;
|
||||
tsVnode.msgFp[TSDB_MSG_TYPE_MD_DROP_VNODE] = vnodeProcessMgmtMsg;
|
||||
tsVnode.msgFp[TSDB_MSG_TYPE_MD_ALTER_STREAM] = vnodeProcessMgmtMsg;
|
||||
tsVnode.msgFp[TSDB_MSG_TYPE_MD_CREATE_TABLE] = vnodeProcessWriteMsg;
|
||||
tsVnode.msgFp[TSDB_MSG_TYPE_MD_DROP_TABLE] = vnodeProcessWriteMsg;
|
||||
tsVnode.msgFp[TSDB_MSG_TYPE_MD_ALTER_TABLE] = vnodeProcessWriteMsg;
|
||||
tsVnode.msgFp[TSDB_MSG_TYPE_MD_DROP_STABLE] = vnodeProcessWriteMsg;
|
||||
tsVnode.msgFp[TSDB_MSG_TYPE_SUBMIT] = vnodeProcessWriteMsg;
|
||||
tsVnode.msgFp[TSDB_MSG_TYPE_UPDATE_TAG_VAL] = vnodeProcessWriteMsg;
|
||||
// mq related
|
||||
tsVnode.msgFp[TSDB_MSG_TYPE_MQ_CONNECT] = vnodeProcessWriteMsg;
|
||||
tsVnode.msgFp[TSDB_MSG_TYPE_MQ_DISCONNECT] = vnodeProcessWriteMsg;
|
||||
tsVnode.msgFp[TSDB_MSG_TYPE_MQ_ACK] = vnodeProcessWriteMsg;
|
||||
tsVnode.msgFp[TSDB_MSG_TYPE_MQ_RESET] = vnodeProcessWriteMsg;
|
||||
tsVnode.msgFp[TSDB_MSG_TYPE_MQ_QUERY] = vnodeProcessReadMsg;
|
||||
tsVnode.msgFp[TSDB_MSG_TYPE_MQ_CONSUME] = vnodeProcessReadMsg;
|
||||
// mq related end
|
||||
tsVnode.msgFp[TSDB_MSG_TYPE_QUERY] = vnodeProcessReadMsg;
|
||||
tsVnode.msgFp[TSDB_MSG_TYPE_FETCH] = vnodeProcessReadMsg;
|
||||
}
|
||||
|
||||
int32_t vnodeInitMain() {
|
||||
vnodeInitMsgFp();
|
||||
|
||||
tsVnode.hash = taosHashInit(TSDB_MIN_VNODES, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
|
||||
if (tsVnode.hash == NULL) {
|
||||
vError("failed to init vnode mgmt");
|
||||
|
@ -654,11 +626,3 @@ void vnodeSetAccess(SVgroupAccess *pAccess, int32_t numOfVnodes) {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
void vnodeProcessMsg(SRpcMsg *pMsg) {
|
||||
if (tsVnode.msgFp[pMsg->msgType]) {
|
||||
(*tsVnode.msgFp[pMsg->msgType])(pMsg);
|
||||
} else {
|
||||
assert(0);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -15,8 +15,6 @@
|
|||
|
||||
#define _DEFAULT_SOURCE
|
||||
#include "os.h"
|
||||
#include "tqueue.h"
|
||||
#include "tworker.h"
|
||||
#include "vnodeMain.h"
|
||||
#include "vnodeWrite.h"
|
||||
#include "vnodeWriteMsg.h"
|
||||
|
|
|
@ -245,11 +245,11 @@ TAOS_DEFINE_ERROR(TSDB_CODE_VND_NO_DISK_PERMISSIONS, "No write permission f
|
|||
TAOS_DEFINE_ERROR(TSDB_CODE_VND_NO_SUCH_FILE_OR_DIR, "Missing data file")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_VND_OUT_OF_MEMORY, "Out of memory")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_VND_APP_ERROR, "Unexpected generic error in vnode")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_VND_INVALID_VRESION_FILE, "Invalid version file")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_VND_IS_FULL, "Database memory is full for commit failed")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_VND_IS_FLOWCTRL, "Database memory is full for waiting commit")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_VND_INVALID_CFG_FILE, "Invalid config file")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_VND_INVALID_TERM_FILE, "Invalid term file")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_VND_IS_FLOWCTRL, "Database memory is full")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_VND_IS_DROPPING, "Database is dropping")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_VND_IS_BALANCING, "Database is balancing")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_VND_IS_UPDATING, "Database is updating")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_VND_IS_CLOSING, "Database is closing")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_VND_NOT_SYNCED, "Database suspended")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_VND_NO_WRITE_AUTH, "Database write operation denied")
|
||||
|
|
Loading…
Reference in New Issue