enh: error code handle
This commit is contained in:
parent
37491186ee
commit
f18c1d8710
|
@ -56,12 +56,12 @@ typedef enum {
|
|||
} EVAPriority;
|
||||
|
||||
int32_t vnodeAsyncOpen(int32_t numOfThreads);
|
||||
int32_t vnodeAsyncClose();
|
||||
void vnodeAsyncClose();
|
||||
int32_t vnodeAChannelInit(int64_t async, SVAChannelID* channelID);
|
||||
int32_t vnodeAChannelDestroy(SVAChannelID* channelID, bool waitRunning);
|
||||
int32_t vnodeAsync(SVAChannelID* channelID, EVAPriority priority, int32_t (*execute)(void*), void (*complete)(void*),
|
||||
void* arg, SVATaskID* taskID);
|
||||
int32_t vnodeAWait(SVATaskID* taskID);
|
||||
void vnodeAWait(SVATaskID* taskID);
|
||||
int32_t vnodeACancel(SVATaskID* taskID);
|
||||
int32_t vnodeAsyncSetWorkers(int64_t async, int32_t numWorkers);
|
||||
|
||||
|
@ -95,7 +95,7 @@ struct SVBufPool {
|
|||
};
|
||||
|
||||
int32_t vnodeOpenBufPool(SVnode* pVnode);
|
||||
int32_t vnodeCloseBufPool(SVnode* pVnode);
|
||||
void vnodeCloseBufPool(SVnode* pVnode);
|
||||
void vnodeBufPoolReset(SVBufPool* pPool);
|
||||
void vnodeBufPoolAddToFreeList(SVBufPool* pPool);
|
||||
int32_t vnodeBufPoolRecycle(SVBufPool* pPool);
|
||||
|
|
|
@ -593,7 +593,7 @@ struct SVHashTable {
|
|||
|
||||
#define vHashNumEntries(ht) ((ht)->numEntries)
|
||||
int32_t vHashInit(SVHashTable** ht, uint32_t (*hash)(const void*), int32_t (*compare)(const void*, const void*));
|
||||
int32_t vHashDestroy(SVHashTable** ht);
|
||||
void vHashDestroy(SVHashTable** ht);
|
||||
int32_t vHashPut(SVHashTable* ht, void* obj);
|
||||
int32_t vHashGet(SVHashTable* ht, const void* obj, void** retObj);
|
||||
int32_t vHashDrop(SVHashTable* ht, const void* obj);
|
||||
|
|
|
@ -2153,7 +2153,7 @@ static int32_t loadTombFromBlk(const TTombBlkArray *pTombBlkArray, SCacheRowsRea
|
|||
uint64_t uid = uidList[j];
|
||||
STableLoadInfo *pInfo = getTableLoadInfo(pReader, uid);
|
||||
if (!pInfo) {
|
||||
(void)tTombBlockDestroy(&block);
|
||||
tTombBlockDestroy(&block);
|
||||
TAOS_RETURN(TSDB_CODE_OUT_OF_MEMORY);
|
||||
}
|
||||
|
||||
|
@ -2225,7 +2225,7 @@ static int32_t loadTombFromBlk(const TTombBlkArray *pTombBlkArray, SCacheRowsRea
|
|||
}
|
||||
}
|
||||
|
||||
(void)tTombBlockDestroy(&block);
|
||||
tTombBlockDestroy(&block);
|
||||
|
||||
if (finished) {
|
||||
TAOS_RETURN(code);
|
||||
|
|
|
@ -418,7 +418,7 @@ static int32_t tsdbCommitInfoDestroy(STsdb *pTsdb) {
|
|||
taosMemoryFree(info);
|
||||
}
|
||||
|
||||
TAOS_UNUSED(vHashDestroy(&pTsdb->commitInfo->ht));
|
||||
vHashDestroy(&pTsdb->commitInfo->ht);
|
||||
taosArrayDestroy(pTsdb->commitInfo->arr);
|
||||
pTsdb->commitInfo->arr = NULL;
|
||||
taosMemoryFreeClear(pTsdb->commitInfo);
|
||||
|
|
|
@ -252,7 +252,7 @@ static int32_t apply_commit(STFileSystem *fs) {
|
|||
if (fset1 && fset2) {
|
||||
if (fset1->fid < fset2->fid) {
|
||||
// delete fset1
|
||||
(void)tsdbTFileSetRemove(fset1);
|
||||
tsdbTFileSetRemove(fset1);
|
||||
i1++;
|
||||
} else if (fset1->fid > fset2->fid) {
|
||||
// create new file set with fid of fset2->fid
|
||||
|
@ -271,7 +271,7 @@ static int32_t apply_commit(STFileSystem *fs) {
|
|||
}
|
||||
} else if (fset1) {
|
||||
// delete fset1
|
||||
(void)tsdbTFileSetRemove(fset1);
|
||||
tsdbTFileSetRemove(fset1);
|
||||
i1++;
|
||||
} else {
|
||||
// create new file set with fid of fset2->fid
|
||||
|
@ -794,11 +794,10 @@ int32_t tsdbDisableAndCancelAllBgTask(STsdb *pTsdb) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
int32_t tsdbEnableBgTask(STsdb *pTsdb) {
|
||||
void tsdbEnableBgTask(STsdb *pTsdb) {
|
||||
(void)taosThreadMutexLock(&pTsdb->mutex);
|
||||
pTsdb->bgTaskDisabled = false;
|
||||
(void)taosThreadMutexUnlock(&pTsdb->mutex);
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t tsdbCloseFS(STFileSystem **fs) {
|
||||
|
|
|
@ -627,8 +627,8 @@ void tsdbTFileSetClear(STFileSet **fset) {
|
|||
}
|
||||
}
|
||||
|
||||
int32_t tsdbTFileSetRemove(STFileSet *fset) {
|
||||
if (fset == NULL) return 0;
|
||||
void tsdbTFileSetRemove(STFileSet *fset) {
|
||||
if (fset == NULL) return;
|
||||
|
||||
for (tsdb_ftype_t ftype = TSDB_FTYPE_MIN; ftype < TSDB_FTYPE_MAX; ++ftype) {
|
||||
if (fset->farr[ftype] != NULL) {
|
||||
|
@ -638,8 +638,6 @@ int32_t tsdbTFileSetRemove(STFileSet *fset) {
|
|||
}
|
||||
|
||||
TARRAY2_DESTROY(fset->lvlArr, tsdbSttLvlRemove);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
SSttLvl *tsdbTFileSetGetSttLvl(STFileSet *fset, int32_t level) {
|
||||
|
|
|
@ -43,7 +43,7 @@ int32_t tsdbTFileSetInit(int32_t fid, STFileSet **fset);
|
|||
int32_t tsdbTFileSetInitCopy(STsdb *pTsdb, const STFileSet *fset1, STFileSet **fset);
|
||||
int32_t tsdbTFileSetInitRef(STsdb *pTsdb, const STFileSet *fset1, STFileSet **fset);
|
||||
void tsdbTFileSetClear(STFileSet **fset);
|
||||
int32_t tsdbTFileSetRemove(STFileSet *fset);
|
||||
void tsdbTFileSetRemove(STFileSet *fset);
|
||||
|
||||
int32_t tsdbTFileSetFilteredInitDup(STsdb *pTsdb, const STFileSet *fset1, int64_t ever, STFileSet **fset,
|
||||
TFileOpArray *fopArr);
|
||||
|
|
|
@ -204,7 +204,7 @@ static int32_t tsdbMergeFileSetBeginOpenReader(SMerger *merger) {
|
|||
TAOS_CHECK_GOTO(tsdbSttFileReaderOpen(fobj->fname, &config, &reader), &lino, _exit);
|
||||
|
||||
if ((code = TARRAY2_APPEND(merger->sttReaderArr, reader))) {
|
||||
(void)tsdbSttFileReaderClose(&reader);
|
||||
tsdbSttFileReaderClose(&reader);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -681,11 +681,7 @@ int32_t tLDataIterOpen2(SLDataIter *pIter, SSttFileReader *pSttFileReader, int32
|
|||
}
|
||||
|
||||
void tLDataIterClose2(SLDataIter *pIter) {
|
||||
int32_t code = tsdbSttFileReaderClose(&pIter->pReader); // always return 0
|
||||
if (code != 0) {
|
||||
tsdbError("%" PRId64 " failed to close tsdb file reader, code:%s", pIter->cid, tstrerror(code));
|
||||
}
|
||||
|
||||
tsdbSttFileReaderClose(&pIter->pReader);
|
||||
pIter->pReader = NULL;
|
||||
}
|
||||
|
||||
|
|
|
@ -355,7 +355,11 @@ static STsdbFSetPartList* tsdbSnapGetFSetPartList(STFileSystem* fs) {
|
|||
terrno = code;
|
||||
break;
|
||||
}
|
||||
(void)TARRAY2_SORT_INSERT(pList, pItem, tsdbFSetPartCmprFn);
|
||||
code = TARRAY2_SORT_INSERT(pList, pItem, tsdbFSetPartCmprFn);
|
||||
if (code) {
|
||||
terrno = code;
|
||||
break;
|
||||
}
|
||||
}
|
||||
(void)taosThreadMutexUnlock(&fs->tsdb->mutex);
|
||||
|
||||
|
|
|
@ -104,7 +104,7 @@ static int32_t tsdbSnapReadFileSetOpenReader(STsdbSnapReader* reader) {
|
|||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
||||
if ((code = TARRAY2_APPEND(reader->sttReaderArr, sttReader))) {
|
||||
TAOS_UNUSED(tsdbSttFileReaderClose(&sttReader));
|
||||
tsdbSttFileReaderClose(&sttReader);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
}
|
||||
}
|
||||
|
@ -449,7 +449,7 @@ int32_t tsdbSnapReaderClose(STsdbSnapReader** reader) {
|
|||
|
||||
STsdb* tsdb = reader[0]->tsdb;
|
||||
|
||||
TAOS_UNUSED(tTombBlockDestroy(reader[0]->tombBlock));
|
||||
tTombBlockDestroy(reader[0]->tombBlock);
|
||||
tBlockDataDestroy(reader[0]->blockData);
|
||||
|
||||
tsdbIterMergerClose(&reader[0]->dataIterMerger);
|
||||
|
|
|
@ -93,12 +93,12 @@ _exit:
|
|||
if (code) {
|
||||
tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(config->tsdb->pVnode), __func__, __FILE__, lino,
|
||||
tstrerror(code));
|
||||
(void)tsdbSttFileReaderClose(reader);
|
||||
tsdbSttFileReaderClose(reader);
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
int32_t tsdbSttFileReaderClose(SSttFileReader **reader) {
|
||||
void tsdbSttFileReaderClose(SSttFileReader **reader) {
|
||||
if (reader[0]) {
|
||||
for (int32_t i = 0; i < ARRAY_SIZE(reader[0]->local); ++i) {
|
||||
tBufferDestroy(reader[0]->local + i);
|
||||
|
@ -110,7 +110,6 @@ int32_t tsdbSttFileReaderClose(SSttFileReader **reader) {
|
|||
taosMemoryFree(reader[0]);
|
||||
reader[0] = NULL;
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
// SSttFSegReader
|
||||
|
|
|
@ -40,7 +40,7 @@ typedef TARRAY2(SSttFileReader *) TSttFileReaderArray;
|
|||
|
||||
// SSttFileReader
|
||||
int32_t tsdbSttFileReaderOpen(const char *fname, const SSttFileReaderConfig *config, SSttFileReader **reader);
|
||||
int32_t tsdbSttFileReaderClose(SSttFileReader **reader);
|
||||
void tsdbSttFileReaderClose(SSttFileReader **reader);
|
||||
|
||||
// SSttSegReader
|
||||
int32_t tsdbSttFileReadSttBlk(SSttFileReader *reader, const TSttBlkArray **sttBlkArray);
|
||||
|
@ -71,10 +71,10 @@ int32_t tsdbSttFileWriteBlockData(SSttFileWriter *writer, SBlockData *pBlockData
|
|||
int32_t tsdbSttFileWriteTombRecord(SSttFileWriter *writer, const STombRecord *record);
|
||||
bool tsdbSttFileWriterIsOpened(SSttFileWriter *writer);
|
||||
|
||||
int32_t tsdbFileWriteSttBlk(STsdbFD *fd, const TSttBlkArray *sttBlkArray, SFDataPtr *ptr, int64_t *fileSize,
|
||||
int32_t encryptAlgorithm, char* encryptKey);
|
||||
int32_t tsdbFileWriteSttFooter(STsdbFD *fd, const SSttFooter *footer, int64_t *fileSize, int32_t encryptAlgorithm,
|
||||
char* encryptKey);
|
||||
int32_t tsdbFileWriteSttBlk(STsdbFD *fd, const TSttBlkArray *sttBlkArray, SFDataPtr *ptr, int64_t *fileSize,
|
||||
int32_t encryptAlgorithm, char *encryptKey);
|
||||
int32_t tsdbFileWriteSttFooter(STsdbFD *fd, const SSttFooter *footer, int64_t *fileSize, int32_t encryptAlgorithm,
|
||||
char *encryptKey);
|
||||
|
||||
struct SSttFileWriterConfig {
|
||||
STsdb *tsdb;
|
||||
|
|
|
@ -122,7 +122,7 @@ SVAsync *vnodeAsyncs[3];
|
|||
#define MIN_ASYNC_ID 1
|
||||
#define MAX_ASYNC_ID (sizeof(vnodeAsyncs) / sizeof(vnodeAsyncs[0]) - 1)
|
||||
|
||||
static int32_t vnodeAsyncTaskDone(SVAsync *async, SVATask *task) {
|
||||
static void vnodeAsyncTaskDone(SVAsync *async, SVATask *task) {
|
||||
int32_t ret;
|
||||
|
||||
if (task->channel != NULL && task->channel->scheduled == task) {
|
||||
|
@ -176,10 +176,9 @@ static int32_t vnodeAsyncTaskDone(SVAsync *async, SVATask *task) {
|
|||
} else {
|
||||
(void)taosThreadCondBroadcast(&task->waitCond);
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int32_t vnodeAsyncCancelAllTasks(SVAsync *async, SArray *cancelArray) {
|
||||
static void vnodeAsyncCancelAllTasks(SVAsync *async, SArray *cancelArray) {
|
||||
while (async->queue[0].next != &async->queue[0] || async->queue[1].next != &async->queue[1] ||
|
||||
async->queue[2].next != &async->queue[2]) {
|
||||
for (int32_t i = 0; i < EVA_PRIORITY_MAX; i++) {
|
||||
|
@ -193,11 +192,10 @@ static int32_t vnodeAsyncCancelAllTasks(SVAsync *async, SArray *cancelArray) {
|
|||
.arg = task->arg,
|
||||
}));
|
||||
}
|
||||
(void)vnodeAsyncTaskDone(async, task);
|
||||
vnodeAsyncTaskDone(async, task);
|
||||
}
|
||||
}
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
static void *vnodeAsyncLoop(void *arg) {
|
||||
|
@ -215,14 +213,14 @@ static void *vnodeAsyncLoop(void *arg) {
|
|||
|
||||
// finish last running task
|
||||
if (worker->runningTask != NULL) {
|
||||
(void)vnodeAsyncTaskDone(async, worker->runningTask);
|
||||
vnodeAsyncTaskDone(async, worker->runningTask);
|
||||
worker->runningTask = NULL;
|
||||
}
|
||||
|
||||
for (;;) {
|
||||
if (async->stop || worker->workerId >= async->numWorkers) {
|
||||
if (async->stop) { // cancel all tasks
|
||||
(void)vnodeAsyncCancelAllTasks(async, cancelArray);
|
||||
vnodeAsyncCancelAllTasks(async, cancelArray);
|
||||
}
|
||||
worker->state = EVA_WORKER_STATE_STOP;
|
||||
async->numLaunchWorkers--;
|
||||
|
@ -269,7 +267,8 @@ static void *vnodeAsyncLoop(void *arg) {
|
|||
(void)taosThreadMutexUnlock(&async->mutex);
|
||||
|
||||
// do run the task
|
||||
(void)worker->runningTask->execute(worker->runningTask->arg);
|
||||
int32_t code = worker->runningTask->execute(worker->runningTask->arg);
|
||||
TAOS_UNUSED(code);
|
||||
}
|
||||
|
||||
_exit:
|
||||
|
@ -369,7 +368,7 @@ static int32_t vnodeAsyncInit(SVAsync **async, const char *label) {
|
|||
}
|
||||
ret = vHashInit(&(*async)->taskTable, vnodeAsyncTaskHash, vnodeAsyncTaskCompare);
|
||||
if (ret != 0) {
|
||||
(void)vHashDestroy(&(*async)->channelTable);
|
||||
vHashDestroy(&(*async)->channelTable);
|
||||
(void)taosThreadMutexDestroy(&(*async)->mutex);
|
||||
(void)taosThreadCondDestroy(&(*async)->hasTask);
|
||||
taosMemoryFree(*async);
|
||||
|
@ -418,29 +417,32 @@ static int32_t vnodeAsyncDestroy(SVAsync **async) {
|
|||
(void)taosThreadMutexDestroy(&(*async)->mutex);
|
||||
(void)taosThreadCondDestroy(&(*async)->hasTask);
|
||||
|
||||
(void)vHashDestroy(&(*async)->channelTable);
|
||||
(void)vHashDestroy(&(*async)->taskTable);
|
||||
vHashDestroy(&(*async)->channelTable);
|
||||
vHashDestroy(&(*async)->taskTable);
|
||||
taosMemoryFree(*async);
|
||||
*async = NULL;
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int32_t vnodeAsyncLaunchWorker(SVAsync *async) {
|
||||
static void vnodeAsyncLaunchWorker(SVAsync *async) {
|
||||
for (int32_t i = 0; i < async->numWorkers; i++) {
|
||||
if (async->workers[i].state == EVA_WORKER_STATE_ACTIVE) {
|
||||
continue;
|
||||
} else if (async->workers[i].state == EVA_WORKER_STATE_STOP) {
|
||||
(void)taosThreadJoin(async->workers[i].thread, NULL);
|
||||
TAOS_UNUSED(taosThreadJoin(async->workers[i].thread, NULL));
|
||||
async->workers[i].state = EVA_WORKER_STATE_UINIT;
|
||||
}
|
||||
|
||||
(void)taosThreadCreate(&async->workers[i].thread, NULL, vnodeAsyncLoop, &async->workers[i]);
|
||||
async->workers[i].state = EVA_WORKER_STATE_ACTIVE;
|
||||
async->numLaunchWorkers++;
|
||||
int32_t ret = taosThreadCreate(&async->workers[i].thread, NULL, vnodeAsyncLoop, &async->workers[i]);
|
||||
if (ret) {
|
||||
vError("failed to create worker thread since %s", tstrerror(ret));
|
||||
} else {
|
||||
async->workers[i].state = EVA_WORKER_STATE_ACTIVE;
|
||||
async->numLaunchWorkers++;
|
||||
}
|
||||
break;
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t vnodeAsyncOpen(int32_t numOfThreads) {
|
||||
|
@ -450,21 +452,25 @@ int32_t vnodeAsyncOpen(int32_t numOfThreads) {
|
|||
// vnode-commit
|
||||
code = vnodeAsyncInit(&vnodeAsyncs[1], "vnode-commit");
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
(void)vnodeAsyncSetWorkers(1, numOfThreads);
|
||||
|
||||
code = vnodeAsyncSetWorkers(1, numOfThreads);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
||||
// vnode-merge
|
||||
code = vnodeAsyncInit(&vnodeAsyncs[2], "vnode-merge");
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
(void)vnodeAsyncSetWorkers(2, numOfThreads);
|
||||
|
||||
code = vnodeAsyncSetWorkers(2, numOfThreads);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
||||
_exit:
|
||||
return code;
|
||||
}
|
||||
|
||||
int32_t vnodeAsyncClose() {
|
||||
(void)vnodeAsyncDestroy(&vnodeAsyncs[1]);
|
||||
(void)vnodeAsyncDestroy(&vnodeAsyncs[2]);
|
||||
return 0;
|
||||
void vnodeAsyncClose() {
|
||||
int32_t ret;
|
||||
ret = vnodeAsyncDestroy(&vnodeAsyncs[1]);
|
||||
ret = vnodeAsyncDestroy(&vnodeAsyncs[2]);
|
||||
}
|
||||
|
||||
int32_t vnodeAsync(SVAChannelID *channelID, EVAPriority priority, int32_t (*execute)(void *), void (*cancel)(void *),
|
||||
|
@ -474,6 +480,7 @@ int32_t vnodeAsync(SVAChannelID *channelID, EVAPriority priority, int32_t (*exec
|
|||
return TSDB_CODE_INVALID_PARA;
|
||||
}
|
||||
|
||||
int32_t ret;
|
||||
int64_t id;
|
||||
SVAsync *async = vnodeAsyncs[channelID->async];
|
||||
|
||||
|
@ -501,7 +508,8 @@ int32_t vnodeAsync(SVAChannelID *channelID, EVAPriority priority, int32_t (*exec
|
|||
SVAChannel channel = {
|
||||
.channelId = channelID->id,
|
||||
};
|
||||
(void)vHashGet(async->channelTable, &channel, (void **)&task->channel);
|
||||
ret = vHashGet(async->channelTable, &channel, (void **)&task->channel);
|
||||
TAOS_UNUSED(ret);
|
||||
if (task->channel == NULL) {
|
||||
(void)taosThreadMutexUnlock(&async->mutex);
|
||||
(void)taosThreadCondDestroy(&task->waitCond);
|
||||
|
@ -513,7 +521,7 @@ int32_t vnodeAsync(SVAChannelID *channelID, EVAPriority priority, int32_t (*exec
|
|||
task->taskId = id = ++async->nextTaskId;
|
||||
|
||||
// add task to hash table
|
||||
int32_t ret = vHashPut(async->taskTable, task);
|
||||
ret = vHashPut(async->taskTable, task);
|
||||
if (ret != 0) {
|
||||
(void)taosThreadMutexUnlock(&async->mutex);
|
||||
(void)taosThreadCondDestroy(&task->waitCond);
|
||||
|
@ -539,7 +547,7 @@ int32_t vnodeAsync(SVAChannelID *channelID, EVAPriority priority, int32_t (*exec
|
|||
if (async->numIdleWorkers > 0) {
|
||||
(void)taosThreadCondSignal(&(async->hasTask));
|
||||
} else if (async->numLaunchWorkers < async->numWorkers) {
|
||||
(void)vnodeAsyncLaunchWorker(async);
|
||||
vnodeAsyncLaunchWorker(async);
|
||||
}
|
||||
} else if (task->channel->scheduled->state == EVA_TASK_STATE_RUNNING ||
|
||||
priority >= VATASK_PIORITY(task->channel->scheduled)) {
|
||||
|
@ -579,11 +587,7 @@ int32_t vnodeAsync(SVAChannelID *channelID, EVAPriority priority, int32_t (*exec
|
|||
return 0;
|
||||
}
|
||||
|
||||
int32_t vnodeAWait(SVATaskID *taskID) {
|
||||
if (taskID == NULL || taskID->async < MIN_ASYNC_ID || taskID->async > MAX_ASYNC_ID || taskID->id <= 0) {
|
||||
return TSDB_CODE_INVALID_PARA;
|
||||
}
|
||||
|
||||
void vnodeAWait(SVATaskID *taskID) {
|
||||
SVAsync *async = vnodeAsyncs[taskID->async];
|
||||
SVATask *task = NULL;
|
||||
SVATask task2 = {
|
||||
|
@ -592,7 +596,7 @@ int32_t vnodeAWait(SVATaskID *taskID) {
|
|||
|
||||
(void)taosThreadMutexLock(&async->mutex);
|
||||
|
||||
(void)vHashGet(async->taskTable, &task2, (void **)&task);
|
||||
int32_t ret = vHashGet(async->taskTable, &task2, (void **)&task);
|
||||
if (task) {
|
||||
task->numWait++;
|
||||
(void)taosThreadCondWait(&task->waitCond, &async->mutex);
|
||||
|
@ -605,8 +609,6 @@ int32_t vnodeAWait(SVATaskID *taskID) {
|
|||
}
|
||||
|
||||
(void)taosThreadMutexUnlock(&async->mutex);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t vnodeACancel(SVATaskID *taskID) {
|
||||
|
@ -625,14 +627,14 @@ int32_t vnodeACancel(SVATaskID *taskID) {
|
|||
|
||||
(void)taosThreadMutexLock(&async->mutex);
|
||||
|
||||
(void)vHashGet(async->taskTable, &task2, (void **)&task);
|
||||
ret = vHashGet(async->taskTable, &task2, (void **)&task);
|
||||
if (task) {
|
||||
if (task->state == EVA_TASK_STATE_WAITTING) {
|
||||
cancel = task->cancel;
|
||||
arg = task->arg;
|
||||
task->next->prev = task->prev;
|
||||
task->prev->next = task->next;
|
||||
(void)vnodeAsyncTaskDone(async, task);
|
||||
vnodeAsyncTaskDone(async, task);
|
||||
} else {
|
||||
ret = TSDB_CODE_FAILED;
|
||||
}
|
||||
|
@ -651,6 +653,7 @@ int32_t vnodeAsyncSetWorkers(int64_t asyncID, int32_t numWorkers) {
|
|||
if (asyncID < MIN_ASYNC_ID || asyncID > MAX_ASYNC_ID || numWorkers <= 0 || numWorkers > VNODE_ASYNC_MAX_WORKERS) {
|
||||
return TSDB_CODE_INVALID_PARA;
|
||||
}
|
||||
int32_t ret;
|
||||
SVAsync *async = vnodeAsyncs[asyncID];
|
||||
(void)taosThreadMutexLock(&async->mutex);
|
||||
async->numWorkers = numWorkers;
|
||||
|
@ -725,12 +728,13 @@ int32_t vnodeAChannelDestroy(SVAChannelID *channelID, bool waitRunning) {
|
|||
|
||||
(void)taosThreadMutexLock(&async->mutex);
|
||||
|
||||
(void)vHashGet(async->channelTable, &channel2, (void **)&channel);
|
||||
int32_t ret = vHashGet(async->channelTable, &channel2, (void **)&channel);
|
||||
TAOS_UNUSED(ret);
|
||||
if (channel) {
|
||||
// unregister channel
|
||||
channel->next->prev = channel->prev;
|
||||
channel->prev->next = channel->next;
|
||||
(void)vHashDrop(async->channelTable, channel);
|
||||
ret = vHashDrop(async->channelTable, channel);
|
||||
async->numChannels--;
|
||||
|
||||
// cancel all waiting tasks
|
||||
|
@ -745,7 +749,7 @@ int32_t vnodeAChannelDestroy(SVAChannelID *channelID, bool waitRunning) {
|
|||
.arg = task->arg,
|
||||
}));
|
||||
}
|
||||
(void)vnodeAsyncTaskDone(async, task);
|
||||
vnodeAsyncTaskDone(async, task);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -760,7 +764,7 @@ int32_t vnodeAChannelDestroy(SVAChannelID *channelID, bool waitRunning) {
|
|||
.arg = channel->scheduled->arg,
|
||||
}));
|
||||
}
|
||||
(void)vnodeAsyncTaskDone(async, channel->scheduled);
|
||||
vnodeAsyncTaskDone(async, channel->scheduled);
|
||||
}
|
||||
taosMemoryFree(channel);
|
||||
} else {
|
||||
|
|
|
@ -58,7 +58,7 @@ static int32_t vnodeBufPoolCreate(SVnode *pVnode, int32_t id, int64_t size, SVBu
|
|||
return 0;
|
||||
}
|
||||
|
||||
static int vnodeBufPoolDestroy(SVBufPool *pPool) {
|
||||
static void vnodeBufPoolDestroy(SVBufPool *pPool) {
|
||||
vnodeBufPoolReset(pPool);
|
||||
if (pPool->lock) {
|
||||
(void)taosThreadSpinDestroy(pPool->lock);
|
||||
|
@ -66,7 +66,6 @@ static int vnodeBufPoolDestroy(SVBufPool *pPool) {
|
|||
}
|
||||
(void)taosThreadMutexDestroy(&pPool->mutex);
|
||||
taosMemoryFree(pPool);
|
||||
return 0;
|
||||
}
|
||||
|
||||
int vnodeOpenBufPool(SVnode *pVnode) {
|
||||
|
@ -77,7 +76,7 @@ int vnodeOpenBufPool(SVnode *pVnode) {
|
|||
int32_t code;
|
||||
if ((code = vnodeBufPoolCreate(pVnode, i, size, &pVnode->aBufPool[i]))) {
|
||||
vError("vgId:%d, failed to open vnode buffer pool since %s", TD_VID(pVnode), tstrerror(terrno));
|
||||
(void)vnodeCloseBufPool(pVnode);
|
||||
vnodeCloseBufPool(pVnode);
|
||||
return code;
|
||||
}
|
||||
|
||||
|
@ -90,16 +89,15 @@ int vnodeOpenBufPool(SVnode *pVnode) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
int vnodeCloseBufPool(SVnode *pVnode) {
|
||||
void vnodeCloseBufPool(SVnode *pVnode) {
|
||||
for (int32_t i = 0; i < VNODE_BUFPOOL_SEGMENTS; i++) {
|
||||
if (pVnode->aBufPool[i]) {
|
||||
(void)vnodeBufPoolDestroy(pVnode->aBufPool[i]);
|
||||
vnodeBufPoolDestroy(pVnode->aBufPool[i]);
|
||||
pVnode->aBufPool[i] = NULL;
|
||||
}
|
||||
}
|
||||
|
||||
vDebug("vgId:%d, vnode buffer pool is closed", TD_VID(pVnode));
|
||||
return 0;
|
||||
}
|
||||
|
||||
void vnodeBufPoolReset(SVBufPool *pPool) {
|
||||
|
@ -234,7 +232,7 @@ void vnodeBufPoolAddToFreeList(SVBufPool *pPool) {
|
|||
vInfo("vgId:%d, buffer pool of id %d size changed from %" PRId64 " to %" PRId64, TD_VID(pVnode), pPool->id,
|
||||
pPool->node.size, size);
|
||||
|
||||
(void)vnodeBufPoolDestroy(pPool);
|
||||
vnodeBufPoolDestroy(pPool);
|
||||
pPool = pNewPool;
|
||||
pVnode->aBufPool[pPool->id] = pPool;
|
||||
}
|
||||
|
|
|
@ -351,7 +351,7 @@ int vnodeDecodeConfig(const SJson *pJson, void *pObj) {
|
|||
if (info == NULL) return -1;
|
||||
tjsonGetNumberValue(info, "nodePort", pNode->nodePort, code);
|
||||
if (code) return code;
|
||||
(void)tjsonGetStringValue(info, "nodeFqdn", pNode->nodeFqdn);
|
||||
code = tjsonGetStringValue(info, "nodeFqdn", pNode->nodeFqdn);
|
||||
tjsonGetNumberValue(info, "nodeId", pNode->nodeId, code);
|
||||
if (code) return code;
|
||||
tjsonGetNumberValue(info, "clusterId", pNode->clusterId, code);
|
||||
|
|
|
@ -272,7 +272,7 @@ static int32_t vnodePrepareCommit(SVnode *pVnode, SCommitInfo *pInfo) {
|
|||
int64_t lastCommitted = pInfo->info.state.committed;
|
||||
|
||||
// wait last commit task
|
||||
(void)vnodeAWait(&pVnode->commitTask);
|
||||
vnodeAWait(&pVnode->commitTask);
|
||||
|
||||
code = syncNodeGetConfig(pVnode->sync, &pVnode->config.syncCfg);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
@ -293,7 +293,8 @@ static int32_t vnodePrepareCommit(SVnode *pVnode, SCommitInfo *pInfo) {
|
|||
code = vnodeSaveInfo(dir, &pInfo->info);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
||||
(void)tsdbPreCommit(pVnode->pTsdb);
|
||||
code = tsdbPreCommit(pVnode->pTsdb);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
||||
code = metaPrepareAsyncCommit(pVnode->pMeta);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
@ -395,10 +396,15 @@ _exit:
|
|||
return code;
|
||||
}
|
||||
|
||||
int vnodeSyncCommit(SVnode *pVnode) {
|
||||
(void)vnodeAsyncCommit(pVnode);
|
||||
(void)vnodeAWait(&pVnode->commitTask);
|
||||
return 0;
|
||||
int32_t vnodeSyncCommit(SVnode *pVnode) {
|
||||
int32_t lino;
|
||||
int32_t code = vnodeAsyncCommit(pVnode);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
vnodeAWait(&pVnode->commitTask);
|
||||
|
||||
_exit:
|
||||
vError("vgId:%d, %s failed at line %d since %s", TD_VID(pVnode), __func__, lino, tstrerror(code));
|
||||
return code;
|
||||
}
|
||||
|
||||
static int vnodeCommitImpl(SCommitInfo *pInfo) {
|
||||
|
@ -419,7 +425,8 @@ static int vnodeCommitImpl(SCommitInfo *pInfo) {
|
|||
|
||||
vnodeGetPrimaryDir(pVnode->path, pVnode->diskPrimary, pVnode->pTfs, dir, TSDB_FILENAME_LEN);
|
||||
|
||||
(void)syncBeginSnapshot(pVnode->sync, pInfo->info.state.committed);
|
||||
code = syncBeginSnapshot(pVnode->sync, pInfo->info.state.committed);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
||||
code = tsdbCommitBegin(pVnode->pTsdb, pInfo);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
@ -456,7 +463,8 @@ static int vnodeCommitImpl(SCommitInfo *pInfo) {
|
|||
return -1;
|
||||
}
|
||||
|
||||
(void)syncEndSnapshot(pVnode->sync);
|
||||
code = syncEndSnapshot(pVnode->sync);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
||||
_exit:
|
||||
if (code) {
|
||||
|
|
|
@ -71,9 +71,9 @@ int32_t vHashInit(SVHashTable** ht, uint32_t (*hash)(const void*), int32_t (*com
|
|||
return 0;
|
||||
}
|
||||
|
||||
int32_t vHashDestroy(SVHashTable** ht) {
|
||||
void vHashDestroy(SVHashTable** ht) {
|
||||
if (ht == NULL) {
|
||||
return TSDB_CODE_INVALID_PARA;
|
||||
return;
|
||||
}
|
||||
|
||||
if (*ht) {
|
||||
|
@ -81,7 +81,6 @@ int32_t vHashDestroy(SVHashTable** ht) {
|
|||
taosMemoryFree(*ht);
|
||||
(*ht) = NULL;
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t vHashPut(SVHashTable* ht, void* obj) {
|
||||
|
|
|
@ -34,7 +34,7 @@ int vnodeInit(int nthreads, StopDnodeFp stopDnodeFp) {
|
|||
|
||||
void vnodeCleanup() {
|
||||
if (atomic_val_compare_exchange_32(&VINIT, 1, 0) == 0) return;
|
||||
(void)vnodeAsyncClose();
|
||||
vnodeAsyncClose();
|
||||
walCleanUp();
|
||||
smaCleanUp();
|
||||
}
|
||||
|
|
|
@ -502,7 +502,7 @@ _err:
|
|||
if (pVnode->pTsdb) (void)tsdbClose(&pVnode->pTsdb);
|
||||
if (pVnode->pSma) (void)smaClose(pVnode->pSma);
|
||||
if (pVnode->pMeta) (void)metaClose(&pVnode->pMeta);
|
||||
if (pVnode->freeList) (void)vnodeCloseBufPool(pVnode);
|
||||
if (pVnode->freeList) vnodeCloseBufPool(pVnode);
|
||||
|
||||
taosMemoryFree(pVnode);
|
||||
return NULL;
|
||||
|
@ -517,7 +517,7 @@ void vnodePostClose(SVnode *pVnode) { vnodeSyncPostClose(pVnode); }
|
|||
|
||||
void vnodeClose(SVnode *pVnode) {
|
||||
if (pVnode) {
|
||||
(void)vnodeAWait(&pVnode->commitTask);
|
||||
vnodeAWait(&pVnode->commitTask);
|
||||
(void)vnodeAChannelDestroy(&pVnode->commitChannel, true);
|
||||
vnodeSyncClose(pVnode);
|
||||
vnodeQueryClose(pVnode);
|
||||
|
@ -526,7 +526,7 @@ void vnodeClose(SVnode *pVnode) {
|
|||
if (pVnode->pTsdb) tsdbClose(&pVnode->pTsdb);
|
||||
(void)smaClose(pVnode->pSma);
|
||||
if (pVnode->pMeta) metaClose(&pVnode->pMeta);
|
||||
(void)vnodeCloseBufPool(pVnode);
|
||||
vnodeCloseBufPool(pVnode);
|
||||
|
||||
// destroy handle
|
||||
(void)tsem_destroy(&pVnode->syncSem);
|
||||
|
|
|
@ -586,7 +586,7 @@ _exit:
|
|||
}
|
||||
|
||||
extern int32_t tsdbDisableAndCancelAllBgTask(STsdb *pTsdb);
|
||||
extern int32_t tsdbEnableBgTask(STsdb *pTsdb);
|
||||
extern void tsdbEnableBgTask(STsdb *pTsdb);
|
||||
|
||||
static int32_t vnodeCancelAndDisableAllBgTask(SVnode *pVnode) {
|
||||
(void)tsdbDisableAndCancelAllBgTask(pVnode->pTsdb);
|
||||
|
@ -596,7 +596,7 @@ static int32_t vnodeCancelAndDisableAllBgTask(SVnode *pVnode) {
|
|||
}
|
||||
|
||||
static int32_t vnodeEnableBgTask(SVnode *pVnode) {
|
||||
(void)tsdbEnableBgTask(pVnode->pTsdb);
|
||||
tsdbEnableBgTask(pVnode->pTsdb);
|
||||
(void)vnodeAChannelInit(1, &pVnode->commitChannel);
|
||||
return 0;
|
||||
}
|
||||
|
|
|
@ -2041,7 +2041,7 @@ _exit:
|
|||
}
|
||||
|
||||
extern int32_t tsdbDisableAndCancelAllBgTask(STsdb *pTsdb);
|
||||
extern int32_t tsdbEnableBgTask(STsdb *pTsdb);
|
||||
extern void tsdbEnableBgTask(STsdb *pTsdb);
|
||||
|
||||
static int32_t vnodeProcessAlterConfigReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp) {
|
||||
bool walChanged = false;
|
||||
|
@ -2143,10 +2143,10 @@ static int32_t vnodeProcessAlterConfigReq(SVnode *pVnode, int64_t ver, void *pRe
|
|||
if (req.sttTrigger > 1 && pVnode->config.sttTrigger > 1) {
|
||||
pVnode->config.sttTrigger = req.sttTrigger;
|
||||
} else {
|
||||
(void)vnodeAWait(&pVnode->commitTask);
|
||||
vnodeAWait(&pVnode->commitTask);
|
||||
(void)tsdbDisableAndCancelAllBgTask(pVnode->pTsdb);
|
||||
pVnode->config.sttTrigger = req.sttTrigger;
|
||||
(void)tsdbEnableBgTask(pVnode->pTsdb);
|
||||
tsdbEnableBgTask(pVnode->pTsdb);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue