change some apis

This commit is contained in:
Hongze Cheng 2024-12-19 14:51:21 +08:00
parent 1cc8c6d6cc
commit 18fb01264d
10 changed files with 62 additions and 37 deletions

View File

@ -114,6 +114,7 @@ extern int32_t tsRetentionSpeedLimitMB;
extern const char *tsAlterCompactTaskKeywords;
extern int32_t tsNumOfCompactThreads;
extern int32_t tsNumOfRetentionThreads;
// sync raft
extern int32_t tsElectInterval;

View File

@ -14,12 +14,12 @@
*/
#define _DEFAULT_SOURCE
#include "tglobal.h"
#include "cJSON.h"
#include "defines.h"
#include "os.h"
#include "osString.h"
#include "tconfig.h"
#include "tglobal.h"
#include "tgrant.h"
#include "tjson.h"
#include "tlog.h"
@ -104,6 +104,7 @@ int32_t tsRetentionSpeedLimitMB = 0; // unlimited
const char *tsAlterCompactTaskKeywords = "max_compact_tasks";
int32_t tsNumOfCompactThreads = 2;
int32_t tsNumOfRetentionThreads = 1;
// sync raft
int32_t tsElectInterval = 25 * 1000;

View File

@ -896,7 +896,7 @@ static int32_t vmInit(SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) {
}
tmsgReportStartup("vnode-sync", "initialized");
if ((code = vnodeInit(tsNumOfCommitThreads, pInput->stopDnodeFp)) != 0) {
if ((code = vnodeInit(pInput->stopDnodeFp)) != 0) {
dError("failed to init vnode since %s", tstrerror(code));
goto _OVER;
}

View File

@ -51,7 +51,7 @@ extern const SVnodeCfg vnodeCfgDefault;
typedef void (*StopDnodeFp)();
int32_t vnodeInit(int32_t nthreads, StopDnodeFp stopDnodeFp);
int32_t vnodeInit(StopDnodeFp stopDnodeFp);
void vnodeCleanup();
int32_t vnodeCreate(const char *path, SVnodeCfg *pCfg, int32_t diskPrimary, STfs *pTfs);
bool vnodeShouldRemoveWal(SVnode *pVnode);

View File

@ -55,12 +55,14 @@ typedef enum {
EVA_PRIORITY_LOW,
} EVAPriority;
int32_t vnodeAsyncOpen(int32_t numOfThreads);
int32_t vnodeAsyncOpen();
void vnodeAsyncClose();
int32_t vnodeAChannelInit(int64_t async, SVAChannelID* channelID);
int32_t vnodeAChannelDestroy(SVAChannelID* channelID, bool waitRunning);
int32_t vnodeAsync(SVAChannelID* channelID, EVAPriority priority, int32_t (*execute)(void*), void (*complete)(void*),
void* arg, SVATaskID* taskID);
int32_t vnodeAsync(int64_t async, EVAPriority priority, int32_t (*execute)(void*), void (*complete)(void*), void* arg,
SVATaskID* taskID);
int32_t vnodeAsync2(SVAChannelID* channelID, EVAPriority priority, int32_t (*execute)(void*), void (*complete)(void*),
void* arg, SVATaskID* taskID);
void vnodeAWait(SVATaskID* taskID);
int32_t vnodeACancel(SVATaskID* taskID);
int32_t vnodeAsyncSetWorkers(int64_t async, int32_t numWorkers);

View File

@ -946,7 +946,7 @@ int32_t tsdbFSEditCommit(STFileSystem *fs) {
arg->tsdb = fs->tsdb;
arg->fid = fset->fid;
code = vnodeAsync(&fset->channel, EVA_PRIORITY_HIGH, tsdbMerge, taosAutoMemoryFree, arg, NULL);
code = vnodeAsync2(&fset->channel, EVA_PRIORITY_HIGH, tsdbMerge, taosAutoMemoryFree, arg, NULL);
TSDB_CHECK_CODE(code, lino, _exit);
fset->mergeScheduled = true;
}

View File

@ -380,7 +380,7 @@ static int32_t tsdbAsyncRetentionImpl(STsdb *tsdb, int64_t now, bool s3Migrate)
arg->fid = fset->fid;
arg->s3Migrate = s3Migrate;
if ((code = vnodeAsync(&fset->channel, EVA_PRIORITY_LOW, tsdbRetention, tsdbRetentionCancel, arg, NULL))) {
if ((code = vnodeAsync2(&fset->channel, EVA_PRIORITY_LOW, tsdbRetention, tsdbRetentionCancel, arg, NULL))) {
taosMemoryFree(arg);
TSDB_CHECK_CODE(code, lino, _exit);
}

View File

@ -118,9 +118,19 @@ struct SVAsync {
SVHashTable *taskTable;
};
SVAsync *vnodeAsyncs[3];
struct {
const char *label;
SVAsync *async;
} GVnodeAsyncs[] = {
[0] = {},
[1] = {"vnode-commit", NULL},
[2] = {"vnode-merge", NULL},
[3] = {"vnode-compact", NULL},
[4] = {"vnode-retention", NULL},
};
#define MIN_ASYNC_ID 1
#define MAX_ASYNC_ID (sizeof(vnodeAsyncs) / sizeof(vnodeAsyncs[0]) - 1)
#define MAX_ASYNC_ID (sizeof(GVnodeAsyncs) / sizeof(GVnodeAsyncs[0]) - 1)
static void vnodeAsyncTaskDone(SVAsync *async, SVATask *task) {
int32_t ret;
@ -447,36 +457,47 @@ static void vnodeAsyncLaunchWorker(SVAsync *async) {
}
}
int32_t vnodeAsyncOpen(int32_t numOfThreads) {
int32_t vnodeAsyncOpen() {
int32_t code = 0;
int32_t lino = 0;
// vnode-commit
code = vnodeAsyncInit(&vnodeAsyncs[1], "vnode-commit");
TSDB_CHECK_CODE(code, lino, _exit);
int32_t numOfThreads[] = {
0, //
tsNumOfCommitThreads, // vnode-commit
tsNumOfCommitThreads, // vnode-merge
tsNumOfCompactThreads, // vnode-compact
tsNumOfRetentionThreads, // vnode-retention
};
code = vnodeAsyncSetWorkers(1, numOfThreads);
TSDB_CHECK_CODE(code, lino, _exit);
for (int32_t i = 1; i < sizeof(GVnodeAsyncs) / sizeof(GVnodeAsyncs[0]); i++) {
code = vnodeAsyncInit(&GVnodeAsyncs[i].async, GVnodeAsyncs[i].label);
TSDB_CHECK_CODE(code, lino, _exit);
// vnode-merge
code = vnodeAsyncInit(&vnodeAsyncs[2], "vnode-merge");
TSDB_CHECK_CODE(code, lino, _exit);
code = vnodeAsyncSetWorkers(2, numOfThreads);
TSDB_CHECK_CODE(code, lino, _exit);
code = vnodeAsyncSetWorkers(i, numOfThreads[i]);
TSDB_CHECK_CODE(code, lino, _exit);
}
_exit:
return code;
}
void vnodeAsyncClose() {
int32_t ret;
ret = vnodeAsyncDestroy(&vnodeAsyncs[1]);
ret = vnodeAsyncDestroy(&vnodeAsyncs[2]);
for (int32_t i = 1; i < sizeof(GVnodeAsyncs) / sizeof(GVnodeAsyncs[0]); i++) {
int32_t ret = vnodeAsyncDestroy(&GVnodeAsyncs[i].async);
}
}
int32_t vnodeAsync(SVAChannelID *channelID, EVAPriority priority, int32_t (*execute)(void *), void (*cancel)(void *),
void *arg, SVATaskID *taskID) {
int32_t vnodeAsync(int64_t async, EVAPriority priority, int32_t (*execute)(void *), void (*complete)(void *), void *arg,
SVATaskID *taskID) {
SVAChannelID channelID = {
.async = async,
.id = 0,
};
return vnodeAsync2(&channelID, priority, execute, complete, arg, taskID);
}
int32_t vnodeAsync2(SVAChannelID *channelID, EVAPriority priority, int32_t (*execute)(void *), void (*cancel)(void *),
void *arg, SVATaskID *taskID) {
if (channelID == NULL || channelID->async < MIN_ASYNC_ID || channelID->async > MAX_ASYNC_ID || execute == NULL ||
channelID->id < 0) {
return TSDB_CODE_INVALID_PARA;
@ -484,7 +505,7 @@ int32_t vnodeAsync(SVAChannelID *channelID, EVAPriority priority, int32_t (*exec
int32_t ret;
int64_t id;
SVAsync *async = vnodeAsyncs[channelID->async];
SVAsync *async = GVnodeAsyncs[channelID->async].async;
// create task object
SVATask *task = (SVATask *)taosMemoryCalloc(1, sizeof(SVATask));
@ -594,7 +615,7 @@ void vnodeAWait(SVATaskID *taskID) {
return;
}
SVAsync *async = vnodeAsyncs[taskID->async];
SVAsync *async = GVnodeAsyncs[taskID->async].async;
SVATask *task = NULL;
SVATask task2 = {
.taskId = taskID->id,
@ -623,7 +644,7 @@ int32_t vnodeACancel(SVATaskID *taskID) {
}
int32_t ret = 0;
SVAsync *async = vnodeAsyncs[taskID->async];
SVAsync *async = GVnodeAsyncs[taskID->async].async;
SVATask *task = NULL;
SVATask task2 = {
.taskId = taskID->id,
@ -660,7 +681,7 @@ int32_t vnodeAsyncSetWorkers(int64_t asyncID, int32_t numWorkers) {
return TSDB_CODE_INVALID_PARA;
}
int32_t ret;
SVAsync *async = vnodeAsyncs[asyncID];
SVAsync *async = GVnodeAsyncs[asyncID].async;
(void)taosThreadMutexLock(&async->mutex);
async->numWorkers = numWorkers;
if (async->numIdleWorkers > 0) {
@ -676,7 +697,7 @@ int32_t vnodeAChannelInit(int64_t asyncID, SVAChannelID *channelID) {
return TSDB_CODE_INVALID_PARA;
}
SVAsync *async = vnodeAsyncs[asyncID];
SVAsync *async = GVnodeAsyncs[asyncID].async;
// create channel object
SVAChannel *channel = (SVAChannel *)taosMemoryMalloc(sizeof(SVAChannel));
@ -722,7 +743,7 @@ int32_t vnodeAChannelDestroy(SVAChannelID *channelID, bool waitRunning) {
return TSDB_CODE_INVALID_PARA;
}
SVAsync *async = vnodeAsyncs[channelID->async];
SVAsync *async = GVnodeAsyncs[channelID->async].async;
SVAChannel *channel = NULL;
SVAChannel channel2 = {
.channelId = channelID->id,

View File

@ -389,8 +389,8 @@ int vnodeAsyncCommit(SVnode *pVnode) {
TSDB_CHECK_CODE(code, lino, _exit);
// schedule the task
code =
vnodeAsync(&pVnode->commitChannel, EVA_PRIORITY_HIGH, vnodeCommit, vnodeCommitCancel, pInfo, &pVnode->commitTask);
code = vnodeAsync2(&pVnode->commitChannel, EVA_PRIORITY_HIGH, vnodeCommit, vnodeCommitCancel, pInfo,
&pVnode->commitTask);
TSDB_CHECK_CODE(code, lino, _exit);
_exit:

View File

@ -20,12 +20,12 @@
static volatile int32_t VINIT = 0;
int vnodeInit(int nthreads, StopDnodeFp stopDnodeFp) {
int vnodeInit(StopDnodeFp stopDnodeFp) {
if (atomic_val_compare_exchange_32(&VINIT, 0, 1)) {
return 0;
}
TAOS_CHECK_RETURN(vnodeAsyncOpen(nthreads));
TAOS_CHECK_RETURN(vnodeAsyncOpen());
TAOS_CHECK_RETURN(walInit(stopDnodeFp));
TAOS_CHECK_RETURN(tsdbInit());