Merge pull request #20222 from taosdata/feat/compact
more compact to enterprise
This commit is contained in:
commit
0fe9b5ca19
|
@ -2,8 +2,9 @@ aux_source_directory(src MNODE_SRC)
|
||||||
IF (TD_PRIVILEGE)
|
IF (TD_PRIVILEGE)
|
||||||
ADD_DEFINITIONS(-D_PRIVILEGE)
|
ADD_DEFINITIONS(-D_PRIVILEGE)
|
||||||
ENDIF ()
|
ENDIF ()
|
||||||
IF (TD_PRIVILEGE)
|
IF (TD_ENTERPRISE)
|
||||||
LIST(APPEND MNODE_SRC ${TD_ENTERPRISE_DIR}/src/plugins/privilege/src/privilege.c)
|
LIST(APPEND MNODE_SRC ${TD_ENTERPRISE_DIR}/src/plugins/privilege/src/privilege.c)
|
||||||
|
LIST(APPEND MNODE_SRC ${TD_ENTERPRISE_DIR}/src/plugins/mnode/src/mndDb.c)
|
||||||
ENDIF ()
|
ENDIF ()
|
||||||
|
|
||||||
add_library(mnode STATIC ${MNODE_SRC})
|
add_library(mnode STATIC ${MNODE_SRC})
|
||||||
|
|
|
@ -33,6 +33,8 @@ bool mndIsDbReady(SMnode *pMnode, SDbObj *pDb);
|
||||||
SSdbRaw *mndDbActionEncode(SDbObj *pDb);
|
SSdbRaw *mndDbActionEncode(SDbObj *pDb);
|
||||||
const char *mndGetDbStr(const char *src);
|
const char *mndGetDbStr(const char *src);
|
||||||
|
|
||||||
|
int32_t mndProcessCompactDbReq(SRpcMsg *pReq);
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
|
@ -41,12 +41,15 @@ static int32_t mndProcessCreateDbReq(SRpcMsg *pReq);
|
||||||
static int32_t mndProcessAlterDbReq(SRpcMsg *pReq);
|
static int32_t mndProcessAlterDbReq(SRpcMsg *pReq);
|
||||||
static int32_t mndProcessDropDbReq(SRpcMsg *pReq);
|
static int32_t mndProcessDropDbReq(SRpcMsg *pReq);
|
||||||
static int32_t mndProcessUseDbReq(SRpcMsg *pReq);
|
static int32_t mndProcessUseDbReq(SRpcMsg *pReq);
|
||||||
static int32_t mndProcessCompactDbReq(SRpcMsg *pReq);
|
|
||||||
static int32_t mndProcessTrimDbReq(SRpcMsg *pReq);
|
static int32_t mndProcessTrimDbReq(SRpcMsg *pReq);
|
||||||
static int32_t mndRetrieveDbs(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rowsCapacity);
|
static int32_t mndRetrieveDbs(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rowsCapacity);
|
||||||
static void mndCancelGetNextDb(SMnode *pMnode, void *pIter);
|
static void mndCancelGetNextDb(SMnode *pMnode, void *pIter);
|
||||||
static int32_t mndProcessGetDbCfgReq(SRpcMsg *pReq);
|
static int32_t mndProcessGetDbCfgReq(SRpcMsg *pReq);
|
||||||
|
|
||||||
|
#ifndef TD_ENTERPRISE
|
||||||
|
int32_t mndProcessCompactDbReq(SRpcMsg *pReq) { return TSDB_CODE_OPS_NOT_SUPPORT; }
|
||||||
|
#endif
|
||||||
|
|
||||||
int32_t mndInitDb(SMnode *pMnode) {
|
int32_t mndInitDb(SMnode *pMnode) {
|
||||||
SSdbTable table = {
|
SSdbTable table = {
|
||||||
.sdbType = SDB_DB,
|
.sdbType = SDB_DB,
|
||||||
|
@ -1395,98 +1398,6 @@ int32_t mndValidateDbInfo(SMnode *pMnode, SDbVgVersion *pDbs, int32_t numOfDbs,
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mndSetCompactDbCommitLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, int64_t compactTs) {
|
|
||||||
SDbObj dbObj = {0};
|
|
||||||
memcpy(&dbObj, pDb, sizeof(SDbObj));
|
|
||||||
dbObj.compactStartTime = compactTs;
|
|
||||||
|
|
||||||
SSdbRaw *pCommitRaw = mndDbActionEncode(&dbObj);
|
|
||||||
if (pCommitRaw == NULL) return -1;
|
|
||||||
if (mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) {
|
|
||||||
sdbFreeRaw(pCommitRaw);
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
(void)sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY);
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
static int32_t mndSetCompactDbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, int64_t compactTs) {
|
|
||||||
SSdb *pSdb = pMnode->pSdb;
|
|
||||||
void *pIter = NULL;
|
|
||||||
|
|
||||||
while (1) {
|
|
||||||
SVgObj *pVgroup = NULL;
|
|
||||||
pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
|
|
||||||
if (pIter == NULL) break;
|
|
||||||
|
|
||||||
if (mndVgroupInDb(pVgroup, pDb->uid)) {
|
|
||||||
if (mndBuildCompactVgroupAction(pMnode, pTrans, pDb, pVgroup, compactTs) != 0) {
|
|
||||||
sdbCancelFetch(pSdb, pIter);
|
|
||||||
sdbRelease(pSdb, pVgroup);
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
sdbRelease(pSdb, pVgroup);
|
|
||||||
}
|
|
||||||
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
static int32_t mndCompactDb(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb) {
|
|
||||||
int64_t compactTs = taosGetTimestampMs();
|
|
||||||
int32_t code = -1;
|
|
||||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB, pReq, "compact-db");
|
|
||||||
if (pTrans == NULL) goto _OVER;
|
|
||||||
|
|
||||||
mInfo("trans:%d, used to compact db:%s", pTrans->id, pDb->name);
|
|
||||||
mndTransSetDbName(pTrans, pDb->name, NULL);
|
|
||||||
if (mndTrancCheckConflict(pMnode, pTrans) != 0) goto _OVER;
|
|
||||||
if (mndSetCompactDbCommitLogs(pMnode, pTrans, pDb, compactTs) != 0) goto _OVER;
|
|
||||||
if (mndSetCompactDbRedoActions(pMnode, pTrans, pDb, compactTs) != 0) goto _OVER;
|
|
||||||
if (mndTransPrepare(pMnode, pTrans) != 0) goto _OVER;
|
|
||||||
code = 0;
|
|
||||||
|
|
||||||
_OVER:
|
|
||||||
mndTransDrop(pTrans);
|
|
||||||
return code;
|
|
||||||
}
|
|
||||||
|
|
||||||
static int32_t mndProcessCompactDbReq(SRpcMsg *pReq) {
|
|
||||||
SMnode *pMnode = pReq->info.node;
|
|
||||||
int32_t code = -1;
|
|
||||||
SDbObj *pDb = NULL;
|
|
||||||
SCompactDbReq compactReq = {0};
|
|
||||||
|
|
||||||
if (tDeserializeSCompactDbReq(pReq->pCont, pReq->contLen, &compactReq) != 0) {
|
|
||||||
terrno = TSDB_CODE_INVALID_MSG;
|
|
||||||
goto _OVER;
|
|
||||||
}
|
|
||||||
|
|
||||||
mInfo("db:%s, start to compact", compactReq.db);
|
|
||||||
|
|
||||||
pDb = mndAcquireDb(pMnode, compactReq.db);
|
|
||||||
if (pDb == NULL) {
|
|
||||||
goto _OVER;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (mndCheckDbPrivilege(pMnode, pReq->info.conn.user, MND_OPER_COMPACT_DB, pDb) != 0) {
|
|
||||||
goto _OVER;
|
|
||||||
}
|
|
||||||
|
|
||||||
code = mndCompactDb(pMnode, pReq, pDb);
|
|
||||||
if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;
|
|
||||||
|
|
||||||
_OVER:
|
|
||||||
if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
|
|
||||||
mError("db:%s, failed to process compact db req since %s", compactReq.db, terrstr());
|
|
||||||
}
|
|
||||||
|
|
||||||
mndReleaseDb(pMnode, pDb);
|
|
||||||
return code;
|
|
||||||
}
|
|
||||||
|
|
||||||
static int32_t mndTrimDb(SMnode *pMnode, SDbObj *pDb) {
|
static int32_t mndTrimDb(SMnode *pMnode, SDbObj *pDb) {
|
||||||
SSdb *pSdb = pMnode->pSdb;
|
SSdb *pSdb = pMnode->pSdb;
|
||||||
SVgObj *pVgroup = NULL;
|
SVgObj *pVgroup = NULL;
|
||||||
|
|
|
@ -14,7 +14,6 @@ target_sources(
|
||||||
"src/vnd/vnodeSvr.c"
|
"src/vnd/vnodeSvr.c"
|
||||||
"src/vnd/vnodeSync.c"
|
"src/vnd/vnodeSync.c"
|
||||||
"src/vnd/vnodeSnapshot.c"
|
"src/vnd/vnodeSnapshot.c"
|
||||||
"src/vnd/vnodeCompact.c"
|
|
||||||
"src/vnd/vnodeRetention.c"
|
"src/vnd/vnodeRetention.c"
|
||||||
|
|
||||||
# meta
|
# meta
|
||||||
|
@ -53,7 +52,6 @@ target_sources(
|
||||||
"src/tsdb/tsdbCacheRead.c"
|
"src/tsdb/tsdbCacheRead.c"
|
||||||
"src/tsdb/tsdbRetention.c"
|
"src/tsdb/tsdbRetention.c"
|
||||||
"src/tsdb/tsdbDiskData.c"
|
"src/tsdb/tsdbDiskData.c"
|
||||||
"src/tsdb/tsdbCompact.c"
|
|
||||||
"src/tsdb/tsdbMergeTree.c"
|
"src/tsdb/tsdbMergeTree.c"
|
||||||
"src/tsdb/tsdbDataIter.c"
|
"src/tsdb/tsdbDataIter.c"
|
||||||
|
|
||||||
|
@ -69,10 +67,20 @@ target_sources(
|
||||||
"src/tq/tqSnapshot.c"
|
"src/tq/tqSnapshot.c"
|
||||||
"src/tq/tqOffsetSnapshot.c"
|
"src/tq/tqOffsetSnapshot.c"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
IF (TD_VNODE_PLUGINS)
|
||||||
|
target_sources(
|
||||||
|
vnode
|
||||||
|
PRIVATE
|
||||||
|
${TD_ENTERPRISE_DIR}/src/plugins/vnode/src/tsdbCompact.c
|
||||||
|
${TD_ENTERPRISE_DIR}/src/plugins/vnode/src/vnodeCompact.c
|
||||||
|
)
|
||||||
|
ENDIF ()
|
||||||
|
|
||||||
target_include_directories(
|
target_include_directories(
|
||||||
vnode
|
vnode
|
||||||
PUBLIC "inc"
|
PUBLIC "inc"
|
||||||
PRIVATE "src/inc"
|
PUBLIC "src/inc"
|
||||||
PUBLIC "${TD_SOURCE_DIR}/include/libs/scalar"
|
PUBLIC "${TD_SOURCE_DIR}/include/libs/scalar"
|
||||||
)
|
)
|
||||||
target_link_libraries(
|
target_link_libraries(
|
||||||
|
|
|
@ -106,10 +106,6 @@ int32_t vnodeSyncCommit(SVnode* pVnode);
|
||||||
int32_t vnodeAsyncCommit(SVnode* pVnode);
|
int32_t vnodeAsyncCommit(SVnode* pVnode);
|
||||||
bool vnodeShouldRollback(SVnode* pVnode);
|
bool vnodeShouldRollback(SVnode* pVnode);
|
||||||
|
|
||||||
// vnodeCompact.c
|
|
||||||
int32_t vnodeAsyncCompact(SVnode* pVnode);
|
|
||||||
int32_t vnodeSyncCompact(SVnode* pVnode);
|
|
||||||
|
|
||||||
// vnodeSync.c
|
// vnodeSync.c
|
||||||
int32_t vnodeSyncOpen(SVnode* pVnode, char* path);
|
int32_t vnodeSyncOpen(SVnode* pVnode, char* path);
|
||||||
int32_t vnodeSyncStart(SVnode* pVnode);
|
int32_t vnodeSyncStart(SVnode* pVnode);
|
||||||
|
|
|
@ -1,664 +0,0 @@
|
||||||
/*
|
|
||||||
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
|
||||||
*
|
|
||||||
* This program is free software: you can use, redistribute, and/or modify
|
|
||||||
* it under the terms of the GNU Affero General Public License, version 3
|
|
||||||
* or later ("AGPL"), as published by the Free Software Foundation.
|
|
||||||
*
|
|
||||||
* This program is distributed in the hope that it will be useful, but WITHOUT
|
|
||||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
|
||||||
* FITNESS FOR A PARTICULAR PURPOSE.
|
|
||||||
*
|
|
||||||
* You should have received a copy of the GNU Affero General Public License
|
|
||||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
||||||
*/
|
|
||||||
|
|
||||||
#include "tsdb.h"
|
|
||||||
|
|
||||||
extern int32_t tsdbUpdateTableSchema(SMeta *pMeta, int64_t suid, int64_t uid, SSkmInfo *pSkmInfo);
|
|
||||||
extern int32_t tsdbWriteDataBlock(SDataFWriter *pWriter, SBlockData *pBlockData, SMapData *mDataBlk, int8_t cmprAlg);
|
|
||||||
extern int32_t tsdbWriteSttBlock(SDataFWriter *pWriter, SBlockData *pBlockData, SArray *aSttBlk, int8_t cmprAlg);
|
|
||||||
|
|
||||||
typedef struct {
|
|
||||||
STsdb *pTsdb;
|
|
||||||
int64_t commitID;
|
|
||||||
int8_t cmprAlg;
|
|
||||||
int32_t maxRows;
|
|
||||||
int32_t minRows;
|
|
||||||
|
|
||||||
STsdbFS fs;
|
|
||||||
|
|
||||||
int32_t fid;
|
|
||||||
TABLEID tbid;
|
|
||||||
SSkmInfo tbSkm;
|
|
||||||
|
|
||||||
// Tombstone
|
|
||||||
SDelFReader *pDelFReader;
|
|
||||||
SArray *aDelIdx; // SArray<SDelIdx>
|
|
||||||
SArray *aDelData; // SArray<SDelData>
|
|
||||||
SArray *aSkyLine; // SArray<TSDBKEY>
|
|
||||||
int32_t iDelIdx;
|
|
||||||
int32_t iSkyLine;
|
|
||||||
TSDBKEY *pDKey;
|
|
||||||
TSDBKEY dKey;
|
|
||||||
|
|
||||||
// Reader
|
|
||||||
SDataFReader *pReader;
|
|
||||||
STsdbDataIter2 *iterList; // list of iterators
|
|
||||||
STsdbDataIter2 *pIter;
|
|
||||||
SRBTree rbt;
|
|
||||||
|
|
||||||
// Writer
|
|
||||||
SDataFWriter *pWriter;
|
|
||||||
SArray *aBlockIdx; // SArray<SBlockIdx>
|
|
||||||
SMapData mDataBlk; // SMapData<SDataBlk>
|
|
||||||
SArray *aSttBlk; // SArray<SSttBlk>
|
|
||||||
SBlockData bData;
|
|
||||||
SBlockData sData;
|
|
||||||
} STsdbCompactor;
|
|
||||||
|
|
||||||
static int32_t tsdbAbortCompact(STsdbCompactor *pCompactor) {
|
|
||||||
int32_t code = 0;
|
|
||||||
int32_t lino = 0;
|
|
||||||
|
|
||||||
STsdb *pTsdb = pCompactor->pTsdb;
|
|
||||||
code = tsdbFSRollback(pTsdb);
|
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
|
||||||
|
|
||||||
_exit:
|
|
||||||
if (code) {
|
|
||||||
tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
|
|
||||||
} else {
|
|
||||||
tsdbInfo("vgId:%d %s done", TD_VID(pTsdb->pVnode), __func__);
|
|
||||||
}
|
|
||||||
return code;
|
|
||||||
}
|
|
||||||
|
|
||||||
static int32_t tsdbCompactWriteTableDataStart(STsdbCompactor *pCompactor, TABLEID *pId) {
|
|
||||||
int32_t code = 0;
|
|
||||||
int32_t lino = 0;
|
|
||||||
|
|
||||||
pCompactor->tbid = *pId;
|
|
||||||
|
|
||||||
// tombstone
|
|
||||||
for (;;) {
|
|
||||||
if (pCompactor->iDelIdx >= taosArrayGetSize(pCompactor->aDelIdx)) {
|
|
||||||
pCompactor->pDKey = NULL;
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
SDelIdx *pDelIdx = (SDelIdx *)taosArrayGet(pCompactor->aDelIdx, pCompactor->iDelIdx);
|
|
||||||
int32_t c = tTABLEIDCmprFn(pDelIdx, &pCompactor->tbid);
|
|
||||||
if (c < 0) {
|
|
||||||
pCompactor->iDelIdx++;
|
|
||||||
} else if (c == 0) {
|
|
||||||
pCompactor->iDelIdx++;
|
|
||||||
|
|
||||||
code = tsdbReadDelData(pCompactor->pDelFReader, pDelIdx, pCompactor->aDelData);
|
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
|
||||||
|
|
||||||
code = tsdbBuildDeleteSkyline(pCompactor->aDelData, 0, taosArrayGetSize(pCompactor->aDelData) - 1,
|
|
||||||
pCompactor->aSkyLine);
|
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
|
||||||
|
|
||||||
pCompactor->iSkyLine = 0;
|
|
||||||
if (pCompactor->iSkyLine < taosArrayGetSize(pCompactor->aSkyLine)) {
|
|
||||||
TSDBKEY *pKey = (TSDBKEY *)taosArrayGet(pCompactor->aSkyLine, pCompactor->iSkyLine);
|
|
||||||
|
|
||||||
pCompactor->dKey.version = 0;
|
|
||||||
pCompactor->dKey.ts = pKey->ts;
|
|
||||||
pCompactor->pDKey = &pCompactor->dKey;
|
|
||||||
} else {
|
|
||||||
pCompactor->pDKey = NULL;
|
|
||||||
}
|
|
||||||
break;
|
|
||||||
} else {
|
|
||||||
pCompactor->pDKey = NULL;
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// writer
|
|
||||||
code = tsdbUpdateTableSchema(pCompactor->pTsdb->pVnode->pMeta, pId->suid, pId->uid, &pCompactor->tbSkm);
|
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
|
||||||
|
|
||||||
tMapDataReset(&pCompactor->mDataBlk);
|
|
||||||
|
|
||||||
code = tBlockDataInit(&pCompactor->bData, pId, pCompactor->tbSkm.pTSchema, NULL, 0);
|
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
|
||||||
|
|
||||||
if (!TABLE_SAME_SCHEMA(pCompactor->sData.suid, pCompactor->sData.uid, pId->suid, pId->uid)) {
|
|
||||||
if (pCompactor->sData.nRow > 0) {
|
|
||||||
code = tsdbWriteSttBlock(pCompactor->pWriter, &pCompactor->sData, pCompactor->aSttBlk, pCompactor->cmprAlg);
|
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
|
||||||
}
|
|
||||||
|
|
||||||
TABLEID tbid = {.suid = pId->suid, .uid = pId->suid ? 0 : pId->uid};
|
|
||||||
code = tBlockDataInit(&pCompactor->sData, &tbid, pCompactor->tbSkm.pTSchema, NULL, 0);
|
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
|
||||||
}
|
|
||||||
|
|
||||||
_exit:
|
|
||||||
if (code) {
|
|
||||||
tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pCompactor->pTsdb->pVnode), __func__, lino,
|
|
||||||
tstrerror(code));
|
|
||||||
} else {
|
|
||||||
tsdbDebug("vgId:%d %s done, suid:%" PRId64 " uid:%" PRId64, TD_VID(pCompactor->pTsdb->pVnode), __func__, pId->suid,
|
|
||||||
pId->uid);
|
|
||||||
}
|
|
||||||
return code;
|
|
||||||
}
|
|
||||||
|
|
||||||
static int32_t tsdbCompactWriteTableDataEnd(STsdbCompactor *pCompactor) {
|
|
||||||
int32_t code = 0;
|
|
||||||
int32_t lino = 0;
|
|
||||||
|
|
||||||
if (pCompactor->bData.nRow > 0) {
|
|
||||||
if (pCompactor->bData.nRow < pCompactor->minRows) {
|
|
||||||
for (int32_t iRow = 0; iRow < pCompactor->bData.nRow; iRow++) {
|
|
||||||
code = tBlockDataAppendRow(&pCompactor->sData, &tsdbRowFromBlockData(&pCompactor->bData, iRow), NULL,
|
|
||||||
pCompactor->tbid.uid);
|
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
|
||||||
|
|
||||||
if (pCompactor->sData.nRow >= pCompactor->maxRows) {
|
|
||||||
code = tsdbWriteSttBlock(pCompactor->pWriter, &pCompactor->sData, pCompactor->aSttBlk, pCompactor->cmprAlg);
|
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
tBlockDataClear(&pCompactor->bData);
|
|
||||||
} else {
|
|
||||||
code = tsdbWriteDataBlock(pCompactor->pWriter, &pCompactor->bData, &pCompactor->mDataBlk, pCompactor->cmprAlg);
|
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if (pCompactor->mDataBlk.nItem > 0) {
|
|
||||||
SBlockIdx *pBlockIdx = (SBlockIdx *)taosArrayReserve(pCompactor->aBlockIdx, 1);
|
|
||||||
if (pBlockIdx == NULL) {
|
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
|
||||||
}
|
|
||||||
|
|
||||||
pBlockIdx->suid = pCompactor->tbid.suid;
|
|
||||||
pBlockIdx->uid = pCompactor->tbid.uid;
|
|
||||||
|
|
||||||
code = tsdbWriteDataBlk(pCompactor->pWriter, &pCompactor->mDataBlk, pBlockIdx);
|
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
|
||||||
}
|
|
||||||
|
|
||||||
_exit:
|
|
||||||
if (code) {
|
|
||||||
tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pCompactor->pTsdb->pVnode), __func__, lino,
|
|
||||||
tstrerror(code));
|
|
||||||
} else {
|
|
||||||
tsdbDebug("vgId:%d %s done, suid:%" PRId64 " uid:%" PRId64, TD_VID(pCompactor->pTsdb->pVnode), __func__,
|
|
||||||
pCompactor->tbid.suid, pCompactor->tbid.uid);
|
|
||||||
}
|
|
||||||
return code;
|
|
||||||
}
|
|
||||||
|
|
||||||
static bool tsdbCompactRowIsDeleted(STsdbCompactor *pCompactor, TSDBROW *pRow) {
|
|
||||||
TSDBKEY tKey = TSDBROW_KEY(pRow);
|
|
||||||
TSDBKEY *aKey = (TSDBKEY *)TARRAY_DATA(pCompactor->aSkyLine);
|
|
||||||
int32_t nKey = TARRAY_SIZE(pCompactor->aSkyLine);
|
|
||||||
|
|
||||||
if (tKey.ts > pCompactor->pDKey->ts) {
|
|
||||||
do {
|
|
||||||
pCompactor->pDKey->version = aKey[pCompactor->iSkyLine].version;
|
|
||||||
pCompactor->iSkyLine++;
|
|
||||||
if (pCompactor->iSkyLine < nKey) {
|
|
||||||
pCompactor->dKey.ts = aKey[pCompactor->iSkyLine].ts;
|
|
||||||
} else {
|
|
||||||
if (pCompactor->pDKey->version == 0) {
|
|
||||||
pCompactor->pDKey = NULL;
|
|
||||||
return false;
|
|
||||||
} else {
|
|
||||||
pCompactor->pDKey->ts = INT64_MAX;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} while (tKey.ts > pCompactor->pDKey->ts);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (tKey.ts < pCompactor->pDKey->ts) {
|
|
||||||
if (tKey.version > pCompactor->pDKey->version) {
|
|
||||||
return false;
|
|
||||||
} else {
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
} else if (tKey.ts == pCompactor->pDKey->ts) {
|
|
||||||
ASSERT(pCompactor->iSkyLine < nKey);
|
|
||||||
if (tKey.version > TMAX(pCompactor->pDKey->version, aKey[pCompactor->iSkyLine].version)) {
|
|
||||||
return false;
|
|
||||||
} else {
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
static int32_t tsdbCompactWriteTableData(STsdbCompactor *pCompactor, SRowInfo *pRowInfo) {
|
|
||||||
int32_t code = 0;
|
|
||||||
int32_t lino = 0;
|
|
||||||
|
|
||||||
// start a new table data write if need
|
|
||||||
if (pRowInfo == NULL || pRowInfo->uid != pCompactor->tbid.uid) {
|
|
||||||
if (pCompactor->tbid.uid) {
|
|
||||||
code = tsdbCompactWriteTableDataEnd(pCompactor);
|
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (pRowInfo == NULL) {
|
|
||||||
if (pCompactor->sData.nRow > 0) {
|
|
||||||
code = tsdbWriteSttBlock(pCompactor->pWriter, &pCompactor->sData, pCompactor->aSttBlk, pCompactor->cmprAlg);
|
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
|
||||||
}
|
|
||||||
return code;
|
|
||||||
}
|
|
||||||
|
|
||||||
code = tsdbCompactWriteTableDataStart(pCompactor, (TABLEID *)pRowInfo);
|
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
|
||||||
}
|
|
||||||
|
|
||||||
// check if row is deleted
|
|
||||||
if (pCompactor->pDKey && tsdbCompactRowIsDeleted(pCompactor, &pRowInfo->row)) goto _exit;
|
|
||||||
|
|
||||||
if (tBlockDataTryUpsertRow(&pCompactor->bData, &pRowInfo->row, pRowInfo->uid) > pCompactor->maxRows) {
|
|
||||||
code = tsdbWriteDataBlock(pCompactor->pWriter, &pCompactor->bData, &pCompactor->mDataBlk, pCompactor->cmprAlg);
|
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
|
||||||
}
|
|
||||||
|
|
||||||
code = tBlockDataUpsertRow(&pCompactor->bData, &pRowInfo->row, NULL, pRowInfo->uid);
|
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
|
||||||
|
|
||||||
_exit:
|
|
||||||
if (code) {
|
|
||||||
tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pCompactor->pTsdb->pVnode), __func__, lino,
|
|
||||||
tstrerror(code));
|
|
||||||
} else if (pRowInfo) {
|
|
||||||
tsdbTrace("vgId:%d %s done, suid:%" PRId64 " uid:%" PRId64 " ts:%" PRId64 " version:%" PRId64,
|
|
||||||
TD_VID(pCompactor->pTsdb->pVnode), __func__, pRowInfo->suid, pRowInfo->uid, TSDBROW_TS(&pRowInfo->row),
|
|
||||||
TSDBROW_VERSION(&pRowInfo->row));
|
|
||||||
}
|
|
||||||
return code;
|
|
||||||
}
|
|
||||||
|
|
||||||
static bool tsdbCompactTableIsDropped(STsdbCompactor *pCompactor) {
|
|
||||||
SMetaInfo info;
|
|
||||||
|
|
||||||
if (pCompactor->pIter->rowInfo.uid == pCompactor->tbid.uid) return false;
|
|
||||||
if (metaGetInfo(pCompactor->pTsdb->pVnode->pMeta, pCompactor->pIter->rowInfo.uid, &info, NULL)) {
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
static int32_t tsdbCompactNextRow(STsdbCompactor *pCompactor, SRowInfo **ppRowInfo) {
|
|
||||||
int32_t code = 0;
|
|
||||||
int32_t lino = 0;
|
|
||||||
|
|
||||||
for (;;) {
|
|
||||||
if (pCompactor->pIter) {
|
|
||||||
code = tsdbDataIterNext2(pCompactor->pIter, NULL);
|
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
|
||||||
|
|
||||||
if (pCompactor->pIter->rowInfo.suid == 0 && pCompactor->pIter->rowInfo.uid == 0) {
|
|
||||||
pCompactor->pIter = NULL;
|
|
||||||
} else {
|
|
||||||
SRBTreeNode *pNode = tRBTreeMin(&pCompactor->rbt);
|
|
||||||
if (pNode) {
|
|
||||||
int32_t c = tsdbDataIterCmprFn(&pCompactor->pIter->rbtn, pNode);
|
|
||||||
if (c > 0) {
|
|
||||||
tRBTreePut(&pCompactor->rbt, &pCompactor->pIter->rbtn);
|
|
||||||
pCompactor->pIter = NULL;
|
|
||||||
} else if (c == 0) {
|
|
||||||
ASSERT(0);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if (pCompactor->pIter == NULL) {
|
|
||||||
SRBTreeNode *pNode = tRBTreeDropMin(&pCompactor->rbt);
|
|
||||||
if (pNode) {
|
|
||||||
pCompactor->pIter = TSDB_RBTN_TO_DATA_ITER(pNode);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if (pCompactor->pIter) {
|
|
||||||
if (tsdbCompactTableIsDropped(pCompactor)) {
|
|
||||||
TABLEID tbid = {.suid = pCompactor->pIter->rowInfo.suid, .uid = pCompactor->pIter->rowInfo.uid};
|
|
||||||
tRBTreeClear(&pCompactor->rbt);
|
|
||||||
for (pCompactor->pIter = pCompactor->iterList; pCompactor->pIter; pCompactor->pIter = pCompactor->pIter->next) {
|
|
||||||
code = tsdbDataIterNext2(pCompactor->pIter,
|
|
||||||
&(STsdbFilterInfo){.flag = TSDB_FILTER_FLAG_BY_TABLEID, .tbid = tbid});
|
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
|
||||||
|
|
||||||
if (pCompactor->pIter->rowInfo.suid || pCompactor->pIter->rowInfo.uid) {
|
|
||||||
tRBTreePut(&pCompactor->rbt, &pCompactor->pIter->rbtn);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
*ppRowInfo = &pCompactor->pIter->rowInfo;
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
*ppRowInfo = NULL;
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
_exit:
|
|
||||||
if (code) {
|
|
||||||
tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pCompactor->pTsdb->pVnode), __func__, lino,
|
|
||||||
tstrerror(code));
|
|
||||||
}
|
|
||||||
return code;
|
|
||||||
}
|
|
||||||
|
|
||||||
static int32_t tsdbCompactFileSetStart(STsdbCompactor *pCompactor, SDFileSet *pSet) {
|
|
||||||
int32_t code = 0;
|
|
||||||
int32_t lino = 0;
|
|
||||||
|
|
||||||
pCompactor->fid = pSet->fid;
|
|
||||||
pCompactor->tbid = (TABLEID){0};
|
|
||||||
|
|
||||||
/* tombstone */
|
|
||||||
pCompactor->iDelIdx = 0;
|
|
||||||
|
|
||||||
/* reader */
|
|
||||||
code = tsdbDataFReaderOpen(&pCompactor->pReader, pCompactor->pTsdb, pSet);
|
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
|
||||||
|
|
||||||
code = tsdbOpenDataFileDataIter(pCompactor->pReader, &pCompactor->pIter);
|
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
|
||||||
|
|
||||||
tRBTreeCreate(&pCompactor->rbt, tsdbDataIterCmprFn);
|
|
||||||
if (pCompactor->pIter) {
|
|
||||||
pCompactor->pIter->next = pCompactor->iterList;
|
|
||||||
pCompactor->iterList = pCompactor->pIter;
|
|
||||||
|
|
||||||
code = tsdbDataIterNext2(pCompactor->pIter, NULL);
|
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
|
||||||
|
|
||||||
ASSERT(pCompactor->pIter->rowInfo.suid || pCompactor->pIter->rowInfo.uid);
|
|
||||||
tRBTreePut(&pCompactor->rbt, &pCompactor->pIter->rbtn);
|
|
||||||
}
|
|
||||||
|
|
||||||
for (int32_t iStt = 0; iStt < pSet->nSttF; iStt++) {
|
|
||||||
code = tsdbOpenSttFileDataIter(pCompactor->pReader, iStt, &pCompactor->pIter);
|
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
|
||||||
|
|
||||||
if (pCompactor->pIter) {
|
|
||||||
pCompactor->pIter->next = pCompactor->iterList;
|
|
||||||
pCompactor->iterList = pCompactor->pIter;
|
|
||||||
|
|
||||||
code = tsdbDataIterNext2(pCompactor->pIter, NULL);
|
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
|
||||||
|
|
||||||
ASSERT(pCompactor->pIter->rowInfo.suid || pCompactor->pIter->rowInfo.uid);
|
|
||||||
tRBTreePut(&pCompactor->rbt, &pCompactor->pIter->rbtn);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
pCompactor->pIter = NULL;
|
|
||||||
|
|
||||||
/* writer */
|
|
||||||
code = tsdbDataFWriterOpen(&pCompactor->pWriter, pCompactor->pTsdb,
|
|
||||||
&(SDFileSet){.fid = pCompactor->fid,
|
|
||||||
.diskId = pSet->diskId,
|
|
||||||
.pHeadF = &(SHeadFile){.commitID = pCompactor->commitID},
|
|
||||||
.pDataF = &(SDataFile){.commitID = pCompactor->commitID},
|
|
||||||
.pSmaF = &(SSmaFile){.commitID = pCompactor->commitID},
|
|
||||||
.nSttF = 1,
|
|
||||||
.aSttF = {&(SSttFile){.commitID = pCompactor->commitID}}});
|
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
|
||||||
|
|
||||||
if (pCompactor->aBlockIdx) {
|
|
||||||
taosArrayClear(pCompactor->aBlockIdx);
|
|
||||||
} else if ((pCompactor->aBlockIdx = taosArrayInit(0, sizeof(SBlockIdx))) == NULL) {
|
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
|
||||||
}
|
|
||||||
|
|
||||||
tMapDataReset(&pCompactor->mDataBlk);
|
|
||||||
|
|
||||||
if (pCompactor->aSttBlk) {
|
|
||||||
taosArrayClear(pCompactor->aSttBlk);
|
|
||||||
} else if ((pCompactor->aSttBlk = taosArrayInit(0, sizeof(SSttBlk))) == NULL) {
|
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
|
||||||
}
|
|
||||||
|
|
||||||
tBlockDataReset(&pCompactor->bData);
|
|
||||||
tBlockDataReset(&pCompactor->sData);
|
|
||||||
|
|
||||||
_exit:
|
|
||||||
if (code) {
|
|
||||||
tsdbError("vgId:%d %s failed at line %d since %s, fid:%d", TD_VID(pCompactor->pTsdb->pVnode), __func__, lino,
|
|
||||||
tstrerror(code), pCompactor->fid);
|
|
||||||
} else {
|
|
||||||
tsdbInfo("vgId:%d %s done, fid:%d", TD_VID(pCompactor->pTsdb->pVnode), __func__, pCompactor->fid);
|
|
||||||
}
|
|
||||||
return code;
|
|
||||||
}
|
|
||||||
|
|
||||||
static int32_t tsdbCompactFileSetEnd(STsdbCompactor *pCompactor) {
|
|
||||||
int32_t code = 0;
|
|
||||||
int32_t lino = 0;
|
|
||||||
|
|
||||||
ASSERT(pCompactor->bData.nRow == 0);
|
|
||||||
ASSERT(pCompactor->sData.nRow == 0);
|
|
||||||
|
|
||||||
/* update files */
|
|
||||||
code = tsdbWriteSttBlk(pCompactor->pWriter, pCompactor->aSttBlk);
|
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
|
||||||
|
|
||||||
code = tsdbWriteBlockIdx(pCompactor->pWriter, pCompactor->aBlockIdx);
|
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
|
||||||
|
|
||||||
code = tsdbUpdateDFileSetHeader(pCompactor->pWriter);
|
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
|
||||||
|
|
||||||
code = tsdbFSUpsertFSet(&pCompactor->fs, &pCompactor->pWriter->wSet);
|
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
|
||||||
|
|
||||||
code = tsdbDataFWriterClose(&pCompactor->pWriter, 1);
|
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
|
||||||
|
|
||||||
code = tsdbDataFReaderClose(&pCompactor->pReader);
|
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
|
||||||
|
|
||||||
/* do clear */
|
|
||||||
while ((pCompactor->pIter = pCompactor->iterList) != NULL) {
|
|
||||||
pCompactor->iterList = pCompactor->pIter->next;
|
|
||||||
tsdbCloseDataIter2(pCompactor->pIter);
|
|
||||||
}
|
|
||||||
|
|
||||||
tBlockDataReset(&pCompactor->bData);
|
|
||||||
tBlockDataReset(&pCompactor->sData);
|
|
||||||
|
|
||||||
_exit:
|
|
||||||
if (code) {
|
|
||||||
tsdbError("vgId:%d %s failed at line %d since %s, fid:%d", TD_VID(pCompactor->pTsdb->pVnode), __func__, lino,
|
|
||||||
tstrerror(code), pCompactor->fid);
|
|
||||||
} else {
|
|
||||||
tsdbInfo("vgId:%d %s done, fid:%d", TD_VID(pCompactor->pTsdb->pVnode), __func__, pCompactor->fid);
|
|
||||||
}
|
|
||||||
return code;
|
|
||||||
}
|
|
||||||
|
|
||||||
static int32_t tsdbCompactFileSet(STsdbCompactor *pCompactor, SDFileSet *pSet) {
|
|
||||||
int32_t code = 0;
|
|
||||||
int32_t lino = 0;
|
|
||||||
|
|
||||||
// start compact
|
|
||||||
code = tsdbCompactFileSetStart(pCompactor, pSet);
|
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
|
||||||
|
|
||||||
// do compact, end with a NULL row
|
|
||||||
SRowInfo *pRowInfo;
|
|
||||||
do {
|
|
||||||
code = tsdbCompactNextRow(pCompactor, &pRowInfo);
|
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
|
||||||
|
|
||||||
code = tsdbCompactWriteTableData(pCompactor, pRowInfo);
|
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
|
||||||
} while (pRowInfo);
|
|
||||||
|
|
||||||
// end compact
|
|
||||||
code = tsdbCompactFileSetEnd(pCompactor);
|
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
|
||||||
|
|
||||||
_exit:
|
|
||||||
if (code) {
|
|
||||||
tsdbError("vgId:%d %s failed at line %d since %s, fid:%d", TD_VID(pCompactor->pTsdb->pVnode), __func__, lino,
|
|
||||||
tstrerror(code), pCompactor->fid);
|
|
||||||
if (pCompactor->pWriter) tsdbDataFWriterClose(&pCompactor->pWriter, 0);
|
|
||||||
while ((pCompactor->pIter = pCompactor->iterList)) {
|
|
||||||
pCompactor->iterList = pCompactor->pIter->next;
|
|
||||||
tsdbCloseDataIter2(pCompactor->pIter);
|
|
||||||
}
|
|
||||||
if (pCompactor->pReader) tsdbDataFReaderClose(&pCompactor->pReader);
|
|
||||||
}
|
|
||||||
return code;
|
|
||||||
}
|
|
||||||
|
|
||||||
static void tsdbEndCompact(STsdbCompactor *pCompactor) {
|
|
||||||
// writer
|
|
||||||
tBlockDataDestroy(&pCompactor->sData);
|
|
||||||
tBlockDataDestroy(&pCompactor->bData);
|
|
||||||
taosArrayDestroy(pCompactor->aSttBlk);
|
|
||||||
tMapDataClear(&pCompactor->mDataBlk);
|
|
||||||
taosArrayDestroy(pCompactor->aBlockIdx);
|
|
||||||
|
|
||||||
// reader
|
|
||||||
|
|
||||||
// tombstone
|
|
||||||
taosArrayDestroy(pCompactor->aSkyLine);
|
|
||||||
taosArrayDestroy(pCompactor->aDelData);
|
|
||||||
taosArrayDestroy(pCompactor->aDelIdx);
|
|
||||||
|
|
||||||
// others
|
|
||||||
tDestroyTSchema(pCompactor->tbSkm.pTSchema);
|
|
||||||
tsdbFSDestroy(&pCompactor->fs);
|
|
||||||
|
|
||||||
tsdbInfo("vgId:%d %s done, commit ID:%" PRId64, TD_VID(pCompactor->pTsdb->pVnode), __func__, pCompactor->commitID);
|
|
||||||
}
|
|
||||||
|
|
||||||
static int32_t tsdbBeginCompact(STsdb *pTsdb, SCompactInfo *pInfo, STsdbCompactor *pCompactor) {
|
|
||||||
int32_t code = 0;
|
|
||||||
int32_t lino = 0;
|
|
||||||
|
|
||||||
pCompactor->pTsdb = pTsdb;
|
|
||||||
pCompactor->commitID = pInfo->commitID;
|
|
||||||
pCompactor->cmprAlg = pTsdb->pVnode->config.tsdbCfg.compression;
|
|
||||||
pCompactor->maxRows = pTsdb->pVnode->config.tsdbCfg.maxRows;
|
|
||||||
pCompactor->minRows = pTsdb->pVnode->config.tsdbCfg.minRows;
|
|
||||||
pCompactor->fid = INT32_MIN;
|
|
||||||
|
|
||||||
code = tsdbFSCopy(pTsdb, &pCompactor->fs);
|
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
|
||||||
|
|
||||||
/* tombstone */
|
|
||||||
if (pCompactor->fs.pDelFile) {
|
|
||||||
code = tsdbDelFReaderOpen(&pCompactor->pDelFReader, pCompactor->fs.pDelFile, pTsdb);
|
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
|
||||||
|
|
||||||
if ((pCompactor->aDelIdx = taosArrayInit(0, sizeof(SDelIdx))) == NULL) {
|
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
|
||||||
}
|
|
||||||
|
|
||||||
if ((pCompactor->aDelData = taosArrayInit(0, sizeof(SDelData))) == NULL) {
|
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
|
||||||
}
|
|
||||||
|
|
||||||
if ((pCompactor->aSkyLine = taosArrayInit(0, sizeof(TSDBKEY))) == NULL) {
|
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
|
||||||
}
|
|
||||||
|
|
||||||
code = tsdbReadDelIdx(pCompactor->pDelFReader, pCompactor->aDelIdx);
|
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
|
||||||
}
|
|
||||||
|
|
||||||
/* reader */
|
|
||||||
|
|
||||||
/* writer */
|
|
||||||
code = tBlockDataCreate(&pCompactor->bData);
|
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
|
||||||
|
|
||||||
code = tBlockDataCreate(&pCompactor->sData);
|
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
|
||||||
|
|
||||||
_exit:
|
|
||||||
if (code) {
|
|
||||||
tsdbError("vgId:%d %s failed at line %d since %s, commit ID:%" PRId64, TD_VID(pTsdb->pVnode), __func__, lino,
|
|
||||||
tstrerror(code), pCompactor->commitID);
|
|
||||||
tBlockDataDestroy(&pCompactor->sData);
|
|
||||||
tBlockDataDestroy(&pCompactor->bData);
|
|
||||||
if (pCompactor->fs.pDelFile) {
|
|
||||||
taosArrayDestroy(pCompactor->aSkyLine);
|
|
||||||
taosArrayDestroy(pCompactor->aDelData);
|
|
||||||
taosArrayDestroy(pCompactor->aDelIdx);
|
|
||||||
if (pCompactor->pDelFReader) tsdbDelFReaderClose(&pCompactor->pDelFReader);
|
|
||||||
}
|
|
||||||
tsdbFSDestroy(&pCompactor->fs);
|
|
||||||
} else {
|
|
||||||
tsdbInfo("vgId:%d %s done, commit ID:%" PRId64, TD_VID(pTsdb->pVnode), __func__, pCompactor->commitID);
|
|
||||||
}
|
|
||||||
return code;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t tsdbCompact(STsdb *pTsdb, SCompactInfo *pInfo) {
|
|
||||||
int32_t code = 0;
|
|
||||||
|
|
||||||
STsdbCompactor *pCompactor = &(STsdbCompactor){0};
|
|
||||||
|
|
||||||
if ((code = tsdbBeginCompact(pTsdb, pInfo, pCompactor))) return code;
|
|
||||||
|
|
||||||
for (;;) {
|
|
||||||
SDFileSet *pSet = (SDFileSet *)taosArraySearch(pCompactor->fs.aDFileSet, &(SDFileSet){.fid = pCompactor->fid},
|
|
||||||
tDFileSetCmprFn, TD_GT);
|
|
||||||
if (pSet == NULL) {
|
|
||||||
pCompactor->fid = INT32_MAX;
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
if ((code = tsdbCompactFileSet(pCompactor, pSet))) goto _exit;
|
|
||||||
}
|
|
||||||
|
|
||||||
if ((code = tsdbFSUpsertDelFile(&pCompactor->fs, NULL))) goto _exit;
|
|
||||||
|
|
||||||
_exit:
|
|
||||||
if (code) {
|
|
||||||
tsdbAbortCompact(pCompactor);
|
|
||||||
} else {
|
|
||||||
tsdbFSPrepareCommit(pTsdb, &pCompactor->fs);
|
|
||||||
}
|
|
||||||
tsdbEndCompact(pCompactor);
|
|
||||||
return code;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t tsdbCommitCompact(STsdb *pTsdb) {
|
|
||||||
int32_t code = 0;
|
|
||||||
int32_t lino = 0;
|
|
||||||
|
|
||||||
taosThreadRwlockWrlock(&pTsdb->rwLock);
|
|
||||||
|
|
||||||
code = tsdbFSCommit(pTsdb);
|
|
||||||
if (code) {
|
|
||||||
taosThreadRwlockUnlock(&pTsdb->rwLock);
|
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
|
||||||
}
|
|
||||||
|
|
||||||
taosThreadRwlockUnlock(&pTsdb->rwLock);
|
|
||||||
|
|
||||||
_exit:
|
|
||||||
if (code) {
|
|
||||||
tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
|
|
||||||
} else {
|
|
||||||
tsdbInfo("vgId:%d %s done", TD_VID(pTsdb->pVnode), __func__);
|
|
||||||
}
|
|
||||||
return code;
|
|
||||||
}
|
|
|
@ -1,120 +0,0 @@
|
||||||
/*
|
|
||||||
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
|
||||||
*
|
|
||||||
* This program is free software: you can use, redistribute, and/or modify
|
|
||||||
* it under the terms of the GNU Affero General Public License, version 3
|
|
||||||
* or later ("AGPL"), as published by the Free Software Foundation.
|
|
||||||
*
|
|
||||||
* This program is distributed in the hope that it will be useful, but WITHOUT
|
|
||||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
|
||||||
* FITNESS FOR A PARTICULAR PURPOSE.
|
|
||||||
*
|
|
||||||
* You should have received a copy of the GNU Affero General Public License
|
|
||||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
||||||
*/
|
|
||||||
|
|
||||||
#include "vnd.h"
|
|
||||||
|
|
||||||
extern int32_t tsdbCommitCompact(STsdb *pTsdb);
|
|
||||||
|
|
||||||
static int32_t vnodeCompactTask(void *param) {
|
|
||||||
int32_t code = 0;
|
|
||||||
int32_t lino = 0;
|
|
||||||
|
|
||||||
SCompactInfo *pInfo = (SCompactInfo *)param;
|
|
||||||
SVnode *pVnode = pInfo->pVnode;
|
|
||||||
|
|
||||||
// do compact
|
|
||||||
code = tsdbCompact(pInfo->pVnode->pTsdb, pInfo);
|
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
|
||||||
|
|
||||||
// end compact
|
|
||||||
char dir[TSDB_FILENAME_LEN] = {0};
|
|
||||||
if (pVnode->pTfs) {
|
|
||||||
snprintf(dir, TSDB_FILENAME_LEN, "%s%s%s", tfsGetPrimaryPath(pVnode->pTfs), TD_DIRSEP, pVnode->path);
|
|
||||||
} else {
|
|
||||||
snprintf(dir, TSDB_FILENAME_LEN, "%s", pVnode->path);
|
|
||||||
}
|
|
||||||
|
|
||||||
vnodeCommitInfo(dir);
|
|
||||||
|
|
||||||
tsdbCommitCompact(pVnode->pTsdb);
|
|
||||||
|
|
||||||
_exit:
|
|
||||||
tsem_post(&pInfo->pVnode->canCommit);
|
|
||||||
taosMemoryFree(pInfo);
|
|
||||||
return code;
|
|
||||||
}
|
|
||||||
static int32_t vnodePrepareCompact(SVnode *pVnode, SCompactInfo *pInfo) {
|
|
||||||
int32_t code = 0;
|
|
||||||
int32_t lino = 0;
|
|
||||||
|
|
||||||
tsem_wait(&pVnode->canCommit);
|
|
||||||
|
|
||||||
pInfo->pVnode = pVnode;
|
|
||||||
pInfo->flag = 0;
|
|
||||||
pInfo->commitID = ++pVnode->state.commitID;
|
|
||||||
|
|
||||||
char dir[TSDB_FILENAME_LEN] = {0};
|
|
||||||
SVnodeInfo info = {0};
|
|
||||||
|
|
||||||
if (pVnode->pTfs) {
|
|
||||||
snprintf(dir, TSDB_FILENAME_LEN, "%s%s%s", tfsGetPrimaryPath(pVnode->pTfs), TD_DIRSEP, pVnode->path);
|
|
||||||
} else {
|
|
||||||
snprintf(dir, TSDB_FILENAME_LEN, "%s", pVnode->path);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (vnodeLoadInfo(dir, &info) < 0) {
|
|
||||||
code = terrno;
|
|
||||||
goto _exit;
|
|
||||||
}
|
|
||||||
|
|
||||||
info.state.commitID = pInfo->commitID;
|
|
||||||
|
|
||||||
if (vnodeSaveInfo(dir, &info) < 0) {
|
|
||||||
code = terrno;
|
|
||||||
goto _exit;
|
|
||||||
}
|
|
||||||
|
|
||||||
_exit:
|
|
||||||
if (code) {
|
|
||||||
vError("vgId:%d %s failed at line %d since %s, commit ID:%" PRId64, TD_VID(pVnode), __func__, lino, tstrerror(code),
|
|
||||||
pVnode->state.commitID);
|
|
||||||
} else {
|
|
||||||
vDebug("vgId:%d %s done, commit ID:%" PRId64, TD_VID(pVnode), __func__, pVnode->state.commitID);
|
|
||||||
}
|
|
||||||
return code;
|
|
||||||
}
|
|
||||||
int32_t vnodeAsyncCompact(SVnode *pVnode) {
|
|
||||||
int32_t code = 0;
|
|
||||||
int32_t lino = 0;
|
|
||||||
|
|
||||||
SCompactInfo *pInfo = taosMemoryCalloc(1, sizeof(*pInfo));
|
|
||||||
if (pInfo == NULL) {
|
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
|
||||||
}
|
|
||||||
|
|
||||||
vnodeAsyncCommit(pVnode);
|
|
||||||
|
|
||||||
code = vnodePrepareCompact(pVnode, pInfo);
|
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
|
||||||
|
|
||||||
vnodeScheduleTask(vnodeCompactTask, pInfo);
|
|
||||||
|
|
||||||
_exit:
|
|
||||||
if (code) {
|
|
||||||
vError("vgId:%d %s failed at line %d since %s", TD_VID(pVnode), __func__, lino, tstrerror(code));
|
|
||||||
if (pInfo) taosMemoryFree(pInfo);
|
|
||||||
} else {
|
|
||||||
vInfo("vgId:%d %s done", TD_VID(pVnode), __func__);
|
|
||||||
}
|
|
||||||
return code;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t vnodeSyncCompact(SVnode *pVnode) {
|
|
||||||
vnodeAsyncCompact(pVnode);
|
|
||||||
tsem_wait(&pVnode->canCommit);
|
|
||||||
tsem_post(&pVnode->canCommit);
|
|
||||||
return 0;
|
|
||||||
}
|
|
|
@ -1641,17 +1641,13 @@ static int32_t vnodeProcessDropIndexReq(SVnode *pVnode, int64_t version, void *p
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
extern int32_t vnodeProcessCompactVnodeReqImpl(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
|
||||||
|
|
||||||
static int32_t vnodeProcessCompactVnodeReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
|
static int32_t vnodeProcessCompactVnodeReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
|
||||||
SCompactVnodeReq req = {0};
|
return vnodeProcessCompactVnodeReqImpl(pVnode, version, pReq, len, pRsp);
|
||||||
if (tDeserializeSCompactVnodeReq(pReq, len, &req) != 0) {
|
|
||||||
terrno = TSDB_CODE_INVALID_MSG;
|
|
||||||
return TSDB_CODE_INVALID_MSG;
|
|
||||||
}
|
}
|
||||||
vInfo("vgId:%d, compact msg will be processed, db:%s dbUid:%" PRId64 " compactStartTime:%" PRId64, TD_VID(pVnode),
|
|
||||||
req.db, req.dbUid, req.compactStartTime);
|
|
||||||
|
|
||||||
vnodeAsyncCompact(pVnode);
|
|
||||||
vnodeBegin(pVnode);
|
|
||||||
|
|
||||||
return 0;
|
#ifndef TD_ENTERPRISE
|
||||||
}
|
int32_t vnodeProcessCompactVnodeReqImpl(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) { return 0; }
|
||||||
|
#endif
|
||||||
|
|
Loading…
Reference in New Issue