diff --git a/source/dnode/vnode/CMakeLists.txt b/source/dnode/vnode/CMakeLists.txt index 4b8aa5dfaf..ba628a808d 100644 --- a/source/dnode/vnode/CMakeLists.txt +++ b/source/dnode/vnode/CMakeLists.txt @@ -10,10 +10,11 @@ target_sources( "src/vnd/vnodeCommit.c" "src/vnd/vnodeInt.c" "src/vnd/vnodeMain.c" - "src/vnd/vnodeMgr.c" "src/vnd/vnodeQuery.c" "src/vnd/vnodeStateMgr.c" "src/vnd/vnodeWrite.c" + # "src/vnd/vnodeModule.c" + "src/vnd/vnodeMgr.c" # meta # "src/meta/metaBDBImpl.c" diff --git a/source/dnode/vnode/src/inc/tq.h b/source/dnode/vnode/src/inc/tq.h index 78e1b1ce03..ce81006661 100644 --- a/source/dnode/vnode/src/inc/tq.h +++ b/source/dnode/vnode/src/inc/tq.h @@ -246,6 +246,26 @@ typedef struct { static STqPushMgmt tqPushMgmt; +// init once +int tqInit(); +void tqCleanUp(); + +// open in each vnode +STQ* tqOpen(const char* path, SVnode* pVnode, SWal* pWal, SMeta* pMeta, STqCfg* tqConfig, + SMemAllocatorFactory* allocFac); +void tqClose(STQ*); +// required by vnode +int tqPushMsg(STQ*, void* msg, int32_t msgLen, tmsg_t msgType, int64_t version); +int tqCommit(STQ*); + +int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId); +int32_t tqProcessSetConnReq(STQ* pTq, char* msg); +int32_t tqProcessRebReq(STQ* pTq, char* msg); +int32_t tqProcessCancelConnReq(STQ* pTq, char* msg); +int32_t tqProcessTaskExec(STQ* pTq, char* msg, int32_t msgLen, int32_t workerId); +int32_t tqProcessTaskDeploy(STQ* pTq, char* msg, int32_t msgLen); +int32_t tqProcessStreamTrigger(STQ* pTq, void* data, int32_t dataLen, int32_t workerId); + int32_t tqSerializeConsumer(const STqConsumer*, STqSerializedHead**); int32_t tqDeserializeConsumer(STQ*, const STqSerializedHead*, STqConsumer**); diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index a3c7f73094..6fbf4fb1df 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -35,9 +35,10 @@ #include "tstream.h" #include "ttime.h" #include "ttimer.h" -#include "vnode.h" #include "wal.h" +#include "vnode.h" + #ifdef __cplusplus extern "C" { #endif @@ -157,27 +158,6 @@ void* vmaMalloc(SVMemAllocator* pVMA, uint64_t size); void vmaFree(SVMemAllocator* pVMA, void* ptr); bool vmaIsFull(SVMemAllocator* pVMA); -// init once -int tqInit(); -void tqCleanUp(); - -// open in each vnode -STQ* tqOpen(const char* path, SVnode* pVnode, SWal* pWal, SMeta* pMeta, STqCfg* tqConfig, - SMemAllocatorFactory* allocFac); -void tqClose(STQ*); - -// required by vnode -int tqPushMsg(STQ*, void* msg, int32_t msgLen, tmsg_t msgType, int64_t version); -int tqCommit(STQ*); - -int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId); -int32_t tqProcessSetConnReq(STQ* pTq, char* msg); -int32_t tqProcessRebReq(STQ* pTq, char* msg); -int32_t tqProcessCancelConnReq(STQ* pTq, char* msg); -int32_t tqProcessTaskExec(STQ* pTq, char* msg, int32_t msgLen, int32_t workerId); -int32_t tqProcessTaskDeploy(STQ* pTq, char* msg, int32_t msgLen); -int32_t tqProcessStreamTrigger(STQ* pTq, void* data, int32_t dataLen, int32_t workerId); - // sma void smaHandleRes(void* pVnode, int64_t smaId, const SArray* data); diff --git a/source/dnode/vnode/src/vnd/vnodeModule.c b/source/dnode/vnode/src/vnd/vnodeModule.c new file mode 100644 index 0000000000..2b5b46a45d --- /dev/null +++ b/source/dnode/vnode/src/vnd/vnodeModule.c @@ -0,0 +1,158 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include "vnodeInt.h" + +typedef struct SVnodeTask SVnodeTask; +struct SVnodeTask { + SVnodeTask* next; + SVnodeTask* prev; + int (*execute)(void*); + void* arg; +}; + +struct SVnodeGlobal { + int8_t init; + int8_t stop; + int nthreads; + TdThread* threads; + TdThreadMutex mutex; + TdThreadCond hasTask; + SVnodeTask queue; +}; + +struct SVnodeGlobal vnodeGlobal; + +static void* loop(void* arg); + +int vnodeInit(int nthreads) { + int8_t init; + int ret; + + init = atomic_val_compare_exchange_8(&(vnodeGlobal.init), 0, 1); + if (init) { + return 0; + } + + vnodeGlobal.stop = 0; + + vnodeGlobal.queue.next = &vnodeGlobal.queue; + vnodeGlobal.queue.prev = &vnodeGlobal.queue; + + vnodeGlobal.nthreads = nthreads; + vnodeGlobal.threads = taosMemoryCalloc(nthreads, sizeof(TdThread)); + if (vnodeGlobal.threads == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + vError("failed to init vnode module since: %s", tstrerror(terrno)); + return -1; + } + + taosThreadMutexInit(&vnodeGlobal.mutex, NULL); + taosThreadCondInit(&vnodeGlobal.hasTask, NULL); + + for (int i = 0; i < nthreads; i++) { + taosThreadCreate(&(vnodeGlobal.threads[i]), NULL, loop, NULL); + } + + if (walInit() < 0) { + return -1; + } + + return 0; +} + +void vnodeCleanup() { + int8_t init; + + init = atomic_val_compare_exchange_8(&(vnodeGlobal.init), 1, 0); + if (init == 0) return; + + // set stop + taosThreadMutexLock(&(vnodeGlobal.mutex)); + vnodeGlobal.stop = 1; + taosThreadCondBroadcast(&(vnodeGlobal.hasTask)); + taosThreadMutexUnlock(&(vnodeGlobal.mutex)); + + // wait for threads + for (int i = 0; i < vnodeGlobal.nthreads; i++) { + taosThreadJoin(vnodeGlobal.threads[i], NULL); + } + + // clear source + taosMemoryFreeClear(vnodeGlobal.threads); + taosThreadCondDestroy(&(vnodeGlobal.hasTask)); + taosThreadMutexDestroy(&(vnodeGlobal.mutex)); +} + +int vnodeScheduleTask(int (*execute)(void*), void* arg) { + SVnodeTask* pTask; + + ASSERT(!vnodeGlobal.stop); + + pTask = taosMemoryMalloc(sizeof(*pTask)); + if (pTask == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } + + pTask->execute = execute; + pTask->arg = arg; + + taosThreadMutexLock(&(vnodeGlobal.mutex)); + pTask->next = &vnodeGlobal.queue; + pTask->prev = vnodeGlobal.queue.prev; + vnodeGlobal.queue.prev->next = pTask; + vnodeGlobal.queue.prev = pTask; + taosThreadCondSignal(&(vnodeGlobal.hasTask)); + taosThreadMutexUnlock(&(vnodeGlobal.mutex)); + + return 0; +} + +/* ------------------------ STATIC METHODS ------------------------ */ +static void* loop(void* arg) { + SVnodeTask* pTask; + int ret; + + setThreadName("vnode-commit"); + + for (;;) { + taosThreadMutexLock(&(vnodeGlobal.mutex)); + for (;;) { + pTask = vnodeGlobal.queue.next; + if (pTask == &vnodeGlobal.queue) { + // no task + if (vnodeGlobal.stop) { + taosThreadMutexUnlock(&(vnodeGlobal.mutex)); + return NULL; + } else { + taosThreadCondWait(&(vnodeGlobal.hasTask), &(vnodeGlobal.mutex)); + } + } else { + // has task + pTask->prev->next = pTask->next; + pTask->next->prev = pTask->prev; + break; + } + } + + taosThreadMutexUnlock(&(vnodeGlobal.mutex)); + + pTask->execute(pTask->arg); + taosMemoryFree(pTask); + } + + return NULL; +} \ No newline at end of file