more
This commit is contained in:
parent
136b04a5f6
commit
43d1015c37
|
@ -19,6 +19,8 @@
|
|||
#define VNODE_BUF_POOL_SHARDS 3
|
||||
|
||||
struct SVBufPool {
|
||||
pthread_mutex_t mutex;
|
||||
pthread_cond_t hasFree;
|
||||
TD_DLIST(SVMemAllocator) free;
|
||||
TD_DLIST(SVMemAllocator) incycle;
|
||||
SVMemAllocator *inuse;
|
||||
|
@ -110,6 +112,8 @@ void *vnodeMalloc(SVnode *pVnode, uint64_t size) {
|
|||
if (pBufPool->inuse) {
|
||||
tDListPop(&(pBufPool->free), pBufPool->inuse);
|
||||
break;
|
||||
} else {
|
||||
// tsem_wait(&(pBufPool->hasFree));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,23 +0,0 @@
|
|||
/*
|
||||
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||
*
|
||||
* This program is free software: you can use, redistribute, and/or modify
|
||||
* it under the terms of the GNU Affero General Public License, version 3
|
||||
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||
*
|
||||
* You should have received a copy of the GNU Affero General Public License
|
||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#ifndef _TD_TSDB_COMMIT_QUEUE_H_
|
||||
#define _TD_TSDB_COMMIT_QUEUE_H_
|
||||
|
||||
typedef enum { COMMIT_REQ, COMPACT_REQ,COMMIT_CONFIG_REQ } TSDB_REQ_T;
|
||||
|
||||
int tsdbScheduleCommit(STsdbRepo *pRepo, TSDB_REQ_T req);
|
||||
|
||||
#endif /* _TD_TSDB_COMMIT_QUEUE_H_ */
|
|
@ -1,213 +0,0 @@
|
|||
/*
|
||||
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||
*
|
||||
* This program is free software: you can use, redistribute, and/or modify
|
||||
* it under the terms of the GNU Affero General Public License, version 3
|
||||
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||
*
|
||||
* You should have received a copy of the GNU Affero General Public License
|
||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#include "tsdbint.h"
|
||||
|
||||
typedef struct {
|
||||
bool stop;
|
||||
pthread_mutex_t lock;
|
||||
pthread_cond_t queueNotEmpty;
|
||||
int nthreads;
|
||||
int refCount;
|
||||
SList * queue;
|
||||
pthread_t * threads;
|
||||
} SCommitQueue;
|
||||
|
||||
typedef struct {
|
||||
TSDB_REQ_T req;
|
||||
STsdbRepo *pRepo;
|
||||
} SReq;
|
||||
|
||||
static void *tsdbLoopCommit(void *arg);
|
||||
|
||||
static SCommitQueue tsCommitQueue = {0};
|
||||
|
||||
int tsdbInitCommitQueue() {
|
||||
int nthreads = tsNumOfCommitThreads;
|
||||
SCommitQueue *pQueue = &tsCommitQueue;
|
||||
|
||||
if (nthreads < 1) nthreads = 1;
|
||||
|
||||
pQueue->stop = false;
|
||||
pQueue->nthreads = nthreads;
|
||||
|
||||
pQueue->queue = tdListNew(0);
|
||||
if (pQueue->queue == NULL) {
|
||||
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
|
||||
return -1;
|
||||
}
|
||||
|
||||
pQueue->threads = (pthread_t *)calloc(nthreads, sizeof(pthread_t));
|
||||
if (pQueue->threads == NULL) {
|
||||
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
|
||||
tdListFree(pQueue->queue);
|
||||
return -1;
|
||||
}
|
||||
|
||||
pthread_mutex_init(&(pQueue->lock), NULL);
|
||||
pthread_cond_init(&(pQueue->queueNotEmpty), NULL);
|
||||
|
||||
for (int i = 0; i < nthreads; i++) {
|
||||
pthread_create(pQueue->threads + i, NULL, tsdbLoopCommit, NULL);
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
void tsdbDestroyCommitQueue() {
|
||||
SCommitQueue *pQueue = &tsCommitQueue;
|
||||
|
||||
pthread_mutex_lock(&(pQueue->lock));
|
||||
|
||||
if (pQueue->stop) {
|
||||
pthread_mutex_unlock(&(pQueue->lock));
|
||||
return;
|
||||
}
|
||||
|
||||
pQueue->stop = true;
|
||||
pthread_cond_broadcast(&(pQueue->queueNotEmpty));
|
||||
|
||||
pthread_mutex_unlock(&(pQueue->lock));
|
||||
|
||||
for (size_t i = 0; i < pQueue->nthreads; i++) {
|
||||
pthread_join(pQueue->threads[i], NULL);
|
||||
}
|
||||
|
||||
free(pQueue->threads);
|
||||
tdListFree(pQueue->queue);
|
||||
pthread_cond_destroy(&(pQueue->queueNotEmpty));
|
||||
pthread_mutex_destroy(&(pQueue->lock));
|
||||
}
|
||||
|
||||
int tsdbScheduleCommit(STsdbRepo *pRepo, TSDB_REQ_T req) {
|
||||
SCommitQueue *pQueue = &tsCommitQueue;
|
||||
|
||||
SListNode *pNode = (SListNode *)calloc(1, sizeof(SListNode) + sizeof(SReq));
|
||||
if (pNode == NULL) {
|
||||
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
|
||||
return -1;
|
||||
}
|
||||
|
||||
((SReq *)pNode->data)->req = req;
|
||||
((SReq *)pNode->data)->pRepo = pRepo;
|
||||
|
||||
pthread_mutex_lock(&(pQueue->lock));
|
||||
|
||||
// ASSERT(pQueue->stop);
|
||||
|
||||
tdListAppendNode(pQueue->queue, pNode);
|
||||
pthread_cond_signal(&(pQueue->queueNotEmpty));
|
||||
|
||||
pthread_mutex_unlock(&(pQueue->lock));
|
||||
return 0;
|
||||
}
|
||||
|
||||
static void tsdbApplyRepoConfig(STsdbRepo *pRepo) {
|
||||
pthread_mutex_lock(&pRepo->save_mutex);
|
||||
|
||||
pRepo->config_changed = false;
|
||||
STsdbCfg * pSaveCfg = &pRepo->save_config;
|
||||
STsdbCfg oldCfg;
|
||||
int32_t oldTotalBlocks = pRepo->config.totalBlocks;
|
||||
|
||||
memcpy(&oldCfg, &(pRepo->config), sizeof(STsdbCfg));
|
||||
|
||||
pRepo->config.compression = pRepo->save_config.compression;
|
||||
pRepo->config.keep = pRepo->save_config.keep;
|
||||
pRepo->config.keep1 = pRepo->save_config.keep1;
|
||||
pRepo->config.keep2 = pRepo->save_config.keep2;
|
||||
pRepo->config.cacheLastRow = pRepo->save_config.cacheLastRow;
|
||||
pRepo->config.totalBlocks = pRepo->save_config.totalBlocks;
|
||||
|
||||
pthread_mutex_unlock(&pRepo->save_mutex);
|
||||
|
||||
tsdbInfo("vgId:%d apply new config: compression(%d), keep(%d,%d,%d), totalBlocks(%d), cacheLastRow(%d->%d),totalBlocks(%d->%d)",
|
||||
REPO_ID(pRepo),
|
||||
pSaveCfg->compression, pSaveCfg->keep,pSaveCfg->keep1, pSaveCfg->keep2,
|
||||
pSaveCfg->totalBlocks, oldCfg.cacheLastRow, pSaveCfg->cacheLastRow, oldTotalBlocks, pSaveCfg->totalBlocks);
|
||||
|
||||
int err = tsdbExpandPool(pRepo, oldTotalBlocks);
|
||||
if (!TAOS_SUCCEEDED(err)) {
|
||||
tsdbError("vgId:%d expand pool from %d to %d fail,reason:%s",
|
||||
REPO_ID(pRepo), oldTotalBlocks, pSaveCfg->totalBlocks, tstrerror(err));
|
||||
}
|
||||
|
||||
if (oldCfg.cacheLastRow != pRepo->config.cacheLastRow) {
|
||||
if (tsdbLockRepo(pRepo) < 0) return;
|
||||
tsdbCacheLastData(pRepo, &oldCfg);
|
||||
tsdbUnlockRepo(pRepo);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
static void *tsdbLoopCommit(void *arg) {
|
||||
SCommitQueue *pQueue = &tsCommitQueue;
|
||||
SListNode * pNode = NULL;
|
||||
STsdbRepo * pRepo = NULL;
|
||||
TSDB_REQ_T req;
|
||||
|
||||
setThreadName("tsdbCommit");
|
||||
|
||||
while (true) {
|
||||
pthread_mutex_lock(&(pQueue->lock));
|
||||
|
||||
while (true) {
|
||||
pNode = tdListPopHead(pQueue->queue);
|
||||
if (pNode == NULL) {
|
||||
if (pQueue->stop && pQueue->refCount <= 0) {
|
||||
pthread_mutex_unlock(&(pQueue->lock));
|
||||
goto _exit;
|
||||
} else {
|
||||
pthread_cond_wait(&(pQueue->queueNotEmpty), &(pQueue->lock));
|
||||
}
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
pthread_mutex_unlock(&(pQueue->lock));
|
||||
|
||||
req = ((SReq *)pNode->data)->req;
|
||||
pRepo = ((SReq *)pNode->data)->pRepo;
|
||||
|
||||
if (req == COMMIT_REQ) {
|
||||
tsdbCommitData(pRepo);
|
||||
} else if (req == COMPACT_REQ) {
|
||||
tsdbCompactImpl(pRepo);
|
||||
} else if (req == COMMIT_CONFIG_REQ) {
|
||||
ASSERT(pRepo->config_changed);
|
||||
tsdbApplyRepoConfig(pRepo);
|
||||
tsem_post(&(pRepo->readyToCommit));
|
||||
} else {
|
||||
ASSERT(0);
|
||||
}
|
||||
|
||||
listNodeFree(pNode);
|
||||
}
|
||||
|
||||
_exit:
|
||||
return NULL;
|
||||
}
|
||||
|
||||
void tsdbIncCommitRef(int vgId) {
|
||||
int refCount = atomic_add_fetch_32(&tsCommitQueue.refCount, 1);
|
||||
tsdbDebug("vgId:%d, inc commit queue ref to %d", vgId, refCount);
|
||||
}
|
||||
|
||||
void tsdbDecCommitRef(int vgId) {
|
||||
int refCount = atomic_sub_fetch_32(&tsCommitQueue.refCount, 1);
|
||||
pthread_cond_broadcast(&(tsCommitQueue.queueNotEmpty));
|
||||
tsdbDebug("vgId:%d, dec commit queue ref to %d", vgId, refCount);
|
||||
}
|
Loading…
Reference in New Issue