refact vnode
This commit is contained in:
parent
db7c80b2b6
commit
593adbd509
|
@ -10,10 +10,11 @@ target_sources(
|
|||
"src/vnd/vnodeCommit.c"
|
||||
"src/vnd/vnodeInt.c"
|
||||
"src/vnd/vnodeMain.c"
|
||||
"src/vnd/vnodeMgr.c"
|
||||
"src/vnd/vnodeQuery.c"
|
||||
"src/vnd/vnodeStateMgr.c"
|
||||
"src/vnd/vnodeWrite.c"
|
||||
# "src/vnd/vnodeModule.c"
|
||||
"src/vnd/vnodeMgr.c"
|
||||
|
||||
# meta
|
||||
# "src/meta/metaBDBImpl.c"
|
||||
|
|
|
@ -246,6 +246,26 @@ typedef struct {
|
|||
|
||||
static STqPushMgmt tqPushMgmt;
|
||||
|
||||
// init once
|
||||
int tqInit();
|
||||
void tqCleanUp();
|
||||
|
||||
// open in each vnode
|
||||
STQ* tqOpen(const char* path, SVnode* pVnode, SWal* pWal, SMeta* pMeta, STqCfg* tqConfig,
|
||||
SMemAllocatorFactory* allocFac);
|
||||
void tqClose(STQ*);
|
||||
// required by vnode
|
||||
int tqPushMsg(STQ*, void* msg, int32_t msgLen, tmsg_t msgType, int64_t version);
|
||||
int tqCommit(STQ*);
|
||||
|
||||
int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId);
|
||||
int32_t tqProcessSetConnReq(STQ* pTq, char* msg);
|
||||
int32_t tqProcessRebReq(STQ* pTq, char* msg);
|
||||
int32_t tqProcessCancelConnReq(STQ* pTq, char* msg);
|
||||
int32_t tqProcessTaskExec(STQ* pTq, char* msg, int32_t msgLen, int32_t workerId);
|
||||
int32_t tqProcessTaskDeploy(STQ* pTq, char* msg, int32_t msgLen);
|
||||
int32_t tqProcessStreamTrigger(STQ* pTq, void* data, int32_t dataLen, int32_t workerId);
|
||||
|
||||
int32_t tqSerializeConsumer(const STqConsumer*, STqSerializedHead**);
|
||||
int32_t tqDeserializeConsumer(STQ*, const STqSerializedHead*, STqConsumer**);
|
||||
|
||||
|
|
|
@ -35,9 +35,10 @@
|
|||
#include "tstream.h"
|
||||
#include "ttime.h"
|
||||
#include "ttimer.h"
|
||||
#include "vnode.h"
|
||||
#include "wal.h"
|
||||
|
||||
#include "vnode.h"
|
||||
|
||||
#ifdef __cplusplus
|
||||
extern "C" {
|
||||
#endif
|
||||
|
@ -157,27 +158,6 @@ void* vmaMalloc(SVMemAllocator* pVMA, uint64_t size);
|
|||
void vmaFree(SVMemAllocator* pVMA, void* ptr);
|
||||
bool vmaIsFull(SVMemAllocator* pVMA);
|
||||
|
||||
// init once
|
||||
int tqInit();
|
||||
void tqCleanUp();
|
||||
|
||||
// open in each vnode
|
||||
STQ* tqOpen(const char* path, SVnode* pVnode, SWal* pWal, SMeta* pMeta, STqCfg* tqConfig,
|
||||
SMemAllocatorFactory* allocFac);
|
||||
void tqClose(STQ*);
|
||||
|
||||
// required by vnode
|
||||
int tqPushMsg(STQ*, void* msg, int32_t msgLen, tmsg_t msgType, int64_t version);
|
||||
int tqCommit(STQ*);
|
||||
|
||||
int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId);
|
||||
int32_t tqProcessSetConnReq(STQ* pTq, char* msg);
|
||||
int32_t tqProcessRebReq(STQ* pTq, char* msg);
|
||||
int32_t tqProcessCancelConnReq(STQ* pTq, char* msg);
|
||||
int32_t tqProcessTaskExec(STQ* pTq, char* msg, int32_t msgLen, int32_t workerId);
|
||||
int32_t tqProcessTaskDeploy(STQ* pTq, char* msg, int32_t msgLen);
|
||||
int32_t tqProcessStreamTrigger(STQ* pTq, void* data, int32_t dataLen, int32_t workerId);
|
||||
|
||||
// sma
|
||||
void smaHandleRes(void* pVnode, int64_t smaId, const SArray* data);
|
||||
|
||||
|
|
|
@ -0,0 +1,158 @@
|
|||
/*
|
||||
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||
*
|
||||
* This program is free software: you can use, redistribute, and/or modify
|
||||
* it under the terms of the GNU Affero General Public License, version 3
|
||||
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||
*
|
||||
* You should have received a copy of the GNU Affero General Public License
|
||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#include "vnodeInt.h"
|
||||
|
||||
typedef struct SVnodeTask SVnodeTask;
|
||||
struct SVnodeTask {
|
||||
SVnodeTask* next;
|
||||
SVnodeTask* prev;
|
||||
int (*execute)(void*);
|
||||
void* arg;
|
||||
};
|
||||
|
||||
struct SVnodeGlobal {
|
||||
int8_t init;
|
||||
int8_t stop;
|
||||
int nthreads;
|
||||
TdThread* threads;
|
||||
TdThreadMutex mutex;
|
||||
TdThreadCond hasTask;
|
||||
SVnodeTask queue;
|
||||
};
|
||||
|
||||
struct SVnodeGlobal vnodeGlobal;
|
||||
|
||||
static void* loop(void* arg);
|
||||
|
||||
int vnodeInit(int nthreads) {
|
||||
int8_t init;
|
||||
int ret;
|
||||
|
||||
init = atomic_val_compare_exchange_8(&(vnodeGlobal.init), 0, 1);
|
||||
if (init) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
vnodeGlobal.stop = 0;
|
||||
|
||||
vnodeGlobal.queue.next = &vnodeGlobal.queue;
|
||||
vnodeGlobal.queue.prev = &vnodeGlobal.queue;
|
||||
|
||||
vnodeGlobal.nthreads = nthreads;
|
||||
vnodeGlobal.threads = taosMemoryCalloc(nthreads, sizeof(TdThread));
|
||||
if (vnodeGlobal.threads == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
vError("failed to init vnode module since: %s", tstrerror(terrno));
|
||||
return -1;
|
||||
}
|
||||
|
||||
taosThreadMutexInit(&vnodeGlobal.mutex, NULL);
|
||||
taosThreadCondInit(&vnodeGlobal.hasTask, NULL);
|
||||
|
||||
for (int i = 0; i < nthreads; i++) {
|
||||
taosThreadCreate(&(vnodeGlobal.threads[i]), NULL, loop, NULL);
|
||||
}
|
||||
|
||||
if (walInit() < 0) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
void vnodeCleanup() {
|
||||
int8_t init;
|
||||
|
||||
init = atomic_val_compare_exchange_8(&(vnodeGlobal.init), 1, 0);
|
||||
if (init == 0) return;
|
||||
|
||||
// set stop
|
||||
taosThreadMutexLock(&(vnodeGlobal.mutex));
|
||||
vnodeGlobal.stop = 1;
|
||||
taosThreadCondBroadcast(&(vnodeGlobal.hasTask));
|
||||
taosThreadMutexUnlock(&(vnodeGlobal.mutex));
|
||||
|
||||
// wait for threads
|
||||
for (int i = 0; i < vnodeGlobal.nthreads; i++) {
|
||||
taosThreadJoin(vnodeGlobal.threads[i], NULL);
|
||||
}
|
||||
|
||||
// clear source
|
||||
taosMemoryFreeClear(vnodeGlobal.threads);
|
||||
taosThreadCondDestroy(&(vnodeGlobal.hasTask));
|
||||
taosThreadMutexDestroy(&(vnodeGlobal.mutex));
|
||||
}
|
||||
|
||||
int vnodeScheduleTask(int (*execute)(void*), void* arg) {
|
||||
SVnodeTask* pTask;
|
||||
|
||||
ASSERT(!vnodeGlobal.stop);
|
||||
|
||||
pTask = taosMemoryMalloc(sizeof(*pTask));
|
||||
if (pTask == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return -1;
|
||||
}
|
||||
|
||||
pTask->execute = execute;
|
||||
pTask->arg = arg;
|
||||
|
||||
taosThreadMutexLock(&(vnodeGlobal.mutex));
|
||||
pTask->next = &vnodeGlobal.queue;
|
||||
pTask->prev = vnodeGlobal.queue.prev;
|
||||
vnodeGlobal.queue.prev->next = pTask;
|
||||
vnodeGlobal.queue.prev = pTask;
|
||||
taosThreadCondSignal(&(vnodeGlobal.hasTask));
|
||||
taosThreadMutexUnlock(&(vnodeGlobal.mutex));
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
/* ------------------------ STATIC METHODS ------------------------ */
|
||||
static void* loop(void* arg) {
|
||||
SVnodeTask* pTask;
|
||||
int ret;
|
||||
|
||||
setThreadName("vnode-commit");
|
||||
|
||||
for (;;) {
|
||||
taosThreadMutexLock(&(vnodeGlobal.mutex));
|
||||
for (;;) {
|
||||
pTask = vnodeGlobal.queue.next;
|
||||
if (pTask == &vnodeGlobal.queue) {
|
||||
// no task
|
||||
if (vnodeGlobal.stop) {
|
||||
taosThreadMutexUnlock(&(vnodeGlobal.mutex));
|
||||
return NULL;
|
||||
} else {
|
||||
taosThreadCondWait(&(vnodeGlobal.hasTask), &(vnodeGlobal.mutex));
|
||||
}
|
||||
} else {
|
||||
// has task
|
||||
pTask->prev->next = pTask->next;
|
||||
pTask->next->prev = pTask->prev;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
taosThreadMutexUnlock(&(vnodeGlobal.mutex));
|
||||
|
||||
pTask->execute(pTask->arg);
|
||||
taosMemoryFree(pTask);
|
||||
}
|
||||
|
||||
return NULL;
|
||||
}
|
Loading…
Reference in New Issue