Merge pull request #26842 from taosdata/enh/TD-30987-27
enh: refactor return code
This commit is contained in:
commit
0e0785bba3
|
@ -36,7 +36,7 @@ int32_t sndBuildStreamTask(SSnode *pSnode, SStreamTask *pTask, int64_t nextProce
|
||||||
streamTaskOpenAllUpstreamInput(pTask);
|
streamTaskOpenAllUpstreamInput(pTask);
|
||||||
|
|
||||||
streamTaskResetUpstreamStageInfo(pTask);
|
streamTaskResetUpstreamStageInfo(pTask);
|
||||||
streamSetupScheduleTrigger(pTask);
|
(void)streamSetupScheduleTrigger(pTask);
|
||||||
|
|
||||||
SCheckpointInfo *pChkInfo = &pTask->chkInfo;
|
SCheckpointInfo *pChkInfo = &pTask->chkInfo;
|
||||||
tqSetRestoreVersionInfo(pTask);
|
tqSetRestoreVersionInfo(pTask);
|
||||||
|
@ -91,14 +91,14 @@ FAIL:
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t sndInit(SSnode *pSnode) {
|
int32_t sndInit(SSnode *pSnode) {
|
||||||
streamTaskSchedTask(&pSnode->msgCb, pSnode->pMeta->vgId, 0, 0, STREAM_EXEC_T_START_ALL_TASKS);
|
(void)streamTaskSchedTask(&pSnode->msgCb, pSnode->pMeta->vgId, 0, 0, STREAM_EXEC_T_START_ALL_TASKS);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
void sndClose(SSnode *pSnode) {
|
void sndClose(SSnode *pSnode) {
|
||||||
stopRsync();
|
stopRsync();
|
||||||
streamMetaNotifyClose(pSnode->pMeta);
|
streamMetaNotifyClose(pSnode->pMeta);
|
||||||
streamMetaCommit(pSnode->pMeta);
|
(void)streamMetaCommit(pSnode->pMeta);
|
||||||
streamMetaClose(pSnode->pMeta);
|
streamMetaClose(pSnode->pMeta);
|
||||||
taosMemoryFree(pSnode);
|
taosMemoryFree(pSnode);
|
||||||
}
|
}
|
||||||
|
|
|
@ -171,12 +171,12 @@ static int32_t vnodeAsyncTaskDone(SVAsync *async, SVATask *task) {
|
||||||
async->numTasks--;
|
async->numTasks--;
|
||||||
|
|
||||||
if (task->numWait == 0) {
|
if (task->numWait == 0) {
|
||||||
taosThreadCondDestroy(&task->waitCond);
|
(void)taosThreadCondDestroy(&task->waitCond);
|
||||||
taosMemoryFree(task);
|
taosMemoryFree(task);
|
||||||
} else if (task->numWait == 1) {
|
} else if (task->numWait == 1) {
|
||||||
taosThreadCondSignal(&task->waitCond);
|
(void)taosThreadCondSignal(&task->waitCond);
|
||||||
} else {
|
} else {
|
||||||
taosThreadCondBroadcast(&task->waitCond);
|
(void)taosThreadCondBroadcast(&task->waitCond);
|
||||||
}
|
}
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
@ -195,7 +195,7 @@ static int32_t vnodeAsyncCancelAllTasks(SVAsync *async, SArray *cancelArray) {
|
||||||
.arg = task->arg,
|
.arg = task->arg,
|
||||||
}));
|
}));
|
||||||
}
|
}
|
||||||
vnodeAsyncTaskDone(async, task);
|
(void)vnodeAsyncTaskDone(async, task);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -217,14 +217,14 @@ static void *vnodeAsyncLoop(void *arg) {
|
||||||
|
|
||||||
// finish last running task
|
// finish last running task
|
||||||
if (worker->runningTask != NULL) {
|
if (worker->runningTask != NULL) {
|
||||||
vnodeAsyncTaskDone(async, worker->runningTask);
|
(void)vnodeAsyncTaskDone(async, worker->runningTask);
|
||||||
worker->runningTask = NULL;
|
worker->runningTask = NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
for (;;) {
|
for (;;) {
|
||||||
if (async->stop || worker->workerId >= async->numWorkers) {
|
if (async->stop || worker->workerId >= async->numWorkers) {
|
||||||
if (async->stop) { // cancel all tasks
|
if (async->stop) { // cancel all tasks
|
||||||
vnodeAsyncCancelAllTasks(async, cancelArray);
|
(void)vnodeAsyncCancelAllTasks(async, cancelArray);
|
||||||
}
|
}
|
||||||
worker->state = EVA_WORKER_STATE_STOP;
|
worker->state = EVA_WORKER_STATE_STOP;
|
||||||
async->numLaunchWorkers--;
|
async->numLaunchWorkers--;
|
||||||
|
@ -259,7 +259,7 @@ static void *vnodeAsyncLoop(void *arg) {
|
||||||
if (worker->runningTask == NULL) {
|
if (worker->runningTask == NULL) {
|
||||||
worker->state = EVA_WORKER_STATE_IDLE;
|
worker->state = EVA_WORKER_STATE_IDLE;
|
||||||
async->numIdleWorkers++;
|
async->numIdleWorkers++;
|
||||||
taosThreadCondWait(&async->hasTask, &async->mutex);
|
(void)taosThreadCondWait(&async->hasTask, &async->mutex);
|
||||||
async->numIdleWorkers--;
|
async->numIdleWorkers--;
|
||||||
worker->state = EVA_WORKER_STATE_ACTIVE;
|
worker->state = EVA_WORKER_STATE_ACTIVE;
|
||||||
} else {
|
} else {
|
||||||
|
@ -271,7 +271,7 @@ static void *vnodeAsyncLoop(void *arg) {
|
||||||
(void)taosThreadMutexUnlock(&async->mutex);
|
(void)taosThreadMutexUnlock(&async->mutex);
|
||||||
|
|
||||||
// do run the task
|
// do run the task
|
||||||
worker->runningTask->execute(worker->runningTask->arg);
|
(void)worker->runningTask->execute(worker->runningTask->arg);
|
||||||
}
|
}
|
||||||
|
|
||||||
_exit:
|
_exit:
|
||||||
|
@ -334,8 +334,8 @@ static int32_t vnodeAsyncInit(SVAsync **async, const char *label) {
|
||||||
strcpy((char *)((*async) + 1), label);
|
strcpy((char *)((*async) + 1), label);
|
||||||
(*async)->label = (const char *)((*async) + 1);
|
(*async)->label = (const char *)((*async) + 1);
|
||||||
|
|
||||||
taosThreadMutexInit(&(*async)->mutex, NULL);
|
(void)taosThreadMutexInit(&(*async)->mutex, NULL);
|
||||||
taosThreadCondInit(&(*async)->hasTask, NULL);
|
(void)taosThreadCondInit(&(*async)->hasTask, NULL);
|
||||||
(*async)->stop = false;
|
(*async)->stop = false;
|
||||||
|
|
||||||
// worker
|
// worker
|
||||||
|
@ -356,8 +356,8 @@ static int32_t vnodeAsyncInit(SVAsync **async, const char *label) {
|
||||||
(*async)->chList.next = &(*async)->chList;
|
(*async)->chList.next = &(*async)->chList;
|
||||||
ret = vHashInit(&(*async)->channelTable, vnodeAsyncChannelHash, vnodeAsyncChannelCompare);
|
ret = vHashInit(&(*async)->channelTable, vnodeAsyncChannelHash, vnodeAsyncChannelCompare);
|
||||||
if (ret != 0) {
|
if (ret != 0) {
|
||||||
taosThreadMutexDestroy(&(*async)->mutex);
|
(void)taosThreadMutexDestroy(&(*async)->mutex);
|
||||||
taosThreadCondDestroy(&(*async)->hasTask);
|
(void)taosThreadCondDestroy(&(*async)->hasTask);
|
||||||
taosMemoryFree(*async);
|
taosMemoryFree(*async);
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
@ -371,9 +371,9 @@ static int32_t vnodeAsyncInit(SVAsync **async, const char *label) {
|
||||||
}
|
}
|
||||||
ret = vHashInit(&(*async)->taskTable, vnodeAsyncTaskHash, vnodeAsyncTaskCompare);
|
ret = vHashInit(&(*async)->taskTable, vnodeAsyncTaskHash, vnodeAsyncTaskCompare);
|
||||||
if (ret != 0) {
|
if (ret != 0) {
|
||||||
vHashDestroy(&(*async)->channelTable);
|
(void)vHashDestroy(&(*async)->channelTable);
|
||||||
taosThreadMutexDestroy(&(*async)->mutex);
|
(void)taosThreadMutexDestroy(&(*async)->mutex);
|
||||||
taosThreadCondDestroy(&(*async)->hasTask);
|
(void)taosThreadCondDestroy(&(*async)->hasTask);
|
||||||
taosMemoryFree(*async);
|
taosMemoryFree(*async);
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
@ -402,7 +402,7 @@ static int32_t vnodeAsyncDestroy(SVAsync **async) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
taosThreadJoin((*async)->workers[i].thread, NULL);
|
(void)taosThreadJoin((*async)->workers[i].thread, NULL);
|
||||||
ASSERT((*async)->workers[i].state == EVA_WORKER_STATE_STOP);
|
ASSERT((*async)->workers[i].state == EVA_WORKER_STATE_STOP);
|
||||||
(*async)->workers[i].state = EVA_WORKER_STATE_UINIT;
|
(*async)->workers[i].state = EVA_WORKER_STATE_UINIT;
|
||||||
}
|
}
|
||||||
|
@ -425,11 +425,11 @@ static int32_t vnodeAsyncDestroy(SVAsync **async) {
|
||||||
ASSERT((*async)->numChannels == 0);
|
ASSERT((*async)->numChannels == 0);
|
||||||
ASSERT((*async)->numTasks == 0);
|
ASSERT((*async)->numTasks == 0);
|
||||||
|
|
||||||
taosThreadMutexDestroy(&(*async)->mutex);
|
(void)taosThreadMutexDestroy(&(*async)->mutex);
|
||||||
taosThreadCondDestroy(&(*async)->hasTask);
|
(void)taosThreadCondDestroy(&(*async)->hasTask);
|
||||||
|
|
||||||
vHashDestroy(&(*async)->channelTable);
|
(void)vHashDestroy(&(*async)->channelTable);
|
||||||
vHashDestroy(&(*async)->taskTable);
|
(void)vHashDestroy(&(*async)->taskTable);
|
||||||
taosMemoryFree(*async);
|
taosMemoryFree(*async);
|
||||||
*async = NULL;
|
*async = NULL;
|
||||||
|
|
||||||
|
@ -442,11 +442,11 @@ static int32_t vnodeAsyncLaunchWorker(SVAsync *async) {
|
||||||
if (async->workers[i].state == EVA_WORKER_STATE_ACTIVE) {
|
if (async->workers[i].state == EVA_WORKER_STATE_ACTIVE) {
|
||||||
continue;
|
continue;
|
||||||
} else if (async->workers[i].state == EVA_WORKER_STATE_STOP) {
|
} else if (async->workers[i].state == EVA_WORKER_STATE_STOP) {
|
||||||
taosThreadJoin(async->workers[i].thread, NULL);
|
(void)taosThreadJoin(async->workers[i].thread, NULL);
|
||||||
async->workers[i].state = EVA_WORKER_STATE_UINIT;
|
async->workers[i].state = EVA_WORKER_STATE_UINIT;
|
||||||
}
|
}
|
||||||
|
|
||||||
taosThreadCreate(&async->workers[i].thread, NULL, vnodeAsyncLoop, &async->workers[i]);
|
(void)taosThreadCreate(&async->workers[i].thread, NULL, vnodeAsyncLoop, &async->workers[i]);
|
||||||
async->workers[i].state = EVA_WORKER_STATE_ACTIVE;
|
async->workers[i].state = EVA_WORKER_STATE_ACTIVE;
|
||||||
async->numLaunchWorkers++;
|
async->numLaunchWorkers++;
|
||||||
break;
|
break;
|
||||||
|
@ -461,20 +461,20 @@ int32_t vnodeAsyncOpen(int32_t numOfThreads) {
|
||||||
// vnode-commit
|
// vnode-commit
|
||||||
code = vnodeAsyncInit(&vnodeAsyncs[1], "vnode-commit");
|
code = vnodeAsyncInit(&vnodeAsyncs[1], "vnode-commit");
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
vnodeAsyncSetWorkers(1, numOfThreads);
|
(void)vnodeAsyncSetWorkers(1, numOfThreads);
|
||||||
|
|
||||||
// vnode-merge
|
// vnode-merge
|
||||||
code = vnodeAsyncInit(&vnodeAsyncs[2], "vnode-merge");
|
code = vnodeAsyncInit(&vnodeAsyncs[2], "vnode-merge");
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
vnodeAsyncSetWorkers(2, numOfThreads);
|
(void)vnodeAsyncSetWorkers(2, numOfThreads);
|
||||||
|
|
||||||
_exit:
|
_exit:
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t vnodeAsyncClose() {
|
int32_t vnodeAsyncClose() {
|
||||||
vnodeAsyncDestroy(&vnodeAsyncs[1]);
|
(void)vnodeAsyncDestroy(&vnodeAsyncs[1]);
|
||||||
vnodeAsyncDestroy(&vnodeAsyncs[2]);
|
(void)vnodeAsyncDestroy(&vnodeAsyncs[2]);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -501,7 +501,7 @@ int32_t vnodeAsync(SVAChannelID *channelID, EVAPriority priority, int32_t (*exec
|
||||||
task->arg = arg;
|
task->arg = arg;
|
||||||
task->state = EVA_TASK_STATE_WAITTING;
|
task->state = EVA_TASK_STATE_WAITTING;
|
||||||
task->numWait = 0;
|
task->numWait = 0;
|
||||||
taosThreadCondInit(&task->waitCond, NULL);
|
(void)taosThreadCondInit(&task->waitCond, NULL);
|
||||||
|
|
||||||
// schedule task
|
// schedule task
|
||||||
(void)taosThreadMutexLock(&async->mutex);
|
(void)taosThreadMutexLock(&async->mutex);
|
||||||
|
@ -512,10 +512,10 @@ int32_t vnodeAsync(SVAChannelID *channelID, EVAPriority priority, int32_t (*exec
|
||||||
SVAChannel channel = {
|
SVAChannel channel = {
|
||||||
.channelId = channelID->id,
|
.channelId = channelID->id,
|
||||||
};
|
};
|
||||||
vHashGet(async->channelTable, &channel, (void **)&task->channel);
|
(void)vHashGet(async->channelTable, &channel, (void **)&task->channel);
|
||||||
if (task->channel == NULL) {
|
if (task->channel == NULL) {
|
||||||
(void)taosThreadMutexUnlock(&async->mutex);
|
(void)taosThreadMutexUnlock(&async->mutex);
|
||||||
taosThreadCondDestroy(&task->waitCond);
|
(void)taosThreadCondDestroy(&task->waitCond);
|
||||||
taosMemoryFree(task);
|
taosMemoryFree(task);
|
||||||
return TSDB_CODE_INVALID_PARA;
|
return TSDB_CODE_INVALID_PARA;
|
||||||
}
|
}
|
||||||
|
@ -527,7 +527,7 @@ int32_t vnodeAsync(SVAChannelID *channelID, EVAPriority priority, int32_t (*exec
|
||||||
int32_t ret = vHashPut(async->taskTable, task);
|
int32_t ret = vHashPut(async->taskTable, task);
|
||||||
if (ret != 0) {
|
if (ret != 0) {
|
||||||
(void)taosThreadMutexUnlock(&async->mutex);
|
(void)taosThreadMutexUnlock(&async->mutex);
|
||||||
taosThreadCondDestroy(&task->waitCond);
|
(void)taosThreadCondDestroy(&task->waitCond);
|
||||||
taosMemoryFree(task);
|
taosMemoryFree(task);
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
@ -548,9 +548,9 @@ int32_t vnodeAsync(SVAChannelID *channelID, EVAPriority priority, int32_t (*exec
|
||||||
|
|
||||||
// signal worker or launch new worker
|
// signal worker or launch new worker
|
||||||
if (async->numIdleWorkers > 0) {
|
if (async->numIdleWorkers > 0) {
|
||||||
taosThreadCondSignal(&(async->hasTask));
|
(void)taosThreadCondSignal(&(async->hasTask));
|
||||||
} else if (async->numLaunchWorkers < async->numWorkers) {
|
} else if (async->numLaunchWorkers < async->numWorkers) {
|
||||||
vnodeAsyncLaunchWorker(async);
|
(void)vnodeAsyncLaunchWorker(async);
|
||||||
}
|
}
|
||||||
} else if (task->channel->scheduled->state == EVA_TASK_STATE_RUNNING ||
|
} else if (task->channel->scheduled->state == EVA_TASK_STATE_RUNNING ||
|
||||||
priority >= VATASK_PIORITY(task->channel->scheduled)) {
|
priority >= VATASK_PIORITY(task->channel->scheduled)) {
|
||||||
|
@ -603,14 +603,14 @@ int32_t vnodeAWait(SVATaskID *taskID) {
|
||||||
|
|
||||||
(void)taosThreadMutexLock(&async->mutex);
|
(void)taosThreadMutexLock(&async->mutex);
|
||||||
|
|
||||||
vHashGet(async->taskTable, &task2, (void **)&task);
|
(void)vHashGet(async->taskTable, &task2, (void **)&task);
|
||||||
if (task) {
|
if (task) {
|
||||||
task->numWait++;
|
task->numWait++;
|
||||||
taosThreadCondWait(&task->waitCond, &async->mutex);
|
(void)taosThreadCondWait(&task->waitCond, &async->mutex);
|
||||||
task->numWait--;
|
task->numWait--;
|
||||||
|
|
||||||
if (task->numWait == 0) {
|
if (task->numWait == 0) {
|
||||||
taosThreadCondDestroy(&task->waitCond);
|
(void)taosThreadCondDestroy(&task->waitCond);
|
||||||
taosMemoryFree(task);
|
taosMemoryFree(task);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -636,14 +636,14 @@ int32_t vnodeACancel(SVATaskID *taskID) {
|
||||||
|
|
||||||
(void)taosThreadMutexLock(&async->mutex);
|
(void)taosThreadMutexLock(&async->mutex);
|
||||||
|
|
||||||
vHashGet(async->taskTable, &task2, (void **)&task);
|
(void)vHashGet(async->taskTable, &task2, (void **)&task);
|
||||||
if (task) {
|
if (task) {
|
||||||
if (task->state == EVA_TASK_STATE_WAITTING) {
|
if (task->state == EVA_TASK_STATE_WAITTING) {
|
||||||
cancel = task->cancel;
|
cancel = task->cancel;
|
||||||
arg = task->arg;
|
arg = task->arg;
|
||||||
task->next->prev = task->prev;
|
task->next->prev = task->prev;
|
||||||
task->prev->next = task->next;
|
task->prev->next = task->next;
|
||||||
vnodeAsyncTaskDone(async, task);
|
(void)vnodeAsyncTaskDone(async, task);
|
||||||
} else {
|
} else {
|
||||||
ret = TSDB_CODE_FAILED;
|
ret = TSDB_CODE_FAILED;
|
||||||
}
|
}
|
||||||
|
@ -736,12 +736,12 @@ int32_t vnodeAChannelDestroy(SVAChannelID *channelID, bool waitRunning) {
|
||||||
|
|
||||||
(void)taosThreadMutexLock(&async->mutex);
|
(void)taosThreadMutexLock(&async->mutex);
|
||||||
|
|
||||||
vHashGet(async->channelTable, &channel2, (void **)&channel);
|
(void)vHashGet(async->channelTable, &channel2, (void **)&channel);
|
||||||
if (channel) {
|
if (channel) {
|
||||||
// unregister channel
|
// unregister channel
|
||||||
channel->next->prev = channel->prev;
|
channel->next->prev = channel->prev;
|
||||||
channel->prev->next = channel->next;
|
channel->prev->next = channel->next;
|
||||||
vHashDrop(async->channelTable, channel);
|
(void)vHashDrop(async->channelTable, channel);
|
||||||
async->numChannels--;
|
async->numChannels--;
|
||||||
|
|
||||||
// cancel all waiting tasks
|
// cancel all waiting tasks
|
||||||
|
@ -756,7 +756,7 @@ int32_t vnodeAChannelDestroy(SVAChannelID *channelID, bool waitRunning) {
|
||||||
.arg = task->arg,
|
.arg = task->arg,
|
||||||
}));
|
}));
|
||||||
}
|
}
|
||||||
vnodeAsyncTaskDone(async, task);
|
(void)vnodeAsyncTaskDone(async, task);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -771,7 +771,7 @@ int32_t vnodeAChannelDestroy(SVAChannelID *channelID, bool waitRunning) {
|
||||||
.arg = channel->scheduled->arg,
|
.arg = channel->scheduled->arg,
|
||||||
}));
|
}));
|
||||||
}
|
}
|
||||||
vnodeAsyncTaskDone(async, channel->scheduled);
|
(void)vnodeAsyncTaskDone(async, channel->scheduled);
|
||||||
}
|
}
|
||||||
taosMemoryFree(channel);
|
taosMemoryFree(channel);
|
||||||
} else {
|
} else {
|
||||||
|
@ -779,10 +779,10 @@ int32_t vnodeAChannelDestroy(SVAChannelID *channelID, bool waitRunning) {
|
||||||
// wait task
|
// wait task
|
||||||
SVATask *task = channel->scheduled;
|
SVATask *task = channel->scheduled;
|
||||||
task->numWait++;
|
task->numWait++;
|
||||||
taosThreadCondWait(&task->waitCond, &async->mutex);
|
(void)taosThreadCondWait(&task->waitCond, &async->mutex);
|
||||||
task->numWait--;
|
task->numWait--;
|
||||||
if (task->numWait == 0) {
|
if (task->numWait == 0) {
|
||||||
taosThreadCondDestroy(&task->waitCond);
|
(void)taosThreadCondDestroy(&task->waitCond);
|
||||||
taosMemoryFree(task);
|
taosMemoryFree(task);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -26,7 +26,7 @@ static int32_t vnodeBufPoolCreate(SVnode *pVnode, int32_t id, int64_t size, SVBu
|
||||||
memset(pPool, 0, sizeof(SVBufPool));
|
memset(pPool, 0, sizeof(SVBufPool));
|
||||||
|
|
||||||
// query handle list
|
// query handle list
|
||||||
taosThreadMutexInit(&pPool->mutex, NULL);
|
(void)taosThreadMutexInit(&pPool->mutex, NULL);
|
||||||
pPool->nQuery = 0;
|
pPool->nQuery = 0;
|
||||||
pPool->qList.pNext = &pPool->qList;
|
pPool->qList.pNext = &pPool->qList;
|
||||||
pPool->qList.ppNext = &pPool->qList.pNext;
|
pPool->qList.ppNext = &pPool->qList.pNext;
|
||||||
|
@ -61,10 +61,10 @@ static int32_t vnodeBufPoolCreate(SVnode *pVnode, int32_t id, int64_t size, SVBu
|
||||||
static int vnodeBufPoolDestroy(SVBufPool *pPool) {
|
static int vnodeBufPoolDestroy(SVBufPool *pPool) {
|
||||||
vnodeBufPoolReset(pPool);
|
vnodeBufPoolReset(pPool);
|
||||||
if (pPool->lock) {
|
if (pPool->lock) {
|
||||||
taosThreadSpinDestroy(pPool->lock);
|
(void)taosThreadSpinDestroy(pPool->lock);
|
||||||
taosMemoryFree((void *)pPool->lock);
|
taosMemoryFree((void *)pPool->lock);
|
||||||
}
|
}
|
||||||
taosThreadMutexDestroy(&pPool->mutex);
|
(void)taosThreadMutexDestroy(&pPool->mutex);
|
||||||
taosMemoryFree(pPool);
|
taosMemoryFree(pPool);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
@ -77,7 +77,7 @@ int vnodeOpenBufPool(SVnode *pVnode) {
|
||||||
int32_t code;
|
int32_t code;
|
||||||
if ((code = vnodeBufPoolCreate(pVnode, i, size, &pVnode->aBufPool[i]))) {
|
if ((code = vnodeBufPoolCreate(pVnode, i, size, &pVnode->aBufPool[i]))) {
|
||||||
vError("vgId:%d, failed to open vnode buffer pool since %s", TD_VID(pVnode), tstrerror(terrno));
|
vError("vgId:%d, failed to open vnode buffer pool since %s", TD_VID(pVnode), tstrerror(terrno));
|
||||||
vnodeCloseBufPool(pVnode);
|
(void)vnodeCloseBufPool(pVnode);
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -93,7 +93,7 @@ int vnodeOpenBufPool(SVnode *pVnode) {
|
||||||
int vnodeCloseBufPool(SVnode *pVnode) {
|
int vnodeCloseBufPool(SVnode *pVnode) {
|
||||||
for (int32_t i = 0; i < VNODE_BUFPOOL_SEGMENTS; i++) {
|
for (int32_t i = 0; i < VNODE_BUFPOOL_SEGMENTS; i++) {
|
||||||
if (pVnode->aBufPool[i]) {
|
if (pVnode->aBufPool[i]) {
|
||||||
vnodeBufPoolDestroy(pVnode->aBufPool[i]);
|
(void)vnodeBufPoolDestroy(pVnode->aBufPool[i]);
|
||||||
pVnode->aBufPool[i] = NULL;
|
pVnode->aBufPool[i] = NULL;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -141,7 +141,7 @@ void *vnodeBufPoolMallocAligned(SVBufPool *pPool, int size) {
|
||||||
pNode = taosMemoryMalloc(sizeof(*pNode) + size);
|
pNode = taosMemoryMalloc(sizeof(*pNode) + size);
|
||||||
if (pNode == NULL) {
|
if (pNode == NULL) {
|
||||||
if (pPool->lock) {
|
if (pPool->lock) {
|
||||||
taosThreadSpinUnlock(pPool->lock);
|
(void)taosThreadSpinUnlock(pPool->lock);
|
||||||
}
|
}
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
@ -155,7 +155,7 @@ void *vnodeBufPoolMallocAligned(SVBufPool *pPool, int size) {
|
||||||
|
|
||||||
pPool->size = pPool->size + sizeof(*pNode) + size;
|
pPool->size = pPool->size + sizeof(*pNode) + size;
|
||||||
}
|
}
|
||||||
if (pPool->lock) taosThreadSpinUnlock(pPool->lock);
|
if (pPool->lock) (void)taosThreadSpinUnlock(pPool->lock);
|
||||||
return p;
|
return p;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -174,7 +174,7 @@ void *vnodeBufPoolMalloc(SVBufPool *pPool, int size) {
|
||||||
// allocate a new node
|
// allocate a new node
|
||||||
pNode = taosMemoryMalloc(sizeof(*pNode) + size);
|
pNode = taosMemoryMalloc(sizeof(*pNode) + size);
|
||||||
if (pNode == NULL) {
|
if (pNode == NULL) {
|
||||||
if (pPool->lock) taosThreadSpinUnlock(pPool->lock);
|
if (pPool->lock) (void)taosThreadSpinUnlock(pPool->lock);
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -187,7 +187,7 @@ void *vnodeBufPoolMalloc(SVBufPool *pPool, int size) {
|
||||||
|
|
||||||
pPool->size = pPool->size + sizeof(*pNode) + size;
|
pPool->size = pPool->size + sizeof(*pNode) + size;
|
||||||
}
|
}
|
||||||
if (pPool->lock) taosThreadSpinUnlock(pPool->lock);
|
if (pPool->lock) (void)taosThreadSpinUnlock(pPool->lock);
|
||||||
return p;
|
return p;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -223,7 +223,7 @@ void vnodeBufPoolAddToFreeList(SVBufPool *pPool) {
|
||||||
vInfo("vgId:%d, buffer pool of id %d size changed from %" PRId64 " to %" PRId64, TD_VID(pVnode), pPool->id,
|
vInfo("vgId:%d, buffer pool of id %d size changed from %" PRId64 " to %" PRId64, TD_VID(pVnode), pPool->id,
|
||||||
pPool->node.size, size);
|
pPool->node.size, size);
|
||||||
|
|
||||||
vnodeBufPoolDestroy(pPool);
|
(void)vnodeBufPoolDestroy(pPool);
|
||||||
pPool = pNewPool;
|
pPool = pNewPool;
|
||||||
pVnode->aBufPool[pPool->id] = pPool;
|
pVnode->aBufPool[pPool->id] = pPool;
|
||||||
}
|
}
|
||||||
|
@ -234,7 +234,7 @@ void vnodeBufPoolAddToFreeList(SVBufPool *pPool) {
|
||||||
vnodeBufPoolReset(pPool);
|
vnodeBufPoolReset(pPool);
|
||||||
pPool->freeNext = pVnode->freeList;
|
pPool->freeNext = pVnode->freeList;
|
||||||
pVnode->freeList = pPool;
|
pVnode->freeList = pPool;
|
||||||
taosThreadCondSignal(&pVnode->poolNotEmpty);
|
(void)taosThreadCondSignal(&pVnode->poolNotEmpty);
|
||||||
}
|
}
|
||||||
|
|
||||||
void vnodeBufPoolUnRef(SVBufPool *pPool, bool proactive) {
|
void vnodeBufPoolUnRef(SVBufPool *pPool, bool proactive) {
|
||||||
|
|
|
@ -127,7 +127,7 @@ int vnodeEncodeConfig(const void *pObj, SJson *pJson) {
|
||||||
if (pNodeRetentions == NULL) {
|
if (pNodeRetentions == NULL) {
|
||||||
return TSDB_CODE_OUT_OF_MEMORY;
|
return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
}
|
}
|
||||||
tjsonAddItemToObject(pJson, "retentions", pNodeRetentions);
|
TAOS_CHECK_RETURN(tjsonAddItemToObject(pJson, "retentions", pNodeRetentions));
|
||||||
for (int32_t i = 0; i < nRetention; ++i) {
|
for (int32_t i = 0; i < nRetention; ++i) {
|
||||||
SJson *pNodeRetention = tjsonCreateObject();
|
SJson *pNodeRetention = tjsonCreateObject();
|
||||||
const SRetention *pRetention = pCfg->tsdbCfg.retentions + i;
|
const SRetention *pRetention = pCfg->tsdbCfg.retentions + i;
|
||||||
|
@ -353,7 +353,7 @@ int vnodeDecodeConfig(const SJson *pJson, void *pObj) {
|
||||||
if (info == NULL) return -1;
|
if (info == NULL) return -1;
|
||||||
tjsonGetNumberValue(info, "nodePort", pNode->nodePort, code);
|
tjsonGetNumberValue(info, "nodePort", pNode->nodePort, code);
|
||||||
if (code) return code;
|
if (code) return code;
|
||||||
tjsonGetStringValue(info, "nodeFqdn", pNode->nodeFqdn);
|
(void)tjsonGetStringValue(info, "nodeFqdn", pNode->nodeFqdn);
|
||||||
tjsonGetNumberValue(info, "nodeId", pNode->nodeId, code);
|
tjsonGetNumberValue(info, "nodeId", pNode->nodeId, code);
|
||||||
if (code) return code;
|
if (code) return code;
|
||||||
tjsonGetNumberValue(info, "clusterId", pNode->clusterId, code);
|
tjsonGetNumberValue(info, "clusterId", pNode->clusterId, code);
|
||||||
|
|
|
@ -91,7 +91,7 @@ static int32_t vnodeGetBufPoolToUse(SVnode *pVnode) {
|
||||||
|
|
||||||
struct timeval tv;
|
struct timeval tv;
|
||||||
struct timespec ts;
|
struct timespec ts;
|
||||||
taosGetTimeOfDay(&tv);
|
(void)taosGetTimeOfDay(&tv);
|
||||||
ts.tv_nsec = tv.tv_usec * 1000 + WAIT_TIME_MILI_SEC * 1000000;
|
ts.tv_nsec = tv.tv_usec * 1000 + WAIT_TIME_MILI_SEC * 1000000;
|
||||||
if (ts.tv_nsec > 999999999l) {
|
if (ts.tv_nsec > 999999999l) {
|
||||||
ts.tv_sec = tv.tv_sec + 1;
|
ts.tv_sec = tv.tv_sec + 1;
|
||||||
|
@ -199,7 +199,7 @@ _exit:
|
||||||
vInfo("vgId:%d, vnode info is saved, fname:%s replica:%d selfIndex:%d changeVersion:%d", pInfo->config.vgId, fname,
|
vInfo("vgId:%d, vnode info is saved, fname:%s replica:%d selfIndex:%d changeVersion:%d", pInfo->config.vgId, fname,
|
||||||
pInfo->config.syncCfg.replicaNum, pInfo->config.syncCfg.myIndex, pInfo->config.syncCfg.changeVersion);
|
pInfo->config.syncCfg.replicaNum, pInfo->config.syncCfg.myIndex, pInfo->config.syncCfg.changeVersion);
|
||||||
}
|
}
|
||||||
taosCloseFile(&pFile);
|
(void)taosCloseFile(&pFile);
|
||||||
taosMemoryFree(data);
|
taosMemoryFree(data);
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
@ -259,7 +259,7 @@ _exit:
|
||||||
vError("vgId:%d %s failed at %s:%d since %s", pInfo->config.vgId, __func__, __FILE__, lino, tstrerror(code));
|
vError("vgId:%d %s failed at %s:%d since %s", pInfo->config.vgId, __func__, __FILE__, lino, tstrerror(code));
|
||||||
}
|
}
|
||||||
taosMemoryFree(pData);
|
taosMemoryFree(pData);
|
||||||
taosCloseFile(&pFile);
|
(void)taosCloseFile(&pFile);
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -270,7 +270,7 @@ static int32_t vnodePrepareCommit(SVnode *pVnode, SCommitInfo *pInfo) {
|
||||||
int64_t lastCommitted = pInfo->info.state.committed;
|
int64_t lastCommitted = pInfo->info.state.committed;
|
||||||
|
|
||||||
// wait last commit task
|
// wait last commit task
|
||||||
vnodeAWait(&pVnode->commitTask);
|
(void)vnodeAWait(&pVnode->commitTask);
|
||||||
|
|
||||||
code = syncNodeGetConfig(pVnode->sync, &pVnode->config.syncCfg);
|
code = syncNodeGetConfig(pVnode->sync, &pVnode->config.syncCfg);
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
|
@ -285,7 +285,7 @@ static int32_t vnodePrepareCommit(SVnode *pVnode, SCommitInfo *pInfo) {
|
||||||
pInfo->txn = metaGetTxn(pVnode->pMeta);
|
pInfo->txn = metaGetTxn(pVnode->pMeta);
|
||||||
|
|
||||||
// save info
|
// save info
|
||||||
vnodeGetPrimaryDir(pVnode->path, pVnode->diskPrimary, pVnode->pTfs, dir, TSDB_FILENAME_LEN);
|
(void)vnodeGetPrimaryDir(pVnode->path, pVnode->diskPrimary, pVnode->pTfs, dir, TSDB_FILENAME_LEN);
|
||||||
|
|
||||||
vDebug("vgId:%d, save config while prepare commit", TD_VID(pVnode));
|
vDebug("vgId:%d, save config while prepare commit", TD_VID(pVnode));
|
||||||
code = vnodeSaveInfo(dir, &pInfo->info);
|
code = vnodeSaveInfo(dir, &pInfo->info);
|
||||||
|
@ -395,8 +395,8 @@ _exit:
|
||||||
}
|
}
|
||||||
|
|
||||||
int vnodeSyncCommit(SVnode *pVnode) {
|
int vnodeSyncCommit(SVnode *pVnode) {
|
||||||
vnodeAsyncCommit(pVnode);
|
(void)vnodeAsyncCommit(pVnode);
|
||||||
vnodeAWait(&pVnode->commitTask);
|
(void)vnodeAWait(&pVnode->commitTask);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -416,9 +416,9 @@ static int vnodeCommitImpl(SCommitInfo *pInfo) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
vnodeGetPrimaryDir(pVnode->path, pVnode->diskPrimary, pVnode->pTfs, dir, TSDB_FILENAME_LEN);
|
(void)vnodeGetPrimaryDir(pVnode->path, pVnode->diskPrimary, pVnode->pTfs, dir, TSDB_FILENAME_LEN);
|
||||||
|
|
||||||
syncBeginSnapshot(pVnode->sync, pInfo->info.state.committed);
|
(void)syncBeginSnapshot(pVnode->sync, pInfo->info.state.committed);
|
||||||
|
|
||||||
code = tsdbCommitBegin(pVnode->pTsdb, pInfo);
|
code = tsdbCommitBegin(pVnode->pTsdb, pInfo);
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
|
@ -455,7 +455,7 @@ static int vnodeCommitImpl(SCommitInfo *pInfo) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
syncEndSnapshot(pVnode->sync);
|
(void)syncEndSnapshot(pVnode->sync);
|
||||||
|
|
||||||
_exit:
|
_exit:
|
||||||
if (code) {
|
if (code) {
|
||||||
|
@ -470,7 +470,7 @@ bool vnodeShouldRollback(SVnode *pVnode) {
|
||||||
char tFName[TSDB_FILENAME_LEN] = {0};
|
char tFName[TSDB_FILENAME_LEN] = {0};
|
||||||
int32_t offset = 0;
|
int32_t offset = 0;
|
||||||
|
|
||||||
vnodeGetPrimaryDir(pVnode->path, pVnode->diskPrimary, pVnode->pTfs, tFName, TSDB_FILENAME_LEN);
|
(void)vnodeGetPrimaryDir(pVnode->path, pVnode->diskPrimary, pVnode->pTfs, tFName, TSDB_FILENAME_LEN);
|
||||||
offset = strlen(tFName);
|
offset = strlen(tFName);
|
||||||
snprintf(tFName + offset, TSDB_FILENAME_LEN - offset - 1, "%s%s", TD_DIRSEP, VND_INFO_FNAME_TMP);
|
snprintf(tFName + offset, TSDB_FILENAME_LEN - offset - 1, "%s%s", TD_DIRSEP, VND_INFO_FNAME_TMP);
|
||||||
|
|
||||||
|
@ -481,7 +481,7 @@ void vnodeRollback(SVnode *pVnode) {
|
||||||
char tFName[TSDB_FILENAME_LEN] = {0};
|
char tFName[TSDB_FILENAME_LEN] = {0};
|
||||||
int32_t offset = 0;
|
int32_t offset = 0;
|
||||||
|
|
||||||
vnodeGetPrimaryDir(pVnode->path, pVnode->diskPrimary, pVnode->pTfs, tFName, TSDB_FILENAME_LEN);
|
(void)vnodeGetPrimaryDir(pVnode->path, pVnode->diskPrimary, pVnode->pTfs, tFName, TSDB_FILENAME_LEN);
|
||||||
offset = strlen(tFName);
|
offset = strlen(tFName);
|
||||||
snprintf(tFName + offset, TSDB_FILENAME_LEN - offset - 1, "%s%s", TD_DIRSEP, VND_INFO_FNAME_TMP);
|
snprintf(tFName + offset, TSDB_FILENAME_LEN - offset - 1, "%s%s", TD_DIRSEP, VND_INFO_FNAME_TMP);
|
||||||
|
|
||||||
|
|
|
@ -98,7 +98,7 @@ int32_t vHashPut(SVHashTable* ht, void* obj) {
|
||||||
}
|
}
|
||||||
|
|
||||||
if (ht->numEntries >= ht->numBuckets) {
|
if (ht->numEntries >= ht->numBuckets) {
|
||||||
vHashRehash(ht, ht->numBuckets * 2);
|
(void)vHashRehash(ht, ht->numBuckets * 2);
|
||||||
bucketIndex = ht->hash(obj) % ht->numBuckets;
|
bucketIndex = ht->hash(obj) % ht->numBuckets;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -144,7 +144,7 @@ int32_t vHashDrop(SVHashTable* ht, const void* obj) {
|
||||||
taosMemoryFree(tmp);
|
taosMemoryFree(tmp);
|
||||||
ht->numEntries--;
|
ht->numEntries--;
|
||||||
if (ht->numBuckets > VNODE_HASH_DEFAULT_NUM_BUCKETS && ht->numEntries < ht->numBuckets / 4) {
|
if (ht->numBuckets > VNODE_HASH_DEFAULT_NUM_BUCKETS && ht->numEntries < ht->numBuckets / 4) {
|
||||||
vHashRehash(ht, ht->numBuckets / 2);
|
(void)vHashRehash(ht, ht->numBuckets / 2);
|
||||||
}
|
}
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
|
@ -31,7 +31,7 @@ int vnodeInit(int nthreads) {
|
||||||
|
|
||||||
void vnodeCleanup() {
|
void vnodeCleanup() {
|
||||||
if (atomic_val_compare_exchange_32(&VINIT, 1, 0) == 0) return;
|
if (atomic_val_compare_exchange_32(&VINIT, 1, 0) == 0) return;
|
||||||
vnodeAsyncClose();
|
(void)vnodeAsyncClose();
|
||||||
walCleanUp();
|
walCleanUp();
|
||||||
smaCleanUp();
|
smaCleanUp();
|
||||||
}
|
}
|
||||||
|
|
|
@ -53,7 +53,7 @@ int32_t vnodeCreate(const char *path, SVnodeCfg *pCfg, int32_t diskPrimary, STfs
|
||||||
vError("vgId:%d, failed to prepare vnode dir since %s, path: %s", pCfg->vgId, strerror(errno), path);
|
vError("vgId:%d, failed to prepare vnode dir since %s, path: %s", pCfg->vgId, strerror(errno), path);
|
||||||
return TAOS_SYSTEM_ERROR(errno);
|
return TAOS_SYSTEM_ERROR(errno);
|
||||||
}
|
}
|
||||||
vnodeGetPrimaryDir(path, diskPrimary, pTfs, dir, TSDB_FILENAME_LEN);
|
(void)vnodeGetPrimaryDir(path, diskPrimary, pTfs, dir, TSDB_FILENAME_LEN);
|
||||||
|
|
||||||
if (pCfg) {
|
if (pCfg) {
|
||||||
info.config = *pCfg;
|
info.config = *pCfg;
|
||||||
|
@ -88,7 +88,7 @@ int32_t vnodeAlterReplica(const char *path, SAlterVnodeReplicaReq *pReq, int32_t
|
||||||
char dir[TSDB_FILENAME_LEN] = {0};
|
char dir[TSDB_FILENAME_LEN] = {0};
|
||||||
int32_t ret = 0;
|
int32_t ret = 0;
|
||||||
|
|
||||||
vnodeGetPrimaryDir(path, diskPrimary, pTfs, dir, TSDB_FILENAME_LEN);
|
(void)vnodeGetPrimaryDir(path, diskPrimary, pTfs, dir, TSDB_FILENAME_LEN);
|
||||||
|
|
||||||
ret = vnodeLoadInfo(dir, &info);
|
ret = vnodeLoadInfo(dir, &info);
|
||||||
if (ret < 0) {
|
if (ret < 0) {
|
||||||
|
@ -221,7 +221,7 @@ int32_t vnodeAlterHashRange(const char *srcPath, const char *dstPath, SAlterVnod
|
||||||
char dir[TSDB_FILENAME_LEN] = {0};
|
char dir[TSDB_FILENAME_LEN] = {0};
|
||||||
int32_t ret = 0;
|
int32_t ret = 0;
|
||||||
|
|
||||||
vnodeGetPrimaryDir(srcPath, diskPrimary, pTfs, dir, TSDB_FILENAME_LEN);
|
(void)vnodeGetPrimaryDir(srcPath, diskPrimary, pTfs, dir, TSDB_FILENAME_LEN);
|
||||||
|
|
||||||
ret = vnodeLoadInfo(dir, &info);
|
ret = vnodeLoadInfo(dir, &info);
|
||||||
if (ret < 0) {
|
if (ret < 0) {
|
||||||
|
@ -283,7 +283,7 @@ int32_t vnodeRestoreVgroupId(const char *srcPath, const char *dstPath, int32_t s
|
||||||
char dir[TSDB_FILENAME_LEN] = {0};
|
char dir[TSDB_FILENAME_LEN] = {0};
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
|
|
||||||
vnodeGetPrimaryDir(dstPath, diskPrimary, pTfs, dir, TSDB_FILENAME_LEN);
|
(void)vnodeGetPrimaryDir(dstPath, diskPrimary, pTfs, dir, TSDB_FILENAME_LEN);
|
||||||
if (vnodeLoadInfo(dir, &info) == 0) {
|
if (vnodeLoadInfo(dir, &info) == 0) {
|
||||||
if (info.config.vgId != dstVgId) {
|
if (info.config.vgId != dstVgId) {
|
||||||
vError("vgId:%d, unexpected vnode config.vgId:%d", dstVgId, info.config.vgId);
|
vError("vgId:%d, unexpected vnode config.vgId:%d", dstVgId, info.config.vgId);
|
||||||
|
@ -292,7 +292,7 @@ int32_t vnodeRestoreVgroupId(const char *srcPath, const char *dstPath, int32_t s
|
||||||
return dstVgId;
|
return dstVgId;
|
||||||
}
|
}
|
||||||
|
|
||||||
vnodeGetPrimaryDir(srcPath, diskPrimary, pTfs, dir, TSDB_FILENAME_LEN);
|
(void)vnodeGetPrimaryDir(srcPath, diskPrimary, pTfs, dir, TSDB_FILENAME_LEN);
|
||||||
if (vnodeLoadInfo(dir, &info) < 0) {
|
if (vnodeLoadInfo(dir, &info) < 0) {
|
||||||
vError("vgId:%d, failed to read vnode config from %s since %s", srcVgId, srcPath, tstrerror(terrno));
|
vError("vgId:%d, failed to read vnode config from %s since %s", srcVgId, srcPath, tstrerror(terrno));
|
||||||
return -1;
|
return -1;
|
||||||
|
@ -317,7 +317,7 @@ int32_t vnodeRestoreVgroupId(const char *srcPath, const char *dstPath, int32_t s
|
||||||
|
|
||||||
void vnodeDestroy(int32_t vgId, const char *path, STfs *pTfs, int32_t nodeId) {
|
void vnodeDestroy(int32_t vgId, const char *path, STfs *pTfs, int32_t nodeId) {
|
||||||
vInfo("path:%s is removed while destroy vnode", path);
|
vInfo("path:%s is removed while destroy vnode", path);
|
||||||
tfsRmdir(pTfs, path);
|
(void)tfsRmdir(pTfs, path);
|
||||||
|
|
||||||
// int32_t nlevel = tfsGetLevel(pTfs);
|
// int32_t nlevel = tfsGetLevel(pTfs);
|
||||||
if (nodeId > 0 && vgId > 0 /*&& nlevel > 1*/ && tsS3Enabled) {
|
if (nodeId > 0 && vgId > 0 /*&& nlevel > 1*/ && tsS3Enabled) {
|
||||||
|
@ -351,7 +351,7 @@ SVnode *vnodeOpen(const char *path, int32_t diskPrimary, STfs *pTfs, SMsgCb msgC
|
||||||
vError("failed to open vnode from %s since %s. diskPrimary:%d", path, terrstr(), diskPrimary);
|
vError("failed to open vnode from %s since %s. diskPrimary:%d", path, terrstr(), diskPrimary);
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
vnodeGetPrimaryDir(path, diskPrimary, pTfs, dir, TSDB_FILENAME_LEN);
|
(void)vnodeGetPrimaryDir(path, diskPrimary, pTfs, dir, TSDB_FILENAME_LEN);
|
||||||
|
|
||||||
info.config = vnodeCfgDefault;
|
info.config = vnodeCfgDefault;
|
||||||
|
|
||||||
|
@ -401,12 +401,12 @@ SVnode *vnodeOpen(const char *path, int32_t diskPrimary, STfs *pTfs, SMsgCb msgC
|
||||||
pVnode->pTfs = pTfs;
|
pVnode->pTfs = pTfs;
|
||||||
pVnode->diskPrimary = diskPrimary;
|
pVnode->diskPrimary = diskPrimary;
|
||||||
pVnode->msgCb = msgCb;
|
pVnode->msgCb = msgCb;
|
||||||
taosThreadMutexInit(&pVnode->lock, NULL);
|
(void)taosThreadMutexInit(&pVnode->lock, NULL);
|
||||||
pVnode->blocked = false;
|
pVnode->blocked = false;
|
||||||
|
|
||||||
tsem_init(&pVnode->syncSem, 0, 0);
|
tsem_init(&pVnode->syncSem, 0, 0);
|
||||||
taosThreadMutexInit(&pVnode->mutex, NULL);
|
(void)taosThreadMutexInit(&pVnode->mutex, NULL);
|
||||||
taosThreadCondInit(&pVnode->poolNotEmpty, NULL);
|
(void)taosThreadCondInit(&pVnode->poolNotEmpty, NULL);
|
||||||
|
|
||||||
if (vnodeAChannelInit(1, &pVnode->commitChannel) != 0) {
|
if (vnodeAChannelInit(1, &pVnode->commitChannel) != 0) {
|
||||||
vError("vgId:%d, failed to init commit channel", TD_VID(pVnode));
|
vError("vgId:%d, failed to init commit channel", TD_VID(pVnode));
|
||||||
|
@ -439,7 +439,7 @@ SVnode *vnodeOpen(const char *path, int32_t diskPrimary, STfs *pTfs, SMsgCb msgC
|
||||||
|
|
||||||
// open wal
|
// open wal
|
||||||
sprintf(tdir, "%s%s%s", dir, TD_DIRSEP, VNODE_WAL_DIR);
|
sprintf(tdir, "%s%s%s", dir, TD_DIRSEP, VNODE_WAL_DIR);
|
||||||
taosRealPath(tdir, NULL, sizeof(tdir));
|
(void)taosRealPath(tdir, NULL, sizeof(tdir));
|
||||||
|
|
||||||
pVnode->pWal = walOpen(tdir, &(pVnode->config.walCfg));
|
pVnode->pWal = walOpen(tdir, &(pVnode->config.walCfg));
|
||||||
if (pVnode->pWal == NULL) {
|
if (pVnode->pWal == NULL) {
|
||||||
|
@ -449,7 +449,7 @@ SVnode *vnodeOpen(const char *path, int32_t diskPrimary, STfs *pTfs, SMsgCb msgC
|
||||||
|
|
||||||
// open tq
|
// open tq
|
||||||
sprintf(tdir, "%s%s%s", dir, TD_DIRSEP, VNODE_TQ_DIR);
|
sprintf(tdir, "%s%s%s", dir, TD_DIRSEP, VNODE_TQ_DIR);
|
||||||
taosRealPath(tdir, NULL, sizeof(tdir));
|
(void)taosRealPath(tdir, NULL, sizeof(tdir));
|
||||||
|
|
||||||
// open query
|
// open query
|
||||||
if (vnodeQueryOpen(pVnode)) {
|
if (vnodeQueryOpen(pVnode)) {
|
||||||
|
@ -502,7 +502,7 @@ SVnode *vnodeOpen(const char *path, int32_t diskPrimary, STfs *pTfs, SMsgCb msgC
|
||||||
counter = taos_counter_new(VNODE_METRIC_SQL_COUNT, "counter for insert sql", label_count, sample_labels);
|
counter = taos_counter_new(VNODE_METRIC_SQL_COUNT, "counter for insert sql", label_count, sample_labels);
|
||||||
vInfo("vgId:%d, new metric:%p", TD_VID(pVnode), counter);
|
vInfo("vgId:%d, new metric:%p", TD_VID(pVnode), counter);
|
||||||
if (taos_collector_registry_register_metric(counter) == 1) {
|
if (taos_collector_registry_register_metric(counter) == 1) {
|
||||||
taos_counter_destroy(counter);
|
(void)taos_counter_destroy(counter);
|
||||||
counter = taos_collector_registry_get_metric(VNODE_METRIC_SQL_COUNT);
|
counter = taos_collector_registry_get_metric(VNODE_METRIC_SQL_COUNT);
|
||||||
vInfo("vgId:%d, get metric from registry:%p", TD_VID(pVnode), counter);
|
vInfo("vgId:%d, get metric from registry:%p", TD_VID(pVnode), counter);
|
||||||
}
|
}
|
||||||
|
@ -516,10 +516,10 @@ _err:
|
||||||
if (pVnode->pQuery) vnodeQueryClose(pVnode);
|
if (pVnode->pQuery) vnodeQueryClose(pVnode);
|
||||||
if (pVnode->pTq) tqClose(pVnode->pTq);
|
if (pVnode->pTq) tqClose(pVnode->pTq);
|
||||||
if (pVnode->pWal) walClose(pVnode->pWal);
|
if (pVnode->pWal) walClose(pVnode->pWal);
|
||||||
if (pVnode->pTsdb) tsdbClose(&pVnode->pTsdb);
|
if (pVnode->pTsdb) (void)tsdbClose(&pVnode->pTsdb);
|
||||||
if (pVnode->pSma) smaClose(pVnode->pSma);
|
if (pVnode->pSma) (void)smaClose(pVnode->pSma);
|
||||||
if (pVnode->pMeta) metaClose(&pVnode->pMeta);
|
if (pVnode->pMeta) (void)metaClose(&pVnode->pMeta);
|
||||||
if (pVnode->freeList) vnodeCloseBufPool(pVnode);
|
if (pVnode->freeList) (void)vnodeCloseBufPool(pVnode);
|
||||||
|
|
||||||
taosMemoryFree(pVnode);
|
taosMemoryFree(pVnode);
|
||||||
return NULL;
|
return NULL;
|
||||||
|
@ -534,22 +534,22 @@ void vnodePostClose(SVnode *pVnode) { vnodeSyncPostClose(pVnode); }
|
||||||
|
|
||||||
void vnodeClose(SVnode *pVnode) {
|
void vnodeClose(SVnode *pVnode) {
|
||||||
if (pVnode) {
|
if (pVnode) {
|
||||||
vnodeAWait(&pVnode->commitTask);
|
(void)vnodeAWait(&pVnode->commitTask);
|
||||||
vnodeAChannelDestroy(&pVnode->commitChannel, true);
|
(void)vnodeAChannelDestroy(&pVnode->commitChannel, true);
|
||||||
vnodeSyncClose(pVnode);
|
vnodeSyncClose(pVnode);
|
||||||
vnodeQueryClose(pVnode);
|
vnodeQueryClose(pVnode);
|
||||||
tqClose(pVnode->pTq);
|
tqClose(pVnode->pTq);
|
||||||
walClose(pVnode->pWal);
|
walClose(pVnode->pWal);
|
||||||
if (pVnode->pTsdb) tsdbClose(&pVnode->pTsdb);
|
if (pVnode->pTsdb) tsdbClose(&pVnode->pTsdb);
|
||||||
smaClose(pVnode->pSma);
|
(void)smaClose(pVnode->pSma);
|
||||||
if (pVnode->pMeta) metaClose(&pVnode->pMeta);
|
if (pVnode->pMeta) metaClose(&pVnode->pMeta);
|
||||||
vnodeCloseBufPool(pVnode);
|
(void)vnodeCloseBufPool(pVnode);
|
||||||
|
|
||||||
// destroy handle
|
// destroy handle
|
||||||
tsem_destroy(&pVnode->syncSem);
|
tsem_destroy(&pVnode->syncSem);
|
||||||
taosThreadCondDestroy(&pVnode->poolNotEmpty);
|
taosThreadCondDestroy(&pVnode->poolNotEmpty);
|
||||||
taosThreadMutexDestroy(&pVnode->mutex);
|
(void)taosThreadMutexDestroy(&pVnode->mutex);
|
||||||
taosThreadMutexDestroy(&pVnode->lock);
|
(void)taosThreadMutexDestroy(&pVnode->lock);
|
||||||
taosMemoryFree(pVnode);
|
taosMemoryFree(pVnode);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -209,7 +209,7 @@ static void vnodeSnapReaderDestroyTsdbRanges(SVSnapReader *pReader) {
|
||||||
for (int32_t j = 0; j < TSDB_RETENTION_MAX; ++j) {
|
for (int32_t j = 0; j < TSDB_RETENTION_MAX; ++j) {
|
||||||
TFileSetRangeArray **ppRanges = vnodeSnapReaderGetTsdbRanges(pReader, tsdbTyps[j]);
|
TFileSetRangeArray **ppRanges = vnodeSnapReaderGetTsdbRanges(pReader, tsdbTyps[j]);
|
||||||
if (ppRanges == NULL) continue;
|
if (ppRanges == NULL) continue;
|
||||||
tsdbTFileSetRangeArrayDestroy(ppRanges);
|
(void)tsdbTFileSetRangeArrayDestroy(ppRanges);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -218,15 +218,15 @@ void vnodeSnapReaderClose(SVSnapReader *pReader) {
|
||||||
vnodeSnapReaderDestroyTsdbRanges(pReader);
|
vnodeSnapReaderDestroyTsdbRanges(pReader);
|
||||||
|
|
||||||
if (pReader->pRsmaReader) {
|
if (pReader->pRsmaReader) {
|
||||||
rsmaSnapReaderClose(&pReader->pRsmaReader);
|
(void)rsmaSnapReaderClose(&pReader->pRsmaReader);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pReader->pTsdbReader) {
|
if (pReader->pTsdbReader) {
|
||||||
tsdbSnapReaderClose(&pReader->pTsdbReader);
|
(void)tsdbSnapReaderClose(&pReader->pTsdbReader);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pReader->pTsdbRAWReader) {
|
if (pReader->pTsdbRAWReader) {
|
||||||
tsdbSnapRAWReaderClose(&pReader->pTsdbRAWReader);
|
(void)tsdbSnapRAWReaderClose(&pReader->pTsdbRAWReader);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pReader->pMetaReader) {
|
if (pReader->pMetaReader) {
|
||||||
|
@ -260,7 +260,7 @@ int32_t vnodeSnapRead(SVSnapReader *pReader, uint8_t **ppData, uint32_t *nData)
|
||||||
char fName[TSDB_FILENAME_LEN];
|
char fName[TSDB_FILENAME_LEN];
|
||||||
int32_t offset = 0;
|
int32_t offset = 0;
|
||||||
|
|
||||||
vnodeGetPrimaryDir(pVnode->path, pVnode->diskPrimary, pVnode->pTfs, fName, TSDB_FILENAME_LEN);
|
(void)vnodeGetPrimaryDir(pVnode->path, pVnode->diskPrimary, pVnode->pTfs, fName, TSDB_FILENAME_LEN);
|
||||||
offset = strlen(fName);
|
offset = strlen(fName);
|
||||||
snprintf(fName + offset, TSDB_FILENAME_LEN - offset - 1, "%s%s", TD_DIRSEP, VND_INFO_FNAME);
|
snprintf(fName + offset, TSDB_FILENAME_LEN - offset - 1, "%s%s", TD_DIRSEP, VND_INFO_FNAME);
|
||||||
|
|
||||||
|
@ -272,13 +272,13 @@ int32_t vnodeSnapRead(SVSnapReader *pReader, uint8_t **ppData, uint32_t *nData)
|
||||||
|
|
||||||
int64_t size;
|
int64_t size;
|
||||||
if (taosFStatFile(pFile, &size, NULL) < 0) {
|
if (taosFStatFile(pFile, &size, NULL) < 0) {
|
||||||
taosCloseFile(&pFile);
|
(void)taosCloseFile(&pFile);
|
||||||
TSDB_CHECK_CODE(code = TAOS_SYSTEM_ERROR(errno), lino, _exit);
|
TSDB_CHECK_CODE(code = TAOS_SYSTEM_ERROR(errno), lino, _exit);
|
||||||
}
|
}
|
||||||
|
|
||||||
*ppData = taosMemoryMalloc(sizeof(SSnapDataHdr) + size + 1);
|
*ppData = taosMemoryMalloc(sizeof(SSnapDataHdr) + size + 1);
|
||||||
if (*ppData == NULL) {
|
if (*ppData == NULL) {
|
||||||
taosCloseFile(&pFile);
|
(void)taosCloseFile(&pFile);
|
||||||
TSDB_CHECK_CODE(code = TSDB_CODE_OUT_OF_MEMORY, lino, _exit);
|
TSDB_CHECK_CODE(code = TSDB_CODE_OUT_OF_MEMORY, lino, _exit);
|
||||||
}
|
}
|
||||||
((SSnapDataHdr *)(*ppData))->type = SNAP_DATA_CFG;
|
((SSnapDataHdr *)(*ppData))->type = SNAP_DATA_CFG;
|
||||||
|
@ -287,11 +287,11 @@ int32_t vnodeSnapRead(SVSnapReader *pReader, uint8_t **ppData, uint32_t *nData)
|
||||||
|
|
||||||
if (taosReadFile(pFile, ((SSnapDataHdr *)(*ppData))->data, size) < 0) {
|
if (taosReadFile(pFile, ((SSnapDataHdr *)(*ppData))->data, size) < 0) {
|
||||||
taosMemoryFree(*ppData);
|
taosMemoryFree(*ppData);
|
||||||
taosCloseFile(&pFile);
|
(void)taosCloseFile(&pFile);
|
||||||
TSDB_CHECK_CODE(code = TAOS_SYSTEM_ERROR(errno), lino, _exit);
|
TSDB_CHECK_CODE(code = TAOS_SYSTEM_ERROR(errno), lino, _exit);
|
||||||
}
|
}
|
||||||
|
|
||||||
taosCloseFile(&pFile);
|
(void)taosCloseFile(&pFile);
|
||||||
|
|
||||||
pReader->cfgDone = 1;
|
pReader->cfgDone = 1;
|
||||||
goto _exit;
|
goto _exit;
|
||||||
|
@ -590,15 +590,15 @@ extern int32_t tsdbDisableAndCancelAllBgTask(STsdb *pTsdb);
|
||||||
extern int32_t tsdbEnableBgTask(STsdb *pTsdb);
|
extern int32_t tsdbEnableBgTask(STsdb *pTsdb);
|
||||||
|
|
||||||
static int32_t vnodeCancelAndDisableAllBgTask(SVnode *pVnode) {
|
static int32_t vnodeCancelAndDisableAllBgTask(SVnode *pVnode) {
|
||||||
tsdbDisableAndCancelAllBgTask(pVnode->pTsdb);
|
(void)tsdbDisableAndCancelAllBgTask(pVnode->pTsdb);
|
||||||
vnodeSyncCommit(pVnode);
|
(void)vnodeSyncCommit(pVnode);
|
||||||
vnodeAChannelDestroy(&pVnode->commitChannel, true);
|
(void)vnodeAChannelDestroy(&pVnode->commitChannel, true);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t vnodeEnableBgTask(SVnode *pVnode) {
|
static int32_t vnodeEnableBgTask(SVnode *pVnode) {
|
||||||
tsdbEnableBgTask(pVnode->pTsdb);
|
(void)tsdbEnableBgTask(pVnode->pTsdb);
|
||||||
vnodeAChannelInit(1, &pVnode->commitChannel);
|
(void)vnodeAChannelInit(1, &pVnode->commitChannel);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -646,7 +646,7 @@ static void vnodeSnapWriterDestroyTsdbRanges(SVSnapWriter *pWriter) {
|
||||||
for (int32_t j = 0; j < TSDB_RETENTION_MAX; ++j) {
|
for (int32_t j = 0; j < TSDB_RETENTION_MAX; ++j) {
|
||||||
TFileSetRangeArray **ppRanges = vnodeSnapWriterGetTsdbRanges(pWriter, tsdbTyps[j]);
|
TFileSetRangeArray **ppRanges = vnodeSnapWriterGetTsdbRanges(pWriter, tsdbTyps[j]);
|
||||||
if (ppRanges == NULL) continue;
|
if (ppRanges == NULL) continue;
|
||||||
tsdbTFileSetRangeArrayDestroy(ppRanges);
|
(void)tsdbTFileSetRangeArrayDestroy(ppRanges);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -658,15 +658,15 @@ int32_t vnodeSnapWriterClose(SVSnapWriter *pWriter, int8_t rollback, SSnapshot *
|
||||||
|
|
||||||
// prepare
|
// prepare
|
||||||
if (pWriter->pTsdbSnapWriter) {
|
if (pWriter->pTsdbSnapWriter) {
|
||||||
tsdbSnapWriterPrepareClose(pWriter->pTsdbSnapWriter);
|
(void)tsdbSnapWriterPrepareClose(pWriter->pTsdbSnapWriter);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pWriter->pTsdbSnapRAWWriter) {
|
if (pWriter->pTsdbSnapRAWWriter) {
|
||||||
tsdbSnapRAWWriterPrepareClose(pWriter->pTsdbSnapRAWWriter);
|
(void)tsdbSnapRAWWriterPrepareClose(pWriter->pTsdbSnapRAWWriter);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pWriter->pRsmaSnapWriter) {
|
if (pWriter->pRsmaSnapWriter) {
|
||||||
rsmaSnapWriterPrepareClose(pWriter->pRsmaSnapWriter);
|
(void)rsmaSnapWriterPrepareClose(pWriter->pRsmaSnapWriter);
|
||||||
}
|
}
|
||||||
|
|
||||||
// commit json
|
// commit json
|
||||||
|
@ -681,7 +681,7 @@ int32_t vnodeSnapWriterClose(SVSnapWriter *pWriter, int8_t rollback, SSnapshot *
|
||||||
.applyTerm = pWriter->info.state.commitTerm};
|
.applyTerm = pWriter->info.state.commitTerm};
|
||||||
pVnode->statis = pWriter->info.statis;
|
pVnode->statis = pWriter->info.statis;
|
||||||
char dir[TSDB_FILENAME_LEN] = {0};
|
char dir[TSDB_FILENAME_LEN] = {0};
|
||||||
vnodeGetPrimaryDir(pVnode->path, pVnode->diskPrimary, pVnode->pTfs, dir, TSDB_FILENAME_LEN);
|
(void)vnodeGetPrimaryDir(pVnode->path, pVnode->diskPrimary, pVnode->pTfs, dir, TSDB_FILENAME_LEN);
|
||||||
|
|
||||||
code = vnodeCommitInfo(dir);
|
code = vnodeCommitInfo(dir);
|
||||||
if (code) goto _exit;
|
if (code) goto _exit;
|
||||||
|
@ -740,7 +740,7 @@ int32_t vnodeSnapWriterClose(SVSnapWriter *pWriter, int8_t rollback, SSnapshot *
|
||||||
if (code) goto _exit;
|
if (code) goto _exit;
|
||||||
}
|
}
|
||||||
|
|
||||||
vnodeBegin(pVnode);
|
(void)vnodeBegin(pVnode);
|
||||||
|
|
||||||
_exit:
|
_exit:
|
||||||
if (code) {
|
if (code) {
|
||||||
|
@ -768,7 +768,7 @@ static int32_t vnodeSnapWriteInfo(SVSnapWriter *pWriter, uint8_t *pData, uint32_
|
||||||
|
|
||||||
// modify info as needed
|
// modify info as needed
|
||||||
char dir[TSDB_FILENAME_LEN] = {0};
|
char dir[TSDB_FILENAME_LEN] = {0};
|
||||||
vnodeGetPrimaryDir(pVnode->path, pVnode->diskPrimary, pVnode->pTfs, dir, TSDB_FILENAME_LEN);
|
(void)vnodeGetPrimaryDir(pVnode->path, pVnode->diskPrimary, pVnode->pTfs, dir, TSDB_FILENAME_LEN);
|
||||||
|
|
||||||
SVnodeStats vndStats = pWriter->info.config.vndStats;
|
SVnodeStats vndStats = pWriter->info.config.vndStats;
|
||||||
pWriter->info.config = pVnode->config;
|
pWriter->info.config = pVnode->config;
|
||||||
|
|
|
@ -207,7 +207,7 @@ static int32_t vnodePreProcessDropTtlMsg(SVnode *pVnode, SRpcMsg *pMsg) {
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
}
|
}
|
||||||
|
|
||||||
tSerializeSVDropTtlTableReq((char *)pContNew + sizeof(SMsgHead), reqLenNew, &ttlReq);
|
(void)tSerializeSVDropTtlTableReq((char *)pContNew + sizeof(SMsgHead), reqLenNew, &ttlReq);
|
||||||
pContNew->contLen = htonl(reqLenNew);
|
pContNew->contLen = htonl(reqLenNew);
|
||||||
pContNew->vgId = pContOld->vgId;
|
pContNew->vgId = pContOld->vgId;
|
||||||
|
|
||||||
|
@ -422,7 +422,7 @@ static int32_t vnodePreProcessDeleteMsg(SVnode *pVnode, SRpcMsg *pMsg) {
|
||||||
((SMsgHead *)pCont)->vgId = TD_VID(pVnode);
|
((SMsgHead *)pCont)->vgId = TD_VID(pVnode);
|
||||||
|
|
||||||
tEncoderInit(pCoder, pCont + sizeof(SMsgHead), size);
|
tEncoderInit(pCoder, pCont + sizeof(SMsgHead), size);
|
||||||
tEncodeDeleteRes(pCoder, &res);
|
(void)tEncodeDeleteRes(pCoder, &res);
|
||||||
tEncoderClear(pCoder);
|
tEncoderClear(pCoder);
|
||||||
|
|
||||||
rpcFreeCont(pMsg->pCont);
|
rpcFreeCont(pMsg->pCont);
|
||||||
|
@ -632,7 +632,7 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t ver, SRpcMsg
|
||||||
} break;
|
} break;
|
||||||
case TDMT_STREAM_CONSEN_CHKPT: {
|
case TDMT_STREAM_CONSEN_CHKPT: {
|
||||||
if (pVnode->restored) {
|
if (pVnode->restored) {
|
||||||
tqProcessTaskConsenChkptIdReq(pVnode->pTq, pMsg);
|
(void)tqProcessTaskConsenChkptIdReq(pVnode->pTq, pMsg);
|
||||||
}
|
}
|
||||||
} break;
|
} break;
|
||||||
case TDMT_STREAM_TASK_PAUSE: {
|
case TDMT_STREAM_TASK_PAUSE: {
|
||||||
|
@ -649,7 +649,7 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t ver, SRpcMsg
|
||||||
} break;
|
} break;
|
||||||
case TDMT_VND_STREAM_TASK_RESET: {
|
case TDMT_VND_STREAM_TASK_RESET: {
|
||||||
if (pVnode->restored && vnodeIsLeader(pVnode)) {
|
if (pVnode->restored && vnodeIsLeader(pVnode)) {
|
||||||
tqProcessTaskResetReq(pVnode->pTq, pMsg);
|
(void)tqProcessTaskResetReq(pVnode->pTq, pMsg);
|
||||||
}
|
}
|
||||||
} break;
|
} break;
|
||||||
case TDMT_VND_ALTER_CONFIRM:
|
case TDMT_VND_ALTER_CONFIRM:
|
||||||
|
@ -871,7 +871,7 @@ int32_t vnodeProcessStreamMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo)
|
||||||
}
|
}
|
||||||
|
|
||||||
void smaHandleRes(void *pVnode, int64_t smaId, const SArray *data) {
|
void smaHandleRes(void *pVnode, int64_t smaId, const SArray *data) {
|
||||||
tdProcessTSmaInsert(((SVnode *)pVnode)->pSma, smaId, (const char *)data);
|
(void)tdProcessTSmaInsert(((SVnode *)pVnode)->pSma, smaId, (const char *)data);
|
||||||
}
|
}
|
||||||
|
|
||||||
void vnodeUpdateMetaRsp(SVnode *pVnode, STableMetaRsp *pMetaRsp) {
|
void vnodeUpdateMetaRsp(SVnode *pVnode, STableMetaRsp *pMetaRsp) {
|
||||||
|
@ -942,7 +942,7 @@ static int32_t vnodeProcessDropTtlTbReq(SVnode *pVnode, int64_t ver, void *pReq,
|
||||||
int ret = 0;
|
int ret = 0;
|
||||||
if (ttlReq.nUids > 0) {
|
if (ttlReq.nUids > 0) {
|
||||||
metaDropTables(pVnode->pMeta, ttlReq.pTbUids);
|
metaDropTables(pVnode->pMeta, ttlReq.pTbUids);
|
||||||
tqUpdateTbUidList(pVnode->pTq, ttlReq.pTbUids, false);
|
(void)tqUpdateTbUidList(pVnode->pTq, ttlReq.pTbUids, false);
|
||||||
}
|
}
|
||||||
|
|
||||||
end:
|
end:
|
||||||
|
@ -1142,7 +1142,7 @@ static int32_t vnodeProcessCreateTbReq(SVnode *pVnode, int64_t ver, void *pReq,
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
cRsp.code = TSDB_CODE_SUCCESS;
|
cRsp.code = TSDB_CODE_SUCCESS;
|
||||||
tdFetchTbUidList(pVnode->pSma, &pStore, pCreateReq->ctb.suid, pCreateReq->uid);
|
(void)tdFetchTbUidList(pVnode->pSma, &pStore, pCreateReq->ctb.suid, pCreateReq->uid);
|
||||||
if (taosArrayPush(tbUids, &pCreateReq->uid) == NULL) {
|
if (taosArrayPush(tbUids, &pCreateReq->uid) == NULL) {
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
rcode = -1;
|
rcode = -1;
|
||||||
|
@ -1159,11 +1159,11 @@ static int32_t vnodeProcessCreateTbReq(SVnode *pVnode, int64_t ver, void *pReq,
|
||||||
}
|
}
|
||||||
|
|
||||||
vDebug("vgId:%d, add %d new created tables into query table list", TD_VID(pVnode), (int32_t)taosArrayGetSize(tbUids));
|
vDebug("vgId:%d, add %d new created tables into query table list", TD_VID(pVnode), (int32_t)taosArrayGetSize(tbUids));
|
||||||
tqUpdateTbUidList(pVnode->pTq, tbUids, true);
|
(void)tqUpdateTbUidList(pVnode->pTq, tbUids, true);
|
||||||
if (tdUpdateTbUidList(pVnode->pSma, pStore, true) < 0) {
|
if (tdUpdateTbUidList(pVnode->pSma, pStore, true) < 0) {
|
||||||
goto _exit;
|
goto _exit;
|
||||||
}
|
}
|
||||||
tdUidStoreFree(pStore);
|
(void)tdUidStoreFree(pStore);
|
||||||
|
|
||||||
// prepare rsp
|
// prepare rsp
|
||||||
int32_t ret = 0;
|
int32_t ret = 0;
|
||||||
|
@ -1175,13 +1175,13 @@ static int32_t vnodeProcessCreateTbReq(SVnode *pVnode, int64_t ver, void *pReq,
|
||||||
goto _exit;
|
goto _exit;
|
||||||
}
|
}
|
||||||
tEncoderInit(&encoder, pRsp->pCont, pRsp->contLen);
|
tEncoderInit(&encoder, pRsp->pCont, pRsp->contLen);
|
||||||
tEncodeSVCreateTbBatchRsp(&encoder, &rsp);
|
(void)tEncodeSVCreateTbBatchRsp(&encoder, &rsp);
|
||||||
|
|
||||||
if (tsEnableAudit && tsEnableAuditCreateTable) {
|
if (tsEnableAudit && tsEnableAuditCreateTable) {
|
||||||
int64_t clusterId = pVnode->config.syncCfg.nodeInfo[0].clusterId;
|
int64_t clusterId = pVnode->config.syncCfg.nodeInfo[0].clusterId;
|
||||||
|
|
||||||
SName name = {0};
|
SName name = {0};
|
||||||
tNameFromString(&name, pVnode->config.dbname, T_NAME_ACCT | T_NAME_DB);
|
(void)tNameFromString(&name, pVnode->config.dbname, T_NAME_ACCT | T_NAME_DB);
|
||||||
|
|
||||||
SStringBuilder sb = {0};
|
SStringBuilder sb = {0};
|
||||||
for (int32_t i = 0; i < tbNames->size; i++) {
|
for (int32_t i = 0; i < tbNames->size; i++) {
|
||||||
|
@ -1329,7 +1329,7 @@ _exit:
|
||||||
tEncodeSize(tEncodeSVAlterTbRsp, &vAlterTbRsp, pRsp->contLen, ret);
|
tEncodeSize(tEncodeSVAlterTbRsp, &vAlterTbRsp, pRsp->contLen, ret);
|
||||||
pRsp->pCont = rpcMallocCont(pRsp->contLen);
|
pRsp->pCont = rpcMallocCont(pRsp->contLen);
|
||||||
tEncoderInit(&ec, pRsp->pCont, pRsp->contLen);
|
tEncoderInit(&ec, pRsp->pCont, pRsp->contLen);
|
||||||
tEncodeSVAlterTbRsp(&ec, &vAlterTbRsp);
|
(void)tEncodeSVAlterTbRsp(&ec, &vAlterTbRsp);
|
||||||
tEncoderClear(&ec);
|
tEncoderClear(&ec);
|
||||||
if (vMetaRsp.pSchemas) {
|
if (vMetaRsp.pSchemas) {
|
||||||
taosMemoryFree(vMetaRsp.pSchemas);
|
taosMemoryFree(vMetaRsp.pSchemas);
|
||||||
|
@ -1384,7 +1384,7 @@ static int32_t vnodeProcessDropTbReq(SVnode *pVnode, int64_t ver, void *pReq, in
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
dropTbRsp.code = TSDB_CODE_SUCCESS;
|
dropTbRsp.code = TSDB_CODE_SUCCESS;
|
||||||
if (tbUid > 0) tdFetchTbUidList(pVnode->pSma, &pStore, pDropTbReq->suid, tbUid);
|
if (tbUid > 0) (void)tdFetchTbUidList(pVnode->pSma, &pStore, pDropTbReq->suid, tbUid);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (taosArrayPush(rsp.pArray, &dropTbRsp) == NULL) {
|
if (taosArrayPush(rsp.pArray, &dropTbRsp) == NULL) {
|
||||||
|
@ -1404,14 +1404,14 @@ static int32_t vnodeProcessDropTbReq(SVnode *pVnode, int64_t ver, void *pReq, in
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
tqUpdateTbUidList(pVnode->pTq, tbUids, false);
|
(void)tqUpdateTbUidList(pVnode->pTq, tbUids, false);
|
||||||
tdUpdateTbUidList(pVnode->pSma, pStore, false);
|
tdUpdateTbUidList(pVnode->pSma, pStore, false);
|
||||||
|
|
||||||
if (tsEnableAuditCreateTable) {
|
if (tsEnableAuditCreateTable) {
|
||||||
int64_t clusterId = pVnode->config.syncCfg.nodeInfo[0].clusterId;
|
int64_t clusterId = pVnode->config.syncCfg.nodeInfo[0].clusterId;
|
||||||
|
|
||||||
SName name = {0};
|
SName name = {0};
|
||||||
tNameFromString(&name, pVnode->config.dbname, T_NAME_ACCT | T_NAME_DB);
|
(void)tNameFromString(&name, pVnode->config.dbname, T_NAME_ACCT | T_NAME_DB);
|
||||||
|
|
||||||
SStringBuilder sb = {0};
|
SStringBuilder sb = {0};
|
||||||
for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
|
for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
|
||||||
|
@ -1435,7 +1435,7 @@ static int32_t vnodeProcessDropTbReq(SVnode *pVnode, int64_t ver, void *pReq, in
|
||||||
|
|
||||||
_exit:
|
_exit:
|
||||||
taosArrayDestroy(tbUids);
|
taosArrayDestroy(tbUids);
|
||||||
tdUidStoreFree(pStore);
|
(void)tdUidStoreFree(pStore);
|
||||||
tDecoderClear(&decoder);
|
tDecoderClear(&decoder);
|
||||||
tEncodeSize(tEncodeSVDropTbBatchRsp, &rsp, pRsp->contLen, ret);
|
tEncodeSize(tEncodeSVDropTbBatchRsp, &rsp, pRsp->contLen, ret);
|
||||||
pRsp->pCont = rpcMallocCont(pRsp->contLen);
|
pRsp->pCont = rpcMallocCont(pRsp->contLen);
|
||||||
|
@ -1876,7 +1876,7 @@ static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t ver, void *pReq, in
|
||||||
if (taosArrayGetSize(newTbUids) > 0) {
|
if (taosArrayGetSize(newTbUids) > 0) {
|
||||||
vDebug("vgId:%d, add %d table into query table list in handling submit", TD_VID(pVnode),
|
vDebug("vgId:%d, add %d table into query table list in handling submit", TD_VID(pVnode),
|
||||||
(int32_t)taosArrayGetSize(newTbUids));
|
(int32_t)taosArrayGetSize(newTbUids));
|
||||||
tqUpdateTbUidList(pVnode->pTq, newTbUids, true);
|
(void)tqUpdateTbUidList(pVnode->pTq, newTbUids, true);
|
||||||
}
|
}
|
||||||
|
|
||||||
_exit:
|
_exit:
|
||||||
|
@ -1885,13 +1885,13 @@ _exit:
|
||||||
tEncodeSize(tEncodeSSubmitRsp2, pSubmitRsp, pRsp->contLen, ret);
|
tEncodeSize(tEncodeSSubmitRsp2, pSubmitRsp, pRsp->contLen, ret);
|
||||||
pRsp->pCont = rpcMallocCont(pRsp->contLen);
|
pRsp->pCont = rpcMallocCont(pRsp->contLen);
|
||||||
tEncoderInit(&ec, pRsp->pCont, pRsp->contLen);
|
tEncoderInit(&ec, pRsp->pCont, pRsp->contLen);
|
||||||
tEncodeSSubmitRsp2(&ec, pSubmitRsp);
|
(void)tEncodeSSubmitRsp2(&ec, pSubmitRsp);
|
||||||
tEncoderClear(&ec);
|
tEncoderClear(&ec);
|
||||||
|
|
||||||
// update statistics
|
// update statistics
|
||||||
atomic_add_fetch_64(&pVnode->statis.nInsert, pSubmitRsp->affectedRows);
|
(void)atomic_add_fetch_64(&pVnode->statis.nInsert, pSubmitRsp->affectedRows);
|
||||||
atomic_add_fetch_64(&pVnode->statis.nInsertSuccess, pSubmitRsp->affectedRows);
|
(void)atomic_add_fetch_64(&pVnode->statis.nInsertSuccess, pSubmitRsp->affectedRows);
|
||||||
atomic_add_fetch_64(&pVnode->statis.nBatchInsert, 1);
|
(void)atomic_add_fetch_64(&pVnode->statis.nBatchInsert, 1);
|
||||||
|
|
||||||
if (tsEnableMonitor && pSubmitRsp->affectedRows > 0 && strlen(pOriginalMsg->info.conn.user) > 0) {
|
if (tsEnableMonitor && pSubmitRsp->affectedRows > 0 && strlen(pOriginalMsg->info.conn.user) > 0) {
|
||||||
const char *sample_labels[] = {VNODE_METRIC_TAG_VALUE_INSERT_AFFECTED_ROWS,
|
const char *sample_labels[] = {VNODE_METRIC_TAG_VALUE_INSERT_AFFECTED_ROWS,
|
||||||
|
@ -1901,11 +1901,11 @@ _exit:
|
||||||
pVnode->monitor.strVgId,
|
pVnode->monitor.strVgId,
|
||||||
pOriginalMsg->info.conn.user,
|
pOriginalMsg->info.conn.user,
|
||||||
"Success"};
|
"Success"};
|
||||||
taos_counter_add(pVnode->monitor.insertCounter, pSubmitRsp->affectedRows, sample_labels);
|
(void)taos_counter_add(pVnode->monitor.insertCounter, pSubmitRsp->affectedRows, sample_labels);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (code == 0) {
|
if (code == 0) {
|
||||||
atomic_add_fetch_64(&pVnode->statis.nBatchInsertSuccess, 1);
|
(void)atomic_add_fetch_64(&pVnode->statis.nBatchInsertSuccess, 1);
|
||||||
code = tdProcessRSmaSubmit(pVnode->pSma, ver, pSubmitReq, pReq, len);
|
code = tdProcessRSmaSubmit(pVnode->pSma, ver, pSubmitReq, pReq, len);
|
||||||
}
|
}
|
||||||
/*
|
/*
|
||||||
|
@ -2125,10 +2125,10 @@ static int32_t vnodeProcessAlterConfigReq(SVnode *pVnode, int64_t ver, void *pRe
|
||||||
if (req.sttTrigger > 1 && pVnode->config.sttTrigger > 1) {
|
if (req.sttTrigger > 1 && pVnode->config.sttTrigger > 1) {
|
||||||
pVnode->config.sttTrigger = req.sttTrigger;
|
pVnode->config.sttTrigger = req.sttTrigger;
|
||||||
} else {
|
} else {
|
||||||
vnodeAWait(&pVnode->commitTask);
|
(void)vnodeAWait(&pVnode->commitTask);
|
||||||
tsdbDisableAndCancelAllBgTask(pVnode->pTsdb);
|
(void)tsdbDisableAndCancelAllBgTask(pVnode->pTsdb);
|
||||||
pVnode->config.sttTrigger = req.sttTrigger;
|
pVnode->config.sttTrigger = req.sttTrigger;
|
||||||
tsdbEnableBgTask(pVnode->pTsdb);
|
(void)tsdbEnableBgTask(pVnode->pTsdb);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2144,11 +2144,11 @@ static int32_t vnodeProcessAlterConfigReq(SVnode *pVnode, int64_t ver, void *pRe
|
||||||
}
|
}
|
||||||
|
|
||||||
if (walChanged) {
|
if (walChanged) {
|
||||||
walAlter(pVnode->pWal, &pVnode->config.walCfg);
|
(void)walAlter(pVnode->pWal, &pVnode->config.walCfg);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (tsdbChanged) {
|
if (tsdbChanged) {
|
||||||
tsdbSetKeepCfg(pVnode->pTsdb, &pVnode->config.tsdbCfg);
|
(void)tsdbSetKeepCfg(pVnode->pTsdb, &pVnode->config.tsdbCfg);
|
||||||
}
|
}
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
|
@ -2220,7 +2220,8 @@ static int32_t vnodeProcessDeleteReq(SVnode *pVnode, int64_t ver, void *pReq, in
|
||||||
}
|
}
|
||||||
|
|
||||||
tDecoderInit(pCoder, pReq, len);
|
tDecoderInit(pCoder, pReq, len);
|
||||||
tDecodeDeleteRes(pCoder, pRes);
|
code = tDecodeDeleteRes(pCoder, pRes);
|
||||||
|
if (code) goto _err;
|
||||||
|
|
||||||
if (pRes->affectedRows > 0) {
|
if (pRes->affectedRows > 0) {
|
||||||
for (int32_t iUid = 0; iUid < taosArrayGetSize(pRes->uidList); iUid++) {
|
for (int32_t iUid = 0; iUid < taosArrayGetSize(pRes->uidList); iUid++) {
|
||||||
|
@ -2243,7 +2244,8 @@ static int32_t vnodeProcessDeleteReq(SVnode *pVnode, int64_t ver, void *pReq, in
|
||||||
pRsp->pCont = rpcMallocCont(pRsp->contLen);
|
pRsp->pCont = rpcMallocCont(pRsp->contLen);
|
||||||
SEncoder ec = {0};
|
SEncoder ec = {0};
|
||||||
tEncoderInit(&ec, pRsp->pCont, pRsp->contLen);
|
tEncoderInit(&ec, pRsp->pCont, pRsp->contLen);
|
||||||
tEncodeSVDeleteRsp(&ec, &rsp);
|
code = tEncodeSVDeleteRsp(&ec, &rsp);
|
||||||
|
if (code) goto _err;
|
||||||
tEncoderClear(&ec);
|
tEncoderClear(&ec);
|
||||||
return code;
|
return code;
|
||||||
|
|
||||||
|
@ -2325,7 +2327,7 @@ static int32_t vnodeProcessCompactVnodeReq(SVnode *pVnode, int64_t ver, void *pR
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t vnodeProcessConfigChangeReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp) {
|
static int32_t vnodeProcessConfigChangeReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp) {
|
||||||
syncCheckMember(pVnode->sync);
|
(void)syncCheckMember(pVnode->sync);
|
||||||
|
|
||||||
pRsp->msgType = TDMT_SYNC_CONFIG_CHANGE_RSP;
|
pRsp->msgType = TDMT_SYNC_CONFIG_CHANGE_RSP;
|
||||||
pRsp->code = TSDB_CODE_SUCCESS;
|
pRsp->code = TSDB_CODE_SUCCESS;
|
||||||
|
|
|
@ -28,7 +28,7 @@ static inline void vnodeWaitBlockMsg(SVnode *pVnode, const SRpcMsg *pMsg) {
|
||||||
const STraceId *trace = &pMsg->info.traceId;
|
const STraceId *trace = &pMsg->info.traceId;
|
||||||
vGTrace("vgId:%d, msg:%p wait block, type:%s sec:%d seq:%" PRId64, pVnode->config.vgId, pMsg,
|
vGTrace("vgId:%d, msg:%p wait block, type:%s sec:%d seq:%" PRId64, pVnode->config.vgId, pMsg,
|
||||||
TMSG_INFO(pMsg->msgType), pVnode->blockSec, pVnode->blockSeq);
|
TMSG_INFO(pMsg->msgType), pVnode->blockSec, pVnode->blockSeq);
|
||||||
tsem_wait(&pVnode->syncSem);
|
(void)tsem_wait(&pVnode->syncSem);
|
||||||
}
|
}
|
||||||
|
|
||||||
static inline void vnodePostBlockMsg(SVnode *pVnode, const SRpcMsg *pMsg) {
|
static inline void vnodePostBlockMsg(SVnode *pVnode, const SRpcMsg *pMsg) {
|
||||||
|
@ -41,7 +41,7 @@ static inline void vnodePostBlockMsg(SVnode *pVnode, const SRpcMsg *pMsg) {
|
||||||
pVnode->blocked = false;
|
pVnode->blocked = false;
|
||||||
pVnode->blockSec = 0;
|
pVnode->blockSec = 0;
|
||||||
pVnode->blockSeq = 0;
|
pVnode->blockSeq = 0;
|
||||||
tsem_post(&pVnode->syncSem);
|
(void)tsem_post(&pVnode->syncSem);
|
||||||
}
|
}
|
||||||
(void)taosThreadMutexUnlock(&pVnode->lock);
|
(void)taosThreadMutexUnlock(&pVnode->lock);
|
||||||
}
|
}
|
||||||
|
@ -69,7 +69,7 @@ void vnodeRedirectRpcMsg(SVnode *pVnode, SRpcMsg *pMsg, int32_t code) {
|
||||||
if (rsp.pCont == NULL) {
|
if (rsp.pCont == NULL) {
|
||||||
pMsg->code = TSDB_CODE_OUT_OF_MEMORY;
|
pMsg->code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
} else {
|
} else {
|
||||||
tSerializeSEpSet(rsp.pCont, contLen, &newEpSet);
|
(void)tSerializeSEpSet(rsp.pCont, contLen, &newEpSet);
|
||||||
rsp.contLen = contLen;
|
rsp.contLen = contLen;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -161,7 +161,7 @@ void vnodeProposeCommitOnNeed(SVnode *pVnode, bool atExit) {
|
||||||
rpcFreeCont(rpcMsg.pCont);
|
rpcFreeCont(rpcMsg.pCont);
|
||||||
rpcMsg.pCont = NULL;
|
rpcMsg.pCont = NULL;
|
||||||
} else {
|
} else {
|
||||||
tmsgPutToQueue(&pVnode->msgCb, WRITE_QUEUE, &rpcMsg);
|
(void)tmsgPutToQueue(&pVnode->msgCb, WRITE_QUEUE, &rpcMsg);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -556,7 +556,7 @@ static void vnodeRestoreFinish(const SSyncFSM *pFsm, const SyncIndex commitIdx)
|
||||||
} while (true);
|
} while (true);
|
||||||
|
|
||||||
ASSERT(commitIdx == vnodeSyncAppliedIndex(pFsm));
|
ASSERT(commitIdx == vnodeSyncAppliedIndex(pFsm));
|
||||||
walApplyVer(pVnode->pWal, commitIdx);
|
(void)walApplyVer(pVnode->pWal, commitIdx);
|
||||||
pVnode->restored = true;
|
pVnode->restored = true;
|
||||||
|
|
||||||
SStreamMeta *pMeta = pVnode->pTq->pStreamMeta;
|
SStreamMeta *pMeta = pVnode->pTq->pStreamMeta;
|
||||||
|
@ -602,13 +602,13 @@ static void vnodeBecomeFollower(const SSyncFSM *pFsm) {
|
||||||
if (pVnode->blocked) {
|
if (pVnode->blocked) {
|
||||||
pVnode->blocked = false;
|
pVnode->blocked = false;
|
||||||
vDebug("vgId:%d, become follower and post block", pVnode->config.vgId);
|
vDebug("vgId:%d, become follower and post block", pVnode->config.vgId);
|
||||||
tsem_post(&pVnode->syncSem);
|
(void)tsem_post(&pVnode->syncSem);
|
||||||
}
|
}
|
||||||
(void)taosThreadMutexUnlock(&pVnode->lock);
|
(void)taosThreadMutexUnlock(&pVnode->lock);
|
||||||
|
|
||||||
if (pVnode->pTq) {
|
if (pVnode->pTq) {
|
||||||
tqUpdateNodeStage(pVnode->pTq, false);
|
tqUpdateNodeStage(pVnode->pTq, false);
|
||||||
tqStopStreamTasksAsync(pVnode->pTq);
|
(void)tqStopStreamTasksAsync(pVnode->pTq);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -620,7 +620,7 @@ static void vnodeBecomeLearner(const SSyncFSM *pFsm) {
|
||||||
if (pVnode->blocked) {
|
if (pVnode->blocked) {
|
||||||
pVnode->blocked = false;
|
pVnode->blocked = false;
|
||||||
vDebug("vgId:%d, become learner and post block", pVnode->config.vgId);
|
vDebug("vgId:%d, become learner and post block", pVnode->config.vgId);
|
||||||
tsem_post(&pVnode->syncSem);
|
(void)tsem_post(&pVnode->syncSem);
|
||||||
}
|
}
|
||||||
(void)taosThreadMutexUnlock(&pVnode->lock);
|
(void)taosThreadMutexUnlock(&pVnode->lock);
|
||||||
}
|
}
|
||||||
|
@ -743,14 +743,14 @@ int32_t vnodeSyncStart(SVnode *pVnode) {
|
||||||
|
|
||||||
void vnodeSyncPreClose(SVnode *pVnode) {
|
void vnodeSyncPreClose(SVnode *pVnode) {
|
||||||
vInfo("vgId:%d, sync pre close", pVnode->config.vgId);
|
vInfo("vgId:%d, sync pre close", pVnode->config.vgId);
|
||||||
syncLeaderTransfer(pVnode->sync);
|
(void)syncLeaderTransfer(pVnode->sync);
|
||||||
syncPreStop(pVnode->sync);
|
syncPreStop(pVnode->sync);
|
||||||
|
|
||||||
(void)taosThreadMutexLock(&pVnode->lock);
|
(void)taosThreadMutexLock(&pVnode->lock);
|
||||||
if (pVnode->blocked) {
|
if (pVnode->blocked) {
|
||||||
vInfo("vgId:%d, post block after close sync", pVnode->config.vgId);
|
vInfo("vgId:%d, post block after close sync", pVnode->config.vgId);
|
||||||
pVnode->blocked = false;
|
pVnode->blocked = false;
|
||||||
tsem_post(&pVnode->syncSem);
|
(void)tsem_post(&pVnode->syncSem);
|
||||||
}
|
}
|
||||||
(void)taosThreadMutexUnlock(&pVnode->lock);
|
(void)taosThreadMutexUnlock(&pVnode->lock);
|
||||||
}
|
}
|
||||||
|
@ -785,7 +785,7 @@ void vnodeSyncCheckTimeout(SVnode *pVnode) {
|
||||||
pVnode->blocked = false;
|
pVnode->blocked = false;
|
||||||
pVnode->blockSec = 0;
|
pVnode->blockSec = 0;
|
||||||
pVnode->blockSeq = 0;
|
pVnode->blockSeq = 0;
|
||||||
tsem_post(&pVnode->syncSem);
|
(void)tsem_post(&pVnode->syncSem);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
(void)taosThreadMutexUnlock(&pVnode->lock);
|
(void)taosThreadMutexUnlock(&pVnode->lock);
|
||||||
|
|
Loading…
Reference in New Issue