TD-2072
This commit is contained in:
parent
ded34aab69
commit
a31a6ed9a5
|
@ -324,6 +324,8 @@ void tsdbReportStat(void *repo, int64_t *totalPoints, int64_t *totalStorage, int
|
||||||
int tsdbInitCommitQueue(int nthreads);
|
int tsdbInitCommitQueue(int nthreads);
|
||||||
void tsdbDestroyCommitQueue();
|
void tsdbDestroyCommitQueue();
|
||||||
int tsdbSyncCommit(TSDB_REPO_T *repo);
|
int tsdbSyncCommit(TSDB_REPO_T *repo);
|
||||||
|
int tsdbIncCommitRef(int vgId);
|
||||||
|
void tsdbDecCommitRef(int vgId);
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
|
|
|
@ -15,6 +15,7 @@
|
||||||
|
|
||||||
#include "os.h"
|
#include "os.h"
|
||||||
#include "tlist.h"
|
#include "tlist.h"
|
||||||
|
#include "tref.h"
|
||||||
#include "tsdbMain.h"
|
#include "tsdbMain.h"
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
|
@ -22,6 +23,7 @@ typedef struct {
|
||||||
pthread_mutex_t lock;
|
pthread_mutex_t lock;
|
||||||
pthread_cond_t queueNotEmpty;
|
pthread_cond_t queueNotEmpty;
|
||||||
int nthreads;
|
int nthreads;
|
||||||
|
int refCount;
|
||||||
SList * queue;
|
SList * queue;
|
||||||
pthread_t * threads;
|
pthread_t * threads;
|
||||||
} SCommitQueue;
|
} SCommitQueue;
|
||||||
|
@ -123,7 +125,7 @@ static void *tsdbLoopCommit(void *arg) {
|
||||||
while (true) {
|
while (true) {
|
||||||
pNode = tdListPopHead(pQueue->queue);
|
pNode = tdListPopHead(pQueue->queue);
|
||||||
if (pNode == NULL) {
|
if (pNode == NULL) {
|
||||||
if (pQueue->stop) {
|
if (pQueue->stop && pQueue->refCount == 0) {
|
||||||
pthread_mutex_unlock(&(pQueue->lock));
|
pthread_mutex_unlock(&(pQueue->lock));
|
||||||
goto _exit;
|
goto _exit;
|
||||||
} else {
|
} else {
|
||||||
|
@ -145,3 +147,13 @@ static void *tsdbLoopCommit(void *arg) {
|
||||||
_exit:
|
_exit:
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int tsdbIncCommitRef(int vgId) {
|
||||||
|
int refCount = atomic_add_fetch_32(&tsCommitQueue.refCount, 1);
|
||||||
|
tsdbDebug("vgId:%d, inc commit queue ref to %d", refCount);
|
||||||
|
}
|
||||||
|
|
||||||
|
void tsdbDecCommitRef(int vgId) {
|
||||||
|
int refCount = atomic_sub_fetch_32(&tsCommitQueue.refCount, 1);
|
||||||
|
tsdbDebug("vgId:%d, dec commit queue ref to %d", refCount);
|
||||||
|
}
|
|
@ -355,6 +355,7 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) {
|
||||||
pVnode->status = TAOS_VN_STATUS_READY;
|
pVnode->status = TAOS_VN_STATUS_READY;
|
||||||
vDebug("vgId:%d, vnode is opened in %s, pVnode:%p", pVnode->vgId, rootDir, pVnode);
|
vDebug("vgId:%d, vnode is opened in %s, pVnode:%p", pVnode->vgId, rootDir, pVnode);
|
||||||
|
|
||||||
|
tsdbIncCommitRef(pVnode->vgId);
|
||||||
taosHashPut(tsVnodesHash, (const char *)&pVnode->vgId, sizeof(int32_t), (char *)(&pVnode), sizeof(SVnodeObj *));
|
taosHashPut(tsVnodesHash, (const char *)&pVnode->vgId, sizeof(int32_t), (char *)(&pVnode), sizeof(SVnodeObj *));
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
|
@ -446,6 +447,7 @@ void vnodeRelease(void *pVnodeRaw) {
|
||||||
|
|
||||||
tsem_destroy(&pVnode->sem);
|
tsem_destroy(&pVnode->sem);
|
||||||
free(pVnode);
|
free(pVnode);
|
||||||
|
tsdbDecCommitRef(vgId);
|
||||||
|
|
||||||
int32_t count = taosHashGetSize(tsVnodesHash);
|
int32_t count = taosHashGetSize(tsVnodesHash);
|
||||||
vDebug("vgId:%d, vnode is destroyed, vnodes:%d", vgId, count);
|
vDebug("vgId:%d, vnode is destroyed, vnodes:%d", vgId, count);
|
||||||
|
|
Loading…
Reference in New Issue