From 18fb01264dd8449fa7c9e97b566adf258353f28a Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Thu, 19 Dec 2024 14:51:21 +0800 Subject: [PATCH] change some apis --- include/common/tglobal.h | 1 + source/common/src/tglobal.c | 3 +- source/dnode/mgmt/mgmt_vnode/src/vmInt.c | 2 +- source/dnode/vnode/inc/vnode.h | 2 +- source/dnode/vnode/src/inc/vnd.h | 8 ++- source/dnode/vnode/src/tsdb/tsdbFS2.c | 2 +- source/dnode/vnode/src/tsdb/tsdbRetention.c | 2 +- source/dnode/vnode/src/vnd/vnodeAsync.c | 71 +++++++++++++-------- source/dnode/vnode/src/vnd/vnodeCommit.c | 4 +- source/dnode/vnode/src/vnd/vnodeModule.c | 4 +- 10 files changed, 62 insertions(+), 37 deletions(-) diff --git a/include/common/tglobal.h b/include/common/tglobal.h index e6333d2ddc..90df67e046 100644 --- a/include/common/tglobal.h +++ b/include/common/tglobal.h @@ -114,6 +114,7 @@ extern int32_t tsRetentionSpeedLimitMB; extern const char *tsAlterCompactTaskKeywords; extern int32_t tsNumOfCompactThreads; +extern int32_t tsNumOfRetentionThreads; // sync raft extern int32_t tsElectInterval; diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index 905dcb4fda..b74f8c9b1c 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -14,12 +14,12 @@ */ #define _DEFAULT_SOURCE +#include "tglobal.h" #include "cJSON.h" #include "defines.h" #include "os.h" #include "osString.h" #include "tconfig.h" -#include "tglobal.h" #include "tgrant.h" #include "tjson.h" #include "tlog.h" @@ -104,6 +104,7 @@ int32_t tsRetentionSpeedLimitMB = 0; // unlimited const char *tsAlterCompactTaskKeywords = "max_compact_tasks"; int32_t tsNumOfCompactThreads = 2; +int32_t tsNumOfRetentionThreads = 1; // sync raft int32_t tsElectInterval = 25 * 1000; diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmInt.c b/source/dnode/mgmt/mgmt_vnode/src/vmInt.c index 537c1f6297..6d5e117181 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmInt.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmInt.c @@ -896,7 +896,7 @@ static int32_t vmInit(SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) { } tmsgReportStartup("vnode-sync", "initialized"); - if ((code = vnodeInit(tsNumOfCommitThreads, pInput->stopDnodeFp)) != 0) { + if ((code = vnodeInit(pInput->stopDnodeFp)) != 0) { dError("failed to init vnode since %s", tstrerror(code)); goto _OVER; } diff --git a/source/dnode/vnode/inc/vnode.h b/source/dnode/vnode/inc/vnode.h index f4a678e629..b33bdb0976 100644 --- a/source/dnode/vnode/inc/vnode.h +++ b/source/dnode/vnode/inc/vnode.h @@ -51,7 +51,7 @@ extern const SVnodeCfg vnodeCfgDefault; typedef void (*StopDnodeFp)(); -int32_t vnodeInit(int32_t nthreads, StopDnodeFp stopDnodeFp); +int32_t vnodeInit(StopDnodeFp stopDnodeFp); void vnodeCleanup(); int32_t vnodeCreate(const char *path, SVnodeCfg *pCfg, int32_t diskPrimary, STfs *pTfs); bool vnodeShouldRemoveWal(SVnode *pVnode); diff --git a/source/dnode/vnode/src/inc/vnd.h b/source/dnode/vnode/src/inc/vnd.h index ff622d2dab..882ea155e4 100644 --- a/source/dnode/vnode/src/inc/vnd.h +++ b/source/dnode/vnode/src/inc/vnd.h @@ -55,12 +55,14 @@ typedef enum { EVA_PRIORITY_LOW, } EVAPriority; -int32_t vnodeAsyncOpen(int32_t numOfThreads); +int32_t vnodeAsyncOpen(); void vnodeAsyncClose(); int32_t vnodeAChannelInit(int64_t async, SVAChannelID* channelID); int32_t vnodeAChannelDestroy(SVAChannelID* channelID, bool waitRunning); -int32_t vnodeAsync(SVAChannelID* channelID, EVAPriority priority, int32_t (*execute)(void*), void (*complete)(void*), - void* arg, SVATaskID* taskID); +int32_t vnodeAsync(int64_t async, EVAPriority priority, int32_t (*execute)(void*), void (*complete)(void*), void* arg, + SVATaskID* taskID); +int32_t vnodeAsync2(SVAChannelID* channelID, EVAPriority priority, int32_t (*execute)(void*), void (*complete)(void*), + void* arg, SVATaskID* taskID); void vnodeAWait(SVATaskID* taskID); int32_t vnodeACancel(SVATaskID* taskID); int32_t vnodeAsyncSetWorkers(int64_t async, int32_t numWorkers); diff --git a/source/dnode/vnode/src/tsdb/tsdbFS2.c b/source/dnode/vnode/src/tsdb/tsdbFS2.c index 9a7cdca8f7..f024dd7210 100644 --- a/source/dnode/vnode/src/tsdb/tsdbFS2.c +++ b/source/dnode/vnode/src/tsdb/tsdbFS2.c @@ -946,7 +946,7 @@ int32_t tsdbFSEditCommit(STFileSystem *fs) { arg->tsdb = fs->tsdb; arg->fid = fset->fid; - code = vnodeAsync(&fset->channel, EVA_PRIORITY_HIGH, tsdbMerge, taosAutoMemoryFree, arg, NULL); + code = vnodeAsync2(&fset->channel, EVA_PRIORITY_HIGH, tsdbMerge, taosAutoMemoryFree, arg, NULL); TSDB_CHECK_CODE(code, lino, _exit); fset->mergeScheduled = true; } diff --git a/source/dnode/vnode/src/tsdb/tsdbRetention.c b/source/dnode/vnode/src/tsdb/tsdbRetention.c index 7859ee4c66..0cd83e3f8c 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRetention.c +++ b/source/dnode/vnode/src/tsdb/tsdbRetention.c @@ -380,7 +380,7 @@ static int32_t tsdbAsyncRetentionImpl(STsdb *tsdb, int64_t now, bool s3Migrate) arg->fid = fset->fid; arg->s3Migrate = s3Migrate; - if ((code = vnodeAsync(&fset->channel, EVA_PRIORITY_LOW, tsdbRetention, tsdbRetentionCancel, arg, NULL))) { + if ((code = vnodeAsync2(&fset->channel, EVA_PRIORITY_LOW, tsdbRetention, tsdbRetentionCancel, arg, NULL))) { taosMemoryFree(arg); TSDB_CHECK_CODE(code, lino, _exit); } diff --git a/source/dnode/vnode/src/vnd/vnodeAsync.c b/source/dnode/vnode/src/vnd/vnodeAsync.c index 3f40ace6c4..7b2412e7ae 100644 --- a/source/dnode/vnode/src/vnd/vnodeAsync.c +++ b/source/dnode/vnode/src/vnd/vnodeAsync.c @@ -118,9 +118,19 @@ struct SVAsync { SVHashTable *taskTable; }; -SVAsync *vnodeAsyncs[3]; +struct { + const char *label; + SVAsync *async; +} GVnodeAsyncs[] = { + [0] = {}, + [1] = {"vnode-commit", NULL}, + [2] = {"vnode-merge", NULL}, + [3] = {"vnode-compact", NULL}, + [4] = {"vnode-retention", NULL}, +}; + #define MIN_ASYNC_ID 1 -#define MAX_ASYNC_ID (sizeof(vnodeAsyncs) / sizeof(vnodeAsyncs[0]) - 1) +#define MAX_ASYNC_ID (sizeof(GVnodeAsyncs) / sizeof(GVnodeAsyncs[0]) - 1) static void vnodeAsyncTaskDone(SVAsync *async, SVATask *task) { int32_t ret; @@ -447,36 +457,47 @@ static void vnodeAsyncLaunchWorker(SVAsync *async) { } } -int32_t vnodeAsyncOpen(int32_t numOfThreads) { +int32_t vnodeAsyncOpen() { int32_t code = 0; int32_t lino = 0; - // vnode-commit - code = vnodeAsyncInit(&vnodeAsyncs[1], "vnode-commit"); - TSDB_CHECK_CODE(code, lino, _exit); + int32_t numOfThreads[] = { + 0, // + tsNumOfCommitThreads, // vnode-commit + tsNumOfCommitThreads, // vnode-merge + tsNumOfCompactThreads, // vnode-compact + tsNumOfRetentionThreads, // vnode-retention + }; - code = vnodeAsyncSetWorkers(1, numOfThreads); - TSDB_CHECK_CODE(code, lino, _exit); + for (int32_t i = 1; i < sizeof(GVnodeAsyncs) / sizeof(GVnodeAsyncs[0]); i++) { + code = vnodeAsyncInit(&GVnodeAsyncs[i].async, GVnodeAsyncs[i].label); + TSDB_CHECK_CODE(code, lino, _exit); - // vnode-merge - code = vnodeAsyncInit(&vnodeAsyncs[2], "vnode-merge"); - TSDB_CHECK_CODE(code, lino, _exit); - - code = vnodeAsyncSetWorkers(2, numOfThreads); - TSDB_CHECK_CODE(code, lino, _exit); + code = vnodeAsyncSetWorkers(i, numOfThreads[i]); + TSDB_CHECK_CODE(code, lino, _exit); + } _exit: return code; } void vnodeAsyncClose() { - int32_t ret; - ret = vnodeAsyncDestroy(&vnodeAsyncs[1]); - ret = vnodeAsyncDestroy(&vnodeAsyncs[2]); + for (int32_t i = 1; i < sizeof(GVnodeAsyncs) / sizeof(GVnodeAsyncs[0]); i++) { + int32_t ret = vnodeAsyncDestroy(&GVnodeAsyncs[i].async); + } } -int32_t vnodeAsync(SVAChannelID *channelID, EVAPriority priority, int32_t (*execute)(void *), void (*cancel)(void *), - void *arg, SVATaskID *taskID) { +int32_t vnodeAsync(int64_t async, EVAPriority priority, int32_t (*execute)(void *), void (*complete)(void *), void *arg, + SVATaskID *taskID) { + SVAChannelID channelID = { + .async = async, + .id = 0, + }; + return vnodeAsync2(&channelID, priority, execute, complete, arg, taskID); +} + +int32_t vnodeAsync2(SVAChannelID *channelID, EVAPriority priority, int32_t (*execute)(void *), void (*cancel)(void *), + void *arg, SVATaskID *taskID) { if (channelID == NULL || channelID->async < MIN_ASYNC_ID || channelID->async > MAX_ASYNC_ID || execute == NULL || channelID->id < 0) { return TSDB_CODE_INVALID_PARA; @@ -484,7 +505,7 @@ int32_t vnodeAsync(SVAChannelID *channelID, EVAPriority priority, int32_t (*exec int32_t ret; int64_t id; - SVAsync *async = vnodeAsyncs[channelID->async]; + SVAsync *async = GVnodeAsyncs[channelID->async].async; // create task object SVATask *task = (SVATask *)taosMemoryCalloc(1, sizeof(SVATask)); @@ -594,7 +615,7 @@ void vnodeAWait(SVATaskID *taskID) { return; } - SVAsync *async = vnodeAsyncs[taskID->async]; + SVAsync *async = GVnodeAsyncs[taskID->async].async; SVATask *task = NULL; SVATask task2 = { .taskId = taskID->id, @@ -623,7 +644,7 @@ int32_t vnodeACancel(SVATaskID *taskID) { } int32_t ret = 0; - SVAsync *async = vnodeAsyncs[taskID->async]; + SVAsync *async = GVnodeAsyncs[taskID->async].async; SVATask *task = NULL; SVATask task2 = { .taskId = taskID->id, @@ -660,7 +681,7 @@ int32_t vnodeAsyncSetWorkers(int64_t asyncID, int32_t numWorkers) { return TSDB_CODE_INVALID_PARA; } int32_t ret; - SVAsync *async = vnodeAsyncs[asyncID]; + SVAsync *async = GVnodeAsyncs[asyncID].async; (void)taosThreadMutexLock(&async->mutex); async->numWorkers = numWorkers; if (async->numIdleWorkers > 0) { @@ -676,7 +697,7 @@ int32_t vnodeAChannelInit(int64_t asyncID, SVAChannelID *channelID) { return TSDB_CODE_INVALID_PARA; } - SVAsync *async = vnodeAsyncs[asyncID]; + SVAsync *async = GVnodeAsyncs[asyncID].async; // create channel object SVAChannel *channel = (SVAChannel *)taosMemoryMalloc(sizeof(SVAChannel)); @@ -722,7 +743,7 @@ int32_t vnodeAChannelDestroy(SVAChannelID *channelID, bool waitRunning) { return TSDB_CODE_INVALID_PARA; } - SVAsync *async = vnodeAsyncs[channelID->async]; + SVAsync *async = GVnodeAsyncs[channelID->async].async; SVAChannel *channel = NULL; SVAChannel channel2 = { .channelId = channelID->id, diff --git a/source/dnode/vnode/src/vnd/vnodeCommit.c b/source/dnode/vnode/src/vnd/vnodeCommit.c index 28d27b8893..0c44048fbf 100644 --- a/source/dnode/vnode/src/vnd/vnodeCommit.c +++ b/source/dnode/vnode/src/vnd/vnodeCommit.c @@ -389,8 +389,8 @@ int vnodeAsyncCommit(SVnode *pVnode) { TSDB_CHECK_CODE(code, lino, _exit); // schedule the task - code = - vnodeAsync(&pVnode->commitChannel, EVA_PRIORITY_HIGH, vnodeCommit, vnodeCommitCancel, pInfo, &pVnode->commitTask); + code = vnodeAsync2(&pVnode->commitChannel, EVA_PRIORITY_HIGH, vnodeCommit, vnodeCommitCancel, pInfo, + &pVnode->commitTask); TSDB_CHECK_CODE(code, lino, _exit); _exit: diff --git a/source/dnode/vnode/src/vnd/vnodeModule.c b/source/dnode/vnode/src/vnd/vnodeModule.c index 9d326defdd..a91c46db70 100644 --- a/source/dnode/vnode/src/vnd/vnodeModule.c +++ b/source/dnode/vnode/src/vnd/vnodeModule.c @@ -20,12 +20,12 @@ static volatile int32_t VINIT = 0; -int vnodeInit(int nthreads, StopDnodeFp stopDnodeFp) { +int vnodeInit(StopDnodeFp stopDnodeFp) { if (atomic_val_compare_exchange_32(&VINIT, 0, 1)) { return 0; } - TAOS_CHECK_RETURN(vnodeAsyncOpen(nthreads)); + TAOS_CHECK_RETURN(vnodeAsyncOpen()); TAOS_CHECK_RETURN(walInit(stopDnodeFp)); TAOS_CHECK_RETURN(tsdbInit());