TD-353
This commit is contained in:
parent
6eba0ff9f0
commit
2e3441d108
|
@ -318,6 +318,7 @@ int tsdbRefMemTable(STsdbRepo* pRepo, SMemTable* pMemTable);
|
||||||
int tsdbUnRefMemTable(STsdbRepo* pRepo, SMemTable* pMemTable);
|
int tsdbUnRefMemTable(STsdbRepo* pRepo, SMemTable* pMemTable);
|
||||||
int tsdbTakeMemSnapshot(STsdbRepo* pRepo, SMemTable** pMem, SMemTable** pIMem);
|
int tsdbTakeMemSnapshot(STsdbRepo* pRepo, SMemTable** pMem, SMemTable** pIMem);
|
||||||
void* tsdbAllocBytes(STsdbRepo* pRepo, int bytes);
|
void* tsdbAllocBytes(STsdbRepo* pRepo, int bytes);
|
||||||
|
int tsdbAsyncCommit(STsdbRepo* pRepo);
|
||||||
|
|
||||||
// ------------------ tsdbFile.c
|
// ------------------ tsdbFile.c
|
||||||
#define TSDB_KEY_FILEID(key, daysPerFile, precision) ((key) / tsMsPerDay[(precision)] / (daysPerFile))
|
#define TSDB_KEY_FILEID(key, daysPerFile, precision) ((key) / tsMsPerDay[(precision)] / (daysPerFile))
|
||||||
|
|
|
@ -135,12 +135,14 @@ _err:
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Note: all working thread and query thread must stopped when calling this function
|
||||||
void tsdbCloseRepo(TSDB_REPO_T *repo, int toCommit) {
|
void tsdbCloseRepo(TSDB_REPO_T *repo, int toCommit) {
|
||||||
if (repo == NULL) return;
|
if (repo == NULL) return;
|
||||||
|
|
||||||
STsdbRepo *pRepo = (STsdbRepo *)repo;
|
STsdbRepo *pRepo = (STsdbRepo *)repo;
|
||||||
|
|
||||||
// TODO: wait for commit over
|
tsdbAsyncCommit(pRepo);
|
||||||
|
if (pRepo->commit) pthread_join(pRepo->commitThread, NULL);
|
||||||
|
|
||||||
tsdbCloseFileH(pRepo);
|
tsdbCloseFileH(pRepo);
|
||||||
tsdbCloseBufPool(pRepo);
|
tsdbCloseBufPool(pRepo);
|
||||||
|
|
|
@ -173,42 +173,10 @@ int tsdbTakeMemSnapshot(STsdbRepo *pRepo, SMemTable **pMem, SMemTable **pIMem) {
|
||||||
void *tsdbAllocBytes(STsdbRepo *pRepo, int bytes) {
|
void *tsdbAllocBytes(STsdbRepo *pRepo, int bytes) {
|
||||||
STsdbCfg * pCfg = &pRepo->config;
|
STsdbCfg * pCfg = &pRepo->config;
|
||||||
STsdbBufBlock *pBufBlock = tsdbGetCurrBufBlock(pRepo);
|
STsdbBufBlock *pBufBlock = tsdbGetCurrBufBlock(pRepo);
|
||||||
int code = 0;
|
|
||||||
|
|
||||||
if (pBufBlock != NULL && pBufBlock->remain < bytes) {
|
if (pBufBlock != NULL && pBufBlock->remain < bytes) {
|
||||||
if (listNEles(pRepo->mem->bufBlockList) >= pCfg->totalBlocks / 2) { // need to commit mem
|
if (listNEles(pRepo->mem->bufBlockList) >= pCfg->totalBlocks / 2) { // need to commit mem
|
||||||
if (pRepo->imem) {
|
if (tsdbAsyncCommit(pRepo) < 0) return NULL;
|
||||||
code = pthread_join(pRepo->commitThread, NULL);
|
|
||||||
if (code != 0) {
|
|
||||||
tsdbError("vgId:%d failed to thread join since %s", REPO_ID(pRepo), strerror(errno));
|
|
||||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
|
||||||
return NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (tsdbUnRefMemTable(pRepo, pRepo->imem) < 0) {
|
|
||||||
tsdbError("vgId:%d failed to unref memtable since %s", REPO_ID(pRepo), tstrerror(terrno))
|
|
||||||
return NULL;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
ASSERT(pRepo->commit == 0);
|
|
||||||
SMemTable *pImem = pRepo->imem;
|
|
||||||
if (pRepo->appH.notifyStatus) pRepo->appH.notifyStatus(pRepo->appH.appH, TSDB_STATUS_COMMIT_START);
|
|
||||||
|
|
||||||
if (tsdbLockRepo(pRepo) < 0) return NULL;
|
|
||||||
pRepo->imem = pRepo->mem;
|
|
||||||
pRepo->mem = NULL;
|
|
||||||
pRepo->commit = 1;
|
|
||||||
code = pthread_create(&pRepo->commitThread, NULL, tsdbCommitData, (void *)pRepo);
|
|
||||||
if (code != 0) {
|
|
||||||
tsdbError("vgId:%d failed to create commit thread since %s", REPO_ID(pRepo), strerror(errno));
|
|
||||||
terrno = TAOS_SYSTEM_ERROR(code);
|
|
||||||
tsdbUnlockRepo(pRepo);
|
|
||||||
return NULL;
|
|
||||||
}
|
|
||||||
if (tsdbUnlockRepo(pRepo) < 0) return NULL;
|
|
||||||
|
|
||||||
if (pImem && tsdbUnRefMemTable(pRepo, pImem) < 0) return NULL;
|
|
||||||
} else {
|
} else {
|
||||||
if (tsdbLockRepo(pRepo) < 0) return NULL;
|
if (tsdbLockRepo(pRepo) < 0) return NULL;
|
||||||
SListNode *pNode = tsdbAllocBufBlockFromPool(pRepo);
|
SListNode *pNode = tsdbAllocBufBlockFromPool(pRepo);
|
||||||
|
@ -242,6 +210,42 @@ void *tsdbAllocBytes(STsdbRepo *pRepo, int bytes) {
|
||||||
return ptr;
|
return ptr;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int tsdbAsyncCommit(STsdbRepo *pRepo) {
|
||||||
|
SMemTable *pIMem = pRepo->imem;
|
||||||
|
int code = 0;
|
||||||
|
|
||||||
|
if (pIMem != NULL) {
|
||||||
|
ASSERT(pRepo->commit);
|
||||||
|
code = pthread_join(pRepo->commitThread, NULL);
|
||||||
|
if (code != 0) {
|
||||||
|
tsdbError("vgId:%d failed to thread join since %s", REPO_ID(pRepo), strerror(errno));
|
||||||
|
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
ASSERT(pRepo->commit == 0);
|
||||||
|
if (pRepo->appH.notifyStatus) pRepo->appH.notifyStatus(pRepo->appH.appH, TSDB_STATUS_COMMIT_START);
|
||||||
|
if (pRepo->mem != NULL) {
|
||||||
|
if (tsdbLockRepo(pRepo) < 0) return -1;
|
||||||
|
pRepo->imem = pRepo->mem;
|
||||||
|
pRepo->mem = NULL;
|
||||||
|
pRepo->commit = 1;
|
||||||
|
code = pthread_create(&pRepo->commitThread, NULL, tsdbCommitData, (void *)pRepo);
|
||||||
|
if (code != 0) {
|
||||||
|
tsdbError("vgId:%d failed to create commit thread since %s", REPO_ID(pRepo), strerror(errno));
|
||||||
|
terrno = TAOS_SYSTEM_ERROR(code);
|
||||||
|
tsdbUnlockRepo(pRepo);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
if (tsdbUnlockRepo(pRepo) < 0) return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (pIMem && tsdbUnRefMemTable(pRepo, pIMem) < 0) return -1;
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
// ---------------- LOCAL FUNCTIONS ----------------
|
// ---------------- LOCAL FUNCTIONS ----------------
|
||||||
static FORCE_INLINE STsdbBufBlock *tsdbGetCurrBufBlock(STsdbRepo *pRepo) {
|
static FORCE_INLINE STsdbBufBlock *tsdbGetCurrBufBlock(STsdbRepo *pRepo) {
|
||||||
ASSERT(pRepo != NULL);
|
ASSERT(pRepo != NULL);
|
||||||
|
|
Loading…
Reference in New Issue