Merge pull request #19918 from taosdata/feat/vnode_compact
fix: vnode compact bug
This commit is contained in:
commit
8151ab8b11
|
@ -1,27 +0,0 @@
|
||||||
/*
|
|
||||||
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
|
||||||
*
|
|
||||||
* This program is free software: you can use, redistribute, and/or modify
|
|
||||||
* it under the terms of the GNU Affero General Public License, version 3
|
|
||||||
* or later ("AGPL"), as published by the Free Software Foundation.
|
|
||||||
*
|
|
||||||
* This program is distributed in the hope that it will be useful, but WITHOUT
|
|
||||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
|
||||||
* FITNESS FOR A PARTICULAR PURPOSE.
|
|
||||||
*
|
|
||||||
* You should have received a copy of the GNU Affero General Public License
|
|
||||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
||||||
*/
|
|
||||||
|
|
||||||
#ifndef _TD_CACHE_H_
|
|
||||||
#define _TD_CACHE_H_
|
|
||||||
|
|
||||||
#ifdef __cplusplus
|
|
||||||
extern "C" {
|
|
||||||
#endif
|
|
||||||
|
|
||||||
#ifdef __cplusplus
|
|
||||||
}
|
|
||||||
#endif
|
|
||||||
|
|
||||||
#endif /*_TD_CACHE_H_*/
|
|
|
@ -100,7 +100,7 @@ int32_t vnodeShouldCommit(SVnode* pVnode);
|
||||||
void vnodeUpdCommitSched(SVnode* pVnode);
|
void vnodeUpdCommitSched(SVnode* pVnode);
|
||||||
void vnodeRollback(SVnode* pVnode);
|
void vnodeRollback(SVnode* pVnode);
|
||||||
int32_t vnodeSaveInfo(const char* dir, const SVnodeInfo* pCfg);
|
int32_t vnodeSaveInfo(const char* dir, const SVnodeInfo* pCfg);
|
||||||
int32_t vnodeCommitInfo(const char* dir, const SVnodeInfo* pInfo);
|
int32_t vnodeCommitInfo(const char* dir);
|
||||||
int32_t vnodeLoadInfo(const char* dir, SVnodeInfo* pInfo);
|
int32_t vnodeLoadInfo(const char* dir, SVnodeInfo* pInfo);
|
||||||
int32_t vnodeSyncCommit(SVnode* pVnode);
|
int32_t vnodeSyncCommit(SVnode* pVnode);
|
||||||
int32_t vnodeAsyncCommit(SVnode* pVnode);
|
int32_t vnodeAsyncCommit(SVnode* pVnode);
|
||||||
|
|
|
@ -76,6 +76,7 @@ typedef struct SRSmaSnapReader SRSmaSnapReader;
|
||||||
typedef struct SRSmaSnapWriter SRSmaSnapWriter;
|
typedef struct SRSmaSnapWriter SRSmaSnapWriter;
|
||||||
typedef struct SSnapDataHdr SSnapDataHdr;
|
typedef struct SSnapDataHdr SSnapDataHdr;
|
||||||
typedef struct SCommitInfo SCommitInfo;
|
typedef struct SCommitInfo SCommitInfo;
|
||||||
|
typedef struct SCompactInfo SCompactInfo;
|
||||||
typedef struct SQueryNode SQueryNode;
|
typedef struct SQueryNode SQueryNode;
|
||||||
|
|
||||||
#define VNODE_META_DIR "meta"
|
#define VNODE_META_DIR "meta"
|
||||||
|
@ -173,6 +174,7 @@ int tsdbClose(STsdb** pTsdb);
|
||||||
int32_t tsdbBegin(STsdb* pTsdb);
|
int32_t tsdbBegin(STsdb* pTsdb);
|
||||||
int32_t tsdbPrepareCommit(STsdb* pTsdb);
|
int32_t tsdbPrepareCommit(STsdb* pTsdb);
|
||||||
int32_t tsdbCommit(STsdb* pTsdb, SCommitInfo* pInfo);
|
int32_t tsdbCommit(STsdb* pTsdb, SCommitInfo* pInfo);
|
||||||
|
int32_t tsdbCompact(STsdb* pTsdb, SCompactInfo* pInfo);
|
||||||
int32_t tsdbFinishCommit(STsdb* pTsdb);
|
int32_t tsdbFinishCommit(STsdb* pTsdb);
|
||||||
int32_t tsdbRollbackCommit(STsdb* pTsdb);
|
int32_t tsdbRollbackCommit(STsdb* pTsdb);
|
||||||
int32_t tsdbDoRetention(STsdb* pTsdb, int64_t now);
|
int32_t tsdbDoRetention(STsdb* pTsdb, int64_t now);
|
||||||
|
@ -450,10 +452,14 @@ struct SCommitInfo {
|
||||||
SVnodeInfo info;
|
SVnodeInfo info;
|
||||||
SVnode* pVnode;
|
SVnode* pVnode;
|
||||||
TXN* txn;
|
TXN* txn;
|
||||||
|
|
||||||
// APIs
|
|
||||||
int32_t (*commitFn)(STsdb* pTsdb, SCommitInfo* pInfo);
|
|
||||||
};
|
};
|
||||||
|
|
||||||
|
struct SCompactInfo {
|
||||||
|
SVnode* pVnode;
|
||||||
|
int32_t flag;
|
||||||
|
int64_t commitID;
|
||||||
|
};
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
|
@ -88,9 +88,8 @@ static int32_t tsdbAbortCompact(STsdbCompactor *pCompactor) {
|
||||||
int32_t lino = 0;
|
int32_t lino = 0;
|
||||||
|
|
||||||
STsdb *pTsdb = pCompactor->pTsdb;
|
STsdb *pTsdb = pCompactor->pTsdb;
|
||||||
|
code = tsdbFSRollback(pTsdb);
|
||||||
// TODO
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
ASSERT(0);
|
|
||||||
|
|
||||||
_exit:
|
_exit:
|
||||||
if (code) {
|
if (code) {
|
||||||
|
@ -314,7 +313,7 @@ static bool tsdbCompactTableIsDropped(STsdbCompactor *pCompactor) {
|
||||||
SMetaInfo info;
|
SMetaInfo info;
|
||||||
|
|
||||||
if (pCompactor->pIter->rowInfo.uid == pCompactor->tbid.uid) return false;
|
if (pCompactor->pIter->rowInfo.uid == pCompactor->tbid.uid) return false;
|
||||||
if (metaGetInfo(pCompactor->pTsdb->pVnode->pMeta, pCompactor->tbid.uid, &info, NULL)) {
|
if (metaGetInfo(pCompactor->pTsdb->pVnode->pMeta, pCompactor->pIter->rowInfo.uid, &info, NULL)) {
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
return false;
|
return false;
|
||||||
|
@ -571,12 +570,12 @@ static void tsdbEndCompact(STsdbCompactor *pCompactor) {
|
||||||
tsdbInfo("vgId:%d %s done, commit ID:%" PRId64, TD_VID(pCompactor->pTsdb->pVnode), __func__, pCompactor->commitID);
|
tsdbInfo("vgId:%d %s done, commit ID:%" PRId64, TD_VID(pCompactor->pTsdb->pVnode), __func__, pCompactor->commitID);
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t tsdbBeginCompact(STsdb *pTsdb, STsdbCompactor *pCompactor) {
|
static int32_t tsdbBeginCompact(STsdb *pTsdb, SCompactInfo *pInfo, STsdbCompactor *pCompactor) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
int32_t lino = 0;
|
int32_t lino = 0;
|
||||||
|
|
||||||
pCompactor->pTsdb = pTsdb;
|
pCompactor->pTsdb = pTsdb;
|
||||||
pCompactor->commitID = 0; // TODO
|
pCompactor->commitID = pInfo->commitID;
|
||||||
pCompactor->cmprAlg = pTsdb->pVnode->config.tsdbCfg.compression;
|
pCompactor->cmprAlg = pTsdb->pVnode->config.tsdbCfg.compression;
|
||||||
pCompactor->maxRows = pTsdb->pVnode->config.tsdbCfg.maxRows;
|
pCompactor->maxRows = pTsdb->pVnode->config.tsdbCfg.maxRows;
|
||||||
pCompactor->minRows = pTsdb->pVnode->config.tsdbCfg.minRows;
|
pCompactor->minRows = pTsdb->pVnode->config.tsdbCfg.minRows;
|
||||||
|
@ -637,12 +636,12 @@ _exit:
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tsdbCompact(STsdb *pTsdb, int32_t flag) {
|
int32_t tsdbCompact(STsdb *pTsdb, SCompactInfo *pInfo) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
|
|
||||||
STsdbCompactor *pCompactor = &(STsdbCompactor){0};
|
STsdbCompactor *pCompactor = &(STsdbCompactor){0};
|
||||||
|
|
||||||
if ((code = tsdbBeginCompact(pTsdb, pCompactor))) return code;
|
if ((code = tsdbBeginCompact(pTsdb, pInfo, pCompactor))) return code;
|
||||||
|
|
||||||
for (;;) {
|
for (;;) {
|
||||||
SDFileSet *pSet = (SDFileSet *)taosArraySearch(pCompactor->fs.aDFileSet, &(SDFileSet){.fid = pCompactor->fid},
|
SDFileSet *pSet = (SDFileSet *)taosArraySearch(pCompactor->fs.aDFileSet, &(SDFileSet){.fid = pCompactor->fid},
|
||||||
|
|
|
@ -1057,7 +1057,6 @@ _exit:
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
int32_t tsdbBuildDeleteSkylineImpl(SArray *aSkyline, int32_t sidx, int32_t eidx, SArray *pSkyline) {
|
int32_t tsdbBuildDeleteSkylineImpl(SArray *aSkyline, int32_t sidx, int32_t eidx, SArray *pSkyline) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
SDelData *pDelData;
|
SDelData *pDelData;
|
||||||
|
@ -1097,13 +1096,12 @@ int32_t tsdbBuildDeleteSkylineImpl(SArray *aSkyline, int32_t sidx, int32_t eidx,
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
int32_t tsdbBuildDeleteSkyline(SArray *aDelData, int32_t sidx, int32_t eidx, SArray *aSkyline) {
|
int32_t tsdbBuildDeleteSkyline(SArray *aDelData, int32_t sidx, int32_t eidx, SArray *aSkyline) {
|
||||||
SDelData *pDelData;
|
SDelData *pDelData;
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
int32_t dataNum = eidx - sidx + 1;
|
int32_t dataNum = eidx - sidx + 1;
|
||||||
SArray *aTmpSkyline = taosArrayInit(dataNum * 2, sizeof(TSDBKEY));
|
SArray *aTmpSkyline = taosArrayInit(dataNum * 2, sizeof(TSDBKEY));
|
||||||
SArray *pSkyline = taosArrayInit(dataNum * 2, POINTER_BYTES);
|
SArray *pSkyline = taosArrayInit(dataNum * 2, POINTER_BYTES);
|
||||||
|
|
||||||
for (int32_t i = sidx; i <= eidx; ++i) {
|
for (int32_t i = sidx; i <= eidx; ++i) {
|
||||||
pDelData = (SDelData *)taosArrayGet(aDelData, i);
|
pDelData = (SDelData *)taosArrayGet(aDelData, i);
|
||||||
|
@ -1116,8 +1114,8 @@ int32_t tsdbBuildDeleteSkyline(SArray *aDelData, int32_t sidx, int32_t eidx, SAr
|
||||||
|
|
||||||
int32_t skylineNum = taosArrayGetSize(pSkyline);
|
int32_t skylineNum = taosArrayGetSize(pSkyline);
|
||||||
for (int32_t i = 0; i < skylineNum; ++i) {
|
for (int32_t i = 0; i < skylineNum; ++i) {
|
||||||
TSDBKEY *p = taosArrayGetP(pSkyline, i);
|
TSDBKEY *p = taosArrayGetP(pSkyline, i);
|
||||||
taosArrayPush(aSkyline, p);
|
taosArrayPush(aSkyline, p);
|
||||||
}
|
}
|
||||||
|
|
||||||
_clear:
|
_clear:
|
||||||
|
@ -1394,7 +1392,7 @@ int32_t tBlockDataTryUpsertRow(SBlockData *pBlockData, TSDBROW *pRow, int64_t ui
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tBlockDataUpsertRow(SBlockData *pBlockData, TSDBROW *pRow, STSchema *pTSchema, int64_t uid) {
|
int32_t tBlockDataUpsertRow(SBlockData *pBlockData, TSDBROW *pRow, STSchema *pTSchema, int64_t uid) {
|
||||||
if (pBlockData->aTSKEY[pBlockData->nRow - 1] == TSDBROW_TS(pRow)) {
|
if (pBlockData->nRow > 0 && pBlockData->aTSKEY[pBlockData->nRow - 1] == TSDBROW_TS(pRow)) {
|
||||||
return tBlockDataUpdateRow(pBlockData, pRow, pTSchema);
|
return tBlockDataUpdateRow(pBlockData, pRow, pTSchema);
|
||||||
} else {
|
} else {
|
||||||
return tBlockDataAppendRow(pBlockData, pRow, pTSchema, uid);
|
return tBlockDataAppendRow(pBlockData, pRow, pTSchema, uid);
|
||||||
|
|
|
@ -113,8 +113,6 @@ int vnodeBegin(SVnode *pVnode) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
int32_t lino = 0;
|
int32_t lino = 0;
|
||||||
|
|
||||||
pVnode->state.commitID++;
|
|
||||||
|
|
||||||
// alloc buffer pool
|
// alloc buffer pool
|
||||||
code = vnodeGetBufPoolToUse(pVnode);
|
code = vnodeGetBufPoolToUse(pVnode);
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
|
@ -221,7 +219,7 @@ _err:
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
int vnodeCommitInfo(const char *dir, const SVnodeInfo *pInfo) {
|
int vnodeCommitInfo(const char *dir) {
|
||||||
char fname[TSDB_FILENAME_LEN];
|
char fname[TSDB_FILENAME_LEN];
|
||||||
char tfname[TSDB_FILENAME_LEN];
|
char tfname[TSDB_FILENAME_LEN];
|
||||||
|
|
||||||
|
@ -233,8 +231,7 @@ int vnodeCommitInfo(const char *dir, const SVnodeInfo *pInfo) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
vInfo("vgId:%d, vnode info is committed", pInfo->config.vgId);
|
vInfo("vnode info is committed, dir:%s", dir);
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -289,7 +286,7 @@ _err:
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t vnodePrepareCommit(SVnode *pVnode, SCommitInfo *pInfo) {
|
static int32_t vnodePrepareCommit(SVnode *pVnode, SCommitInfo *pInfo) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
int32_t lino = 0;
|
int32_t lino = 0;
|
||||||
char dir[TSDB_FILENAME_LEN] = {0};
|
char dir[TSDB_FILENAME_LEN] = {0};
|
||||||
|
@ -301,7 +298,7 @@ int32_t vnodePrepareCommit(SVnode *pVnode, SCommitInfo *pInfo) {
|
||||||
pInfo->info.config = pVnode->config;
|
pInfo->info.config = pVnode->config;
|
||||||
pInfo->info.state.committed = pVnode->state.applied;
|
pInfo->info.state.committed = pVnode->state.applied;
|
||||||
pInfo->info.state.commitTerm = pVnode->state.applyTerm;
|
pInfo->info.state.commitTerm = pVnode->state.applyTerm;
|
||||||
pInfo->info.state.commitID = pVnode->state.commitID;
|
pInfo->info.state.commitID = ++pVnode->state.commitID;
|
||||||
pInfo->pVnode = pVnode;
|
pInfo->pVnode = pVnode;
|
||||||
pInfo->txn = metaGetTxn(pVnode->pMeta);
|
pInfo->txn = metaGetTxn(pVnode->pMeta);
|
||||||
|
|
||||||
|
@ -336,7 +333,7 @@ _exit:
|
||||||
vError("vgId:%d, %s failed at line %d since %s, commit id:%" PRId64, TD_VID(pVnode), __func__, lino,
|
vError("vgId:%d, %s failed at line %d since %s, commit id:%" PRId64, TD_VID(pVnode), __func__, lino,
|
||||||
tstrerror(code), pVnode->state.commitID);
|
tstrerror(code), pVnode->state.commitID);
|
||||||
} else {
|
} else {
|
||||||
vDebug("vgId:%d, %s done", TD_VID(pVnode), __func__);
|
vDebug("vgId:%d, %s done, commit id:%" PRId64, TD_VID(pVnode), __func__, pInfo->info.state.commitID);
|
||||||
}
|
}
|
||||||
|
|
||||||
return code;
|
return code;
|
||||||
|
@ -468,7 +465,7 @@ static int vnodeCommitImpl(SCommitInfo *pInfo) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// commit info
|
// commit info
|
||||||
if (vnodeCommitInfo(dir, &pInfo->info) < 0) {
|
if (vnodeCommitInfo(dir) < 0) {
|
||||||
code = terrno;
|
code = terrno;
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
}
|
}
|
||||||
|
|
|
@ -15,60 +15,86 @@
|
||||||
|
|
||||||
#include "vnd.h"
|
#include "vnd.h"
|
||||||
|
|
||||||
extern int32_t tsdbCompact(STsdb *pTsdb, int32_t flag);
|
static int32_t vnodeCompactTask(void *param) {
|
||||||
|
|
||||||
int32_t vnodePrepareCommit(SVnode *pVnode, SCommitInfo *pInfo);
|
|
||||||
|
|
||||||
static int32_t vnodeCompactImpl(SCommitInfo *pInfo) {
|
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
int32_t lino = 0;
|
int32_t lino = 0;
|
||||||
|
|
||||||
// TODO
|
SCompactInfo *pInfo = (SCompactInfo *)param;
|
||||||
SVnode *pVnode = pInfo->pVnode;
|
SVnode *pVnode = pInfo->pVnode;
|
||||||
|
|
||||||
code = tsdbCompact(pVnode->pTsdb, 0);
|
// do compact
|
||||||
|
code = tsdbCompact(pInfo->pVnode->pTsdb, pInfo);
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
|
|
||||||
_exit:
|
|
||||||
if (code) {
|
|
||||||
vError("vgId:%d %s failed since %s", TD_VID(pInfo->pVnode), __func__, tstrerror(code));
|
|
||||||
} else {
|
|
||||||
vDebug("vgId:%d %s done", TD_VID(pInfo->pVnode), __func__);
|
|
||||||
}
|
|
||||||
return code;
|
|
||||||
}
|
|
||||||
|
|
||||||
static int32_t vnodeCompactTask(void *param) {
|
|
||||||
int32_t code = 0;
|
|
||||||
|
|
||||||
SCommitInfo *pInfo = (SCommitInfo *)param;
|
|
||||||
|
|
||||||
// compact
|
|
||||||
vnodeCompactImpl(pInfo);
|
|
||||||
|
|
||||||
// end compact
|
// end compact
|
||||||
tsem_post(&pInfo->pVnode->canCommit);
|
char dir[TSDB_FILENAME_LEN] = {0};
|
||||||
|
if (pVnode->pTfs) {
|
||||||
|
snprintf(dir, TSDB_FILENAME_LEN, "%s%s%s", tfsGetPrimaryPath(pVnode->pTfs), TD_DIRSEP, pVnode->path);
|
||||||
|
} else {
|
||||||
|
snprintf(dir, TSDB_FILENAME_LEN, "%s", pVnode->path);
|
||||||
|
}
|
||||||
|
vnodeCommitInfo(dir);
|
||||||
|
|
||||||
_exit:
|
_exit:
|
||||||
taosMemoryFree(pInfo);
|
taosMemoryFree(pInfo);
|
||||||
|
tsem_post(&pInfo->pVnode->canCommit);
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
static int32_t vnodePrepareCompact(SVnode *pVnode, SCompactInfo *pInfo) {
|
||||||
|
int32_t code = 0;
|
||||||
|
int32_t lino = 0;
|
||||||
|
|
||||||
|
tsem_wait(&pVnode->canCommit);
|
||||||
|
|
||||||
|
pInfo->pVnode = pVnode;
|
||||||
|
pInfo->flag = 0;
|
||||||
|
pInfo->commitID = ++pVnode->state.commitID;
|
||||||
|
|
||||||
|
char dir[TSDB_FILENAME_LEN] = {0};
|
||||||
|
SVnodeInfo info = {0};
|
||||||
|
|
||||||
|
if (pVnode->pTfs) {
|
||||||
|
snprintf(dir, TSDB_FILENAME_LEN, "%s%s%s", tfsGetPrimaryPath(pVnode->pTfs), TD_DIRSEP, pVnode->path);
|
||||||
|
} else {
|
||||||
|
snprintf(dir, TSDB_FILENAME_LEN, "%s", pVnode->path);
|
||||||
|
}
|
||||||
|
|
||||||
|
vnodeLoadInfo(dir, &info);
|
||||||
|
info.state.commitID = pInfo->commitID;
|
||||||
|
vnodeSaveInfo(dir, &info);
|
||||||
|
|
||||||
|
_exit:
|
||||||
|
if (code) {
|
||||||
|
vError("vgId:%d %s failed at line %d since %s, commit ID:%" PRId64, TD_VID(pVnode), __func__, lino, tstrerror(code),
|
||||||
|
pVnode->state.commitID);
|
||||||
|
} else {
|
||||||
|
vDebug("vgId:%d %s done, commit ID:%" PRId64, TD_VID(pVnode), __func__, pVnode->state.commitID);
|
||||||
|
}
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
int32_t vnodeAsyncCompact(SVnode *pVnode) {
|
int32_t vnodeAsyncCompact(SVnode *pVnode) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
|
int32_t lino = 0;
|
||||||
|
|
||||||
// schedule compact task
|
SCompactInfo *pInfo = taosMemoryCalloc(1, sizeof(*pInfo));
|
||||||
SCommitInfo *pInfo = taosMemoryCalloc(1, sizeof(*pInfo));
|
if (pInfo == NULL) {
|
||||||
if (NULL == pInfo) {
|
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
goto _exit;
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
}
|
}
|
||||||
|
|
||||||
vnodePrepareCommit(pVnode, pInfo);
|
vnodeAsyncCommit(pVnode);
|
||||||
|
|
||||||
|
code = vnodePrepareCompact(pVnode, pInfo);
|
||||||
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
|
|
||||||
vnodeScheduleTask(vnodeCompactTask, pInfo);
|
vnodeScheduleTask(vnodeCompactTask, pInfo);
|
||||||
|
|
||||||
_exit:
|
_exit:
|
||||||
if (code) {
|
if (code) {
|
||||||
vError("vgId:%d %s failed since %s", TD_VID(pInfo->pVnode), __func__, tstrerror(code));
|
vError("vgId:%d %s failed at line %d since %s", TD_VID(pVnode), __func__, lino, tstrerror(code));
|
||||||
|
if (pInfo) taosMemoryFree(pInfo);
|
||||||
|
} else {
|
||||||
|
vInfo("vgId:%d %s done", TD_VID(pVnode), __func__);
|
||||||
}
|
}
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
|
@ -49,7 +49,7 @@ int32_t vnodeCreate(const char *path, SVnodeCfg *pCfg, STfs *pTfs) {
|
||||||
info.state.commitID = 0;
|
info.state.commitID = 0;
|
||||||
|
|
||||||
vInfo("vgId:%d, save config while create", pCfg->vgId);
|
vInfo("vgId:%d, save config while create", pCfg->vgId);
|
||||||
if (vnodeSaveInfo(dir, &info) < 0 || vnodeCommitInfo(dir, &info) < 0) {
|
if (vnodeSaveInfo(dir, &info) < 0 || vnodeCommitInfo(dir) < 0) {
|
||||||
vError("vgId:%d, failed to save vnode config since %s", pCfg ? pCfg->vgId : 0, tstrerror(terrno));
|
vError("vgId:%d, failed to save vnode config since %s", pCfg ? pCfg->vgId : 0, tstrerror(terrno));
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
@ -97,7 +97,7 @@ int32_t vnodeAlterReplica(const char *path, SAlterVnodeReplicaReq *pReq, STfs *p
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
ret = vnodeCommitInfo(dir, &info);
|
ret = vnodeCommitInfo(dir);
|
||||||
if (ret < 0) {
|
if (ret < 0) {
|
||||||
vError("vgId:%d, failed to commit vnode config since %s", pReq->vgId, tstrerror(terrno));
|
vError("vgId:%d, failed to commit vnode config since %s", pReq->vgId, tstrerror(terrno));
|
||||||
return -1;
|
return -1;
|
||||||
|
@ -198,7 +198,7 @@ int32_t vnodeAlterHashRange(const char *srcPath, const char *dstPath, SAlterVnod
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
ret = vnodeCommitInfo(dir, &info);
|
ret = vnodeCommitInfo(dir);
|
||||||
if (ret < 0) {
|
if (ret < 0) {
|
||||||
vError("vgId:%d, failed to commit vnode config since %s", pReq->dstVgId, tstrerror(terrno));
|
vError("vgId:%d, failed to commit vnode config since %s", pReq->dstVgId, tstrerror(terrno));
|
||||||
return -1;
|
return -1;
|
||||||
|
@ -257,7 +257,7 @@ SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb) {
|
||||||
if (updated) {
|
if (updated) {
|
||||||
vInfo("vgId:%d, save vnode info since dnode info changed", info.config.vgId);
|
vInfo("vgId:%d, save vnode info since dnode info changed", info.config.vgId);
|
||||||
(void)vnodeSaveInfo(dir, &info);
|
(void)vnodeSaveInfo(dir, &info);
|
||||||
(void)vnodeCommitInfo(dir, &info);
|
(void)vnodeCommitInfo(dir);
|
||||||
}
|
}
|
||||||
|
|
||||||
// create handle
|
// create handle
|
||||||
|
|
|
@ -349,7 +349,7 @@ int32_t vnodeSnapWriterClose(SVSnapWriter *pWriter, int8_t rollback, SSnapshot *
|
||||||
snprintf(dir, TSDB_FILENAME_LEN, "%s", pWriter->pVnode->path);
|
snprintf(dir, TSDB_FILENAME_LEN, "%s", pWriter->pVnode->path);
|
||||||
}
|
}
|
||||||
|
|
||||||
vnodeCommitInfo(dir, &pWriter->info);
|
vnodeCommitInfo(dir);
|
||||||
} else {
|
} else {
|
||||||
vnodeRollback(pWriter->pVnode);
|
vnodeRollback(pWriter->pVnode);
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,5 +1,4 @@
|
||||||
add_subdirectory(tdb)
|
add_subdirectory(tdb)
|
||||||
add_subdirectory(cache)
|
|
||||||
add_subdirectory(transport)
|
add_subdirectory(transport)
|
||||||
add_subdirectory(wal)
|
add_subdirectory(wal)
|
||||||
add_subdirectory(monitor)
|
add_subdirectory(monitor)
|
||||||
|
|
|
@ -1,7 +0,0 @@
|
||||||
aux_source_directory(src CACHE_SRC)
|
|
||||||
add_library(cache STATIC ${CACHE_SRC})
|
|
||||||
target_include_directories(
|
|
||||||
cache
|
|
||||||
PUBLIC "${TD_SOURCE_DIR}/include/libs/cache"
|
|
||||||
PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/inc"
|
|
||||||
)
|
|
|
@ -1,27 +0,0 @@
|
||||||
/*
|
|
||||||
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
|
||||||
*
|
|
||||||
* This program is free software: you can use, redistribute, and/or modify
|
|
||||||
* it under the terms of the GNU Affero General Public License, version 3
|
|
||||||
* or later ("AGPL"), as published by the Free Software Foundation.
|
|
||||||
*
|
|
||||||
* This program is distributed in the hope that it will be useful, but WITHOUT
|
|
||||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
|
||||||
* FITNESS FOR A PARTICULAR PURPOSE.
|
|
||||||
*
|
|
||||||
* You should have received a copy of the GNU Affero General Public License
|
|
||||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
||||||
*/
|
|
||||||
|
|
||||||
#ifndef _TD_CACHE_DEF_H_
|
|
||||||
#define _TD_CACHE_DEF_H_
|
|
||||||
|
|
||||||
#ifdef __cplusplus
|
|
||||||
extern "C" {
|
|
||||||
#endif
|
|
||||||
|
|
||||||
#ifdef __cplusplus
|
|
||||||
}
|
|
||||||
#endif
|
|
||||||
|
|
||||||
#endif /*_TD_CACHE_DEF_H_*/
|
|
|
@ -1,14 +0,0 @@
|
||||||
/*
|
|
||||||
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
|
||||||
*
|
|
||||||
* This program is free software: you can use, redistribute, and/or modify
|
|
||||||
* it under the terms of the GNU Affero General Public License, version 3
|
|
||||||
* or later ("AGPL"), as published by the Free Software Foundation.
|
|
||||||
*
|
|
||||||
* This program is distributed in the hope that it will be useful, but WITHOUT
|
|
||||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
|
||||||
* FITNESS FOR A PARTICULAR PURPOSE.
|
|
||||||
*
|
|
||||||
* You should have received a copy of the GNU Affero General Public License
|
|
||||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
||||||
*/
|
|
Loading…
Reference in New Issue