Merge remote-tracking branch 'haoyifan/fix2' into develop
This commit is contained in:
commit
20e7786123
|
@ -84,7 +84,7 @@ static void dnodeAllocModules() {
|
||||||
}
|
}
|
||||||
|
|
||||||
void dnodeCleanupModules() {
|
void dnodeCleanupModules() {
|
||||||
for (int32_t module = 1; module < TSDB_MOD_MAX; ++module) {
|
for (EModuleType module = 1; module < TSDB_MOD_MAX; ++module) {
|
||||||
if (tsModule[module].enable && tsModule[module].stopFp) {
|
if (tsModule[module].enable && tsModule[module].stopFp) {
|
||||||
(*tsModule[module].stopFp)();
|
(*tsModule[module].stopFp)();
|
||||||
}
|
}
|
||||||
|
|
|
@ -32,9 +32,9 @@
|
||||||
typedef struct {
|
typedef struct {
|
||||||
taos_qall qall;
|
taos_qall qall;
|
||||||
taos_qset qset; // queue set
|
taos_qset qset; // queue set
|
||||||
pthread_t thread; // thread
|
pthread_t thread; // thread
|
||||||
int32_t workerId; // worker ID
|
int32_t workerId; // worker ID
|
||||||
} SWriteWorker;
|
} SWriteWorker;
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
SRspRet rspRet;
|
SRspRet rspRet;
|
||||||
|
@ -136,17 +136,24 @@ void *dnodeAllocateVnodeWqueue(void *pVnode) {
|
||||||
|
|
||||||
taosAddIntoQset(pWorker->qset, queue, pVnode);
|
taosAddIntoQset(pWorker->qset, queue, pVnode);
|
||||||
pWorker->qall = taosAllocateQall();
|
pWorker->qall = taosAllocateQall();
|
||||||
wWorkerPool.nextId = (wWorkerPool.nextId + 1) % wWorkerPool.max;
|
if (pWorker->qall == NULL) {
|
||||||
|
taosCloseQset(pWorker->qset);
|
||||||
|
taosCloseQueue(queue);
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
pthread_attr_t thAttr;
|
pthread_attr_t thAttr;
|
||||||
pthread_attr_init(&thAttr);
|
pthread_attr_init(&thAttr);
|
||||||
pthread_attr_setdetachstate(&thAttr, PTHREAD_CREATE_JOINABLE);
|
pthread_attr_setdetachstate(&thAttr, PTHREAD_CREATE_JOINABLE);
|
||||||
|
|
||||||
if (pthread_create(&pWorker->thread, &thAttr, dnodeProcessWriteQueue, pWorker) != 0) {
|
if (pthread_create(&pWorker->thread, &thAttr, dnodeProcessWriteQueue, pWorker) != 0) {
|
||||||
dError("failed to create thread to process read queue, reason:%s", strerror(errno));
|
dError("failed to create thread to process read queue, reason:%s", strerror(errno));
|
||||||
|
taosFreeQall(pWorker->qall);
|
||||||
taosCloseQset(pWorker->qset);
|
taosCloseQset(pWorker->qset);
|
||||||
|
taosCloseQueue(queue);
|
||||||
|
queue = NULL;
|
||||||
} else {
|
} else {
|
||||||
dTrace("write worker:%d is launched", pWorker->workerId);
|
dTrace("write worker:%d is launched", pWorker->workerId);
|
||||||
|
wWorkerPool.nextId = (wWorkerPool.nextId + 1) % wWorkerPool.max;
|
||||||
}
|
}
|
||||||
|
|
||||||
pthread_attr_destroy(&thAttr);
|
pthread_attr_destroy(&thAttr);
|
||||||
|
@ -195,7 +202,7 @@ static void *dnodeProcessWriteQueue(void *param) {
|
||||||
|
|
||||||
while (1) {
|
while (1) {
|
||||||
numOfMsgs = taosReadAllQitemsFromQset(pWorker->qset, pWorker->qall, &pVnode);
|
numOfMsgs = taosReadAllQitemsFromQset(pWorker->qset, pWorker->qall, &pVnode);
|
||||||
if (numOfMsgs ==0) {
|
if (numOfMsgs == 0) {
|
||||||
dTrace("dnodeProcessWriteQueee: got no message from qset, exiting...");
|
dTrace("dnodeProcessWriteQueee: got no message from qset, exiting...");
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
@ -243,7 +250,7 @@ static void dnodeHandleIdleWorker(SWriteWorker *pWorker) {
|
||||||
|
|
||||||
if (num > 0) {
|
if (num > 0) {
|
||||||
usleep(30000);
|
usleep(30000);
|
||||||
sched_yield();
|
sched_yield();
|
||||||
} else {
|
} else {
|
||||||
taosFreeQall(pWorker->qall);
|
taosFreeQall(pWorker->qall);
|
||||||
taosCloseQset(pWorker->qset);
|
taosCloseQset(pWorker->qset);
|
||||||
|
|
|
@ -108,7 +108,7 @@ int32_t vnodeCreate(SMDCreateVnodeMsg *pVnodeCfg) {
|
||||||
tsdbCfg.maxRowsPerFileBlock = pVnodeCfg->cfg.maxRowsPerFileBlock;
|
tsdbCfg.maxRowsPerFileBlock = pVnodeCfg->cfg.maxRowsPerFileBlock;
|
||||||
tsdbCfg.precision = pVnodeCfg->cfg.precision;
|
tsdbCfg.precision = pVnodeCfg->cfg.precision;
|
||||||
tsdbCfg.compression = pVnodeCfg->cfg.compression;;
|
tsdbCfg.compression = pVnodeCfg->cfg.compression;;
|
||||||
|
|
||||||
char tsdbDir[TSDB_FILENAME_LEN] = {0};
|
char tsdbDir[TSDB_FILENAME_LEN] = {0};
|
||||||
sprintf(tsdbDir, "%s/vnode%d/tsdb", tsVnodeDir, pVnodeCfg->cfg.vgId);
|
sprintf(tsdbDir, "%s/vnode%d/tsdb", tsVnodeDir, pVnodeCfg->cfg.vgId);
|
||||||
code = tsdbCreateRepo(tsdbDir, &tsdbCfg, NULL);
|
code = tsdbCreateRepo(tsdbDir, &tsdbCfg, NULL);
|
||||||
|
@ -139,7 +139,7 @@ int32_t vnodeDrop(int32_t vgId) {
|
||||||
vTrace("vgId:%d, vnode will be dropped", pVnode->vgId);
|
vTrace("vgId:%d, vnode will be dropped", pVnode->vgId);
|
||||||
pVnode->status = TAOS_VN_STATUS_DELETING;
|
pVnode->status = TAOS_VN_STATUS_DELETING;
|
||||||
vnodeCleanUp(pVnode);
|
vnodeCleanUp(pVnode);
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -262,7 +262,7 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) {
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
// start continuous query
|
// start continuous query
|
||||||
if (pVnode->role == TAOS_SYNC_ROLE_MASTER)
|
if (pVnode->role == TAOS_SYNC_ROLE_MASTER)
|
||||||
cqStart(pVnode->cq);
|
cqStart(pVnode->cq);
|
||||||
|
|
||||||
pVnode->events = NULL;
|
pVnode->events = NULL;
|
||||||
|
@ -342,7 +342,7 @@ void *vnodeAccquireVnode(int32_t vgId) {
|
||||||
}
|
}
|
||||||
|
|
||||||
void *vnodeGetRqueue(void *pVnode) {
|
void *vnodeGetRqueue(void *pVnode) {
|
||||||
return ((SVnodeObj *)pVnode)->rqueue;
|
return ((SVnodeObj *)pVnode)->rqueue;
|
||||||
}
|
}
|
||||||
|
|
||||||
void *vnodeGetWqueue(int32_t vgId) {
|
void *vnodeGetWqueue(int32_t vgId) {
|
||||||
|
@ -352,7 +352,7 @@ void *vnodeGetWqueue(int32_t vgId) {
|
||||||
}
|
}
|
||||||
|
|
||||||
void *vnodeGetWal(void *pVnode) {
|
void *vnodeGetWal(void *pVnode) {
|
||||||
return ((SVnodeObj *)pVnode)->wal;
|
return ((SVnodeObj *)pVnode)->wal;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void vnodeBuildVloadMsg(SVnodeObj *pVnode, SDMStatusMsg *pStatus) {
|
static void vnodeBuildVloadMsg(SVnodeObj *pVnode, SDMStatusMsg *pStatus) {
|
||||||
|
@ -447,9 +447,9 @@ static void vnodeNotifyRole(void *ahandle, int8_t role) {
|
||||||
vPrint("vgId:%d, sync role changed from %d to %d", pVnode->vgId, pVnode->role, role);
|
vPrint("vgId:%d, sync role changed from %d to %d", pVnode->vgId, pVnode->role, role);
|
||||||
pVnode->role = role;
|
pVnode->role = role;
|
||||||
|
|
||||||
if (pVnode->role == TAOS_SYNC_ROLE_MASTER)
|
if (pVnode->role == TAOS_SYNC_ROLE_MASTER)
|
||||||
cqStart(pVnode->cq);
|
cqStart(pVnode->cq);
|
||||||
else
|
else
|
||||||
cqStop(pVnode->cq);
|
cqStop(pVnode->cq);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -501,14 +501,14 @@ static int32_t vnodeSaveCfg(SMDCreateVnodeMsg *pVnodeCfg) {
|
||||||
len += snprintf(content + len, maxLen - len, " \"daysToKeep2\": %d,\n", pVnodeCfg->cfg.daysToKeep2);
|
len += snprintf(content + len, maxLen - len, " \"daysToKeep2\": %d,\n", pVnodeCfg->cfg.daysToKeep2);
|
||||||
len += snprintf(content + len, maxLen - len, " \"minRowsPerFileBlock\": %d,\n", pVnodeCfg->cfg.minRowsPerFileBlock);
|
len += snprintf(content + len, maxLen - len, " \"minRowsPerFileBlock\": %d,\n", pVnodeCfg->cfg.minRowsPerFileBlock);
|
||||||
len += snprintf(content + len, maxLen - len, " \"maxRowsPerFileBlock\": %d,\n", pVnodeCfg->cfg.maxRowsPerFileBlock);
|
len += snprintf(content + len, maxLen - len, " \"maxRowsPerFileBlock\": %d,\n", pVnodeCfg->cfg.maxRowsPerFileBlock);
|
||||||
len += snprintf(content + len, maxLen - len, " \"commitTime\": %d,\n", pVnodeCfg->cfg.commitTime);
|
len += snprintf(content + len, maxLen - len, " \"commitTime\": %d,\n", pVnodeCfg->cfg.commitTime);
|
||||||
len += snprintf(content + len, maxLen - len, " \"precision\": %d,\n", pVnodeCfg->cfg.precision);
|
len += snprintf(content + len, maxLen - len, " \"precision\": %d,\n", pVnodeCfg->cfg.precision);
|
||||||
len += snprintf(content + len, maxLen - len, " \"compression\": %d,\n", pVnodeCfg->cfg.compression);
|
len += snprintf(content + len, maxLen - len, " \"compression\": %d,\n", pVnodeCfg->cfg.compression);
|
||||||
len += snprintf(content + len, maxLen - len, " \"walLevel\": %d,\n", pVnodeCfg->cfg.walLevel);
|
len += snprintf(content + len, maxLen - len, " \"walLevel\": %d,\n", pVnodeCfg->cfg.walLevel);
|
||||||
len += snprintf(content + len, maxLen - len, " \"replica\": %d,\n", pVnodeCfg->cfg.replications);
|
len += snprintf(content + len, maxLen - len, " \"replica\": %d,\n", pVnodeCfg->cfg.replications);
|
||||||
len += snprintf(content + len, maxLen - len, " \"wals\": %d,\n", pVnodeCfg->cfg.wals);
|
len += snprintf(content + len, maxLen - len, " \"wals\": %d,\n", pVnodeCfg->cfg.wals);
|
||||||
len += snprintf(content + len, maxLen - len, " \"quorum\": %d,\n", pVnodeCfg->cfg.quorum);
|
len += snprintf(content + len, maxLen - len, " \"quorum\": %d,\n", pVnodeCfg->cfg.quorum);
|
||||||
|
|
||||||
len += snprintf(content + len, maxLen - len, " \"nodeInfos\": [{\n");
|
len += snprintf(content + len, maxLen - len, " \"nodeInfos\": [{\n");
|
||||||
for (int32_t i = 0; i < pVnodeCfg->cfg.replications; i++) {
|
for (int32_t i = 0; i < pVnodeCfg->cfg.replications; i++) {
|
||||||
len += snprintf(content + len, maxLen - len, " \"nodeId\": %d,\n", pVnodeCfg->nodes[i].nodeId);
|
len += snprintf(content + len, maxLen - len, " \"nodeId\": %d,\n", pVnodeCfg->nodes[i].nodeId);
|
||||||
|
|
Loading…
Reference in New Issue