diff --git a/include/util/taoserror.h b/include/util/taoserror.h index 75860a4b1e..5106196ccd 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -116,6 +116,7 @@ int32_t* taosGetErrno(); #define TSDB_CODE_NO_DISKSPACE TAOS_DEF_ERROR_CODE(0, 0x012B) #define TSDB_CODE_TIMEOUT_ERROR TAOS_DEF_ERROR_CODE(0, 0x012C) #define TSDB_CODE_MSG_ENCODE_ERROR TAOS_DEF_ERROR_CODE(0, 0x012D) +#define TSDB_CODE_NO_ENOUGH_DISKSPACE TAOS_DEF_ERROR_CODE(0, 0x012E) #define TSDB_CODE_APP_IS_STARTING TAOS_DEF_ERROR_CODE(0, 0x0130) // #define TSDB_CODE_APP_IS_STOPPING TAOS_DEF_ERROR_CODE(0, 0x0131) // diff --git a/packaging/tools/makepkg.sh b/packaging/tools/makepkg.sh index 92a20418c5..7ad3cf7b0a 100755 --- a/packaging/tools/makepkg.sh +++ b/packaging/tools/makepkg.sh @@ -89,7 +89,7 @@ else ${build_dir}/bin/tdengine-datasource.zip \ ${build_dir}/bin/tdengine-datasource.zip.md5sum" [ -f ${build_dir}/bin/taosx ] && taosx_bin="${build_dir}/bin/taosx" - explorer_bin_files=$(sh -c "ls ${build_dir}/bin/*-explorer") + explorer_bin_files=$(find ${build_dir}/bin/ -name '*-explorer') bin_files="${build_dir}/bin/${serverName} \ ${build_dir}/bin/${clientName} \ diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c b/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c index c1b3cde9ea..7aa1c9f56a 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c @@ -192,8 +192,8 @@ static int32_t vmPutMsgToQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg, EQueueType qtyp taosWriteQitem(pVnode->pFetchQ, pMsg); break; case WRITE_QUEUE: - if (!osDataSpaceAvailable()) { - terrno = TSDB_CODE_NO_DISKSPACE; + if (!osDataSpaceSufficient()) { + terrno = TSDB_CODE_NO_ENOUGH_DISKSPACE; code = terrno; dError("vgId:%d, msg:%p put into vnode-write queue failed since %s", pVnode->vgId, pMsg, terrstr(code)); break; diff --git a/source/dnode/mnode/impl/CMakeLists.txt b/source/dnode/mnode/impl/CMakeLists.txt index 25a4397b7d..493ba48601 100644 --- a/source/dnode/mnode/impl/CMakeLists.txt +++ b/source/dnode/mnode/impl/CMakeLists.txt @@ -2,8 +2,9 @@ aux_source_directory(src MNODE_SRC) IF (TD_PRIVILEGE) ADD_DEFINITIONS(-D_PRIVILEGE) ENDIF () -IF (TD_PRIVILEGE) +IF (TD_ENTERPRISE) LIST(APPEND MNODE_SRC ${TD_ENTERPRISE_DIR}/src/plugins/privilege/src/privilege.c) + LIST(APPEND MNODE_SRC ${TD_ENTERPRISE_DIR}/src/plugins/mnode/src/mndDb.c) ENDIF () add_library(mnode STATIC ${MNODE_SRC}) diff --git a/source/dnode/mnode/impl/inc/mndDb.h b/source/dnode/mnode/impl/inc/mndDb.h index 9edfd9bf3b..97d047d7a3 100644 --- a/source/dnode/mnode/impl/inc/mndDb.h +++ b/source/dnode/mnode/impl/inc/mndDb.h @@ -33,6 +33,8 @@ bool mndIsDbReady(SMnode *pMnode, SDbObj *pDb); SSdbRaw *mndDbActionEncode(SDbObj *pDb); const char *mndGetDbStr(const char *src); +int32_t mndProcessCompactDbReq(SRpcMsg *pReq); + #ifdef __cplusplus } #endif diff --git a/source/dnode/mnode/impl/src/mndDb.c b/source/dnode/mnode/impl/src/mndDb.c index 3efd8fb249..5de76a9d46 100644 --- a/source/dnode/mnode/impl/src/mndDb.c +++ b/source/dnode/mnode/impl/src/mndDb.c @@ -41,12 +41,15 @@ static int32_t mndProcessCreateDbReq(SRpcMsg *pReq); static int32_t mndProcessAlterDbReq(SRpcMsg *pReq); static int32_t mndProcessDropDbReq(SRpcMsg *pReq); static int32_t mndProcessUseDbReq(SRpcMsg *pReq); -static int32_t mndProcessCompactDbReq(SRpcMsg *pReq); static int32_t mndProcessTrimDbReq(SRpcMsg *pReq); static int32_t mndRetrieveDbs(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rowsCapacity); static void mndCancelGetNextDb(SMnode *pMnode, void *pIter); static int32_t mndProcessGetDbCfgReq(SRpcMsg *pReq); +#ifndef TD_ENTERPRISE +int32_t mndProcessCompactDbReq(SRpcMsg *pReq) { return TSDB_CODE_OPS_NOT_SUPPORT; } +#endif + int32_t mndInitDb(SMnode *pMnode) { SSdbTable table = { .sdbType = SDB_DB, @@ -1395,98 +1398,6 @@ int32_t mndValidateDbInfo(SMnode *pMnode, SDbVgVersion *pDbs, int32_t numOfDbs, return 0; } -static int32_t mndSetCompactDbCommitLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, int64_t compactTs) { - SDbObj dbObj = {0}; - memcpy(&dbObj, pDb, sizeof(SDbObj)); - dbObj.compactStartTime = compactTs; - - SSdbRaw *pCommitRaw = mndDbActionEncode(&dbObj); - if (pCommitRaw == NULL) return -1; - if (mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) { - sdbFreeRaw(pCommitRaw); - return -1; - } - - (void)sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY); - return 0; -} - -static int32_t mndSetCompactDbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, int64_t compactTs) { - SSdb *pSdb = pMnode->pSdb; - void *pIter = NULL; - - while (1) { - SVgObj *pVgroup = NULL; - pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup); - if (pIter == NULL) break; - - if (mndVgroupInDb(pVgroup, pDb->uid)) { - if (mndBuildCompactVgroupAction(pMnode, pTrans, pDb, pVgroup, compactTs) != 0) { - sdbCancelFetch(pSdb, pIter); - sdbRelease(pSdb, pVgroup); - return -1; - } - } - - sdbRelease(pSdb, pVgroup); - } - - return 0; -} - -static int32_t mndCompactDb(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb) { - int64_t compactTs = taosGetTimestampMs(); - int32_t code = -1; - STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB, pReq, "compact-db"); - if (pTrans == NULL) goto _OVER; - - mInfo("trans:%d, used to compact db:%s", pTrans->id, pDb->name); - mndTransSetDbName(pTrans, pDb->name, NULL); - if (mndTrancCheckConflict(pMnode, pTrans) != 0) goto _OVER; - if (mndSetCompactDbCommitLogs(pMnode, pTrans, pDb, compactTs) != 0) goto _OVER; - if (mndSetCompactDbRedoActions(pMnode, pTrans, pDb, compactTs) != 0) goto _OVER; - if (mndTransPrepare(pMnode, pTrans) != 0) goto _OVER; - code = 0; - -_OVER: - mndTransDrop(pTrans); - return code; -} - -static int32_t mndProcessCompactDbReq(SRpcMsg *pReq) { - SMnode *pMnode = pReq->info.node; - int32_t code = -1; - SDbObj *pDb = NULL; - SCompactDbReq compactReq = {0}; - - if (tDeserializeSCompactDbReq(pReq->pCont, pReq->contLen, &compactReq) != 0) { - terrno = TSDB_CODE_INVALID_MSG; - goto _OVER; - } - - mInfo("db:%s, start to compact", compactReq.db); - - pDb = mndAcquireDb(pMnode, compactReq.db); - if (pDb == NULL) { - goto _OVER; - } - - if (mndCheckDbPrivilege(pMnode, pReq->info.conn.user, MND_OPER_COMPACT_DB, pDb) != 0) { - goto _OVER; - } - - code = mndCompactDb(pMnode, pReq, pDb); - if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS; - -_OVER: - if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) { - mError("db:%s, failed to process compact db req since %s", compactReq.db, terrstr()); - } - - mndReleaseDb(pMnode, pDb); - return code; -} - static int32_t mndTrimDb(SMnode *pMnode, SDbObj *pDb) { SSdb *pSdb = pMnode->pSdb; SVgObj *pVgroup = NULL; diff --git a/source/dnode/mnode/impl/src/mndMain.c b/source/dnode/mnode/impl/src/mndMain.c index 53a5548b2f..b09a4f63a7 100644 --- a/source/dnode/mnode/impl/src/mndMain.c +++ b/source/dnode/mnode/impl/src/mndMain.c @@ -344,8 +344,8 @@ static int32_t mndInitWal(SMnode *pMnode) { .fsyncPeriod = 0, .rollPeriod = -1, .segSize = -1, - .retentionPeriod = -1, - .retentionSize = -1, + .retentionPeriod = 0, + .retentionSize = 0, .level = TAOS_WAL_FSYNC, }; @@ -370,7 +370,6 @@ static int32_t mndInitSdb(SMnode *pMnode) { opt.path = pMnode->path; opt.pMnode = pMnode; opt.pWal = pMnode->pWal; - opt.sync = pMnode->syncMgmt.sync; pMnode->pSdb = sdbInit(&opt); if (pMnode->pSdb == NULL) { @@ -552,16 +551,7 @@ void mndPreClose(SMnode *pMnode) { if (pMnode != NULL) { syncLeaderTransfer(pMnode->syncMgmt.sync); syncPreStop(pMnode->syncMgmt.sync); -#if 0 - while (syncSnapshotRecving(pMnode->syncMgmt.sync)) { - mInfo("vgId:1, snapshot is recving"); - taosMsleep(300); - } - while (syncSnapshotSending(pMnode->syncMgmt.sync)) { - mInfo("vgId:1, snapshot is sending"); - taosMsleep(300); - } -#endif + sdbWriteFile(pMnode->pSdb, 0); } } diff --git a/source/dnode/mnode/impl/src/mndSync.c b/source/dnode/mnode/impl/src/mndSync.c index f618b8afae..edd75c62b9 100644 --- a/source/dnode/mnode/impl/src/mndSync.c +++ b/source/dnode/mnode/impl/src/mndSync.c @@ -118,12 +118,12 @@ int32_t mndProcessWriteMsg(const SSyncFSM *pFsm, SRpcMsg *pMsg, const SFsmCbMeta transId, pTrans->createdTime, pMgmt->transId); mndTransExecute(pMnode, pTrans, false); mndReleaseTrans(pMnode, pTrans); - // sdbWriteFile(pMnode->pSdb, SDB_WRITE_DELTA); } else { mError("trans:%d, not found while execute in mnode since %s", transId, terrstr()); } } + sdbWriteFile(pMnode->pSdb, SDB_WRITE_DELTA); return 0; } @@ -319,6 +319,7 @@ int32_t mndInitSync(SMnode *pMnode) { mError("failed to open sync since %s", terrstr()); return -1; } + pMnode->pSdb->sync = pMgmt->sync; mInfo("mnode-sync is opened, id:%" PRId64, pMgmt->sync); return 0; diff --git a/source/dnode/mnode/impl/src/mndTrans.c b/source/dnode/mnode/impl/src/mndTrans.c index a34dfff4d6..55e9faf020 100644 --- a/source/dnode/mnode/impl/src/mndTrans.c +++ b/source/dnode/mnode/impl/src/mndTrans.c @@ -1645,8 +1645,6 @@ void mndTransPullup(SMnode *pMnode) { } mndReleaseTrans(pMnode, pTrans); } - - sdbWriteFile(pMnode->pSdb, SDB_WRITE_DELTA); taosArrayDestroy(pArray); } diff --git a/source/dnode/mnode/sdb/inc/sdb.h b/source/dnode/mnode/sdb/inc/sdb.h index 8d2cec478c..1b7b2f9672 100644 --- a/source/dnode/mnode/sdb/inc/sdb.h +++ b/source/dnode/mnode/sdb/inc/sdb.h @@ -37,7 +37,7 @@ extern "C" { #define mTrace(...) { if (mDebugFlag & DEBUG_TRACE) { taosPrintLog("MND ", DEBUG_TRACE, mDebugFlag, __VA_ARGS__); }} // clang-format on -#define SDB_WRITE_DELTA 20 +#define SDB_WRITE_DELTA 2000 #define SDB_GET_VAL(pData, dataPos, val, pos, func, type) \ { \ diff --git a/source/dnode/mnode/sdb/src/sdb.c b/source/dnode/mnode/sdb/src/sdb.c index bb8040da07..9797dd8337 100644 --- a/source/dnode/mnode/sdb/src/sdb.c +++ b/source/dnode/mnode/sdb/src/sdb.c @@ -53,7 +53,6 @@ SSdb *sdbInit(SSdbOpt *pOption) { } pSdb->pWal = pOption->pWal; - pSdb->sync = pOption->sync; pSdb->applyIndex = -1; pSdb->applyTerm = -1; pSdb->applyConfig = -1; diff --git a/source/dnode/mnode/sdb/src/sdbFile.c b/source/dnode/mnode/sdb/src/sdbFile.c index c2d27ad713..2e182ec10b 100644 --- a/source/dnode/mnode/sdb/src/sdbFile.c +++ b/source/dnode/mnode/sdb/src/sdbFile.c @@ -472,10 +472,7 @@ int32_t sdbWriteFile(SSdb *pSdb, int32_t delta) { taosThreadMutexLock(&pSdb->filelock); if (pSdb->pWal != NULL) { - // code = walBeginSnapshot(pSdb->pWal, pSdb->applyIndex, 0); - if (pSdb->sync == 0) { - code = 0; - } else { + if (pSdb->sync > 0) { code = syncBeginSnapshot(pSdb->sync, pSdb->applyIndex); } } @@ -484,11 +481,7 @@ int32_t sdbWriteFile(SSdb *pSdb, int32_t delta) { } if (code == 0) { if (pSdb->pWal != NULL) { - // code = walEndSnapshot(pSdb->pWal); - - if (pSdb->sync == 0) { - code = 0; - } else { + if (pSdb->sync > 0) { code = syncEndSnapshot(pSdb->sync); } } diff --git a/source/dnode/vnode/CMakeLists.txt b/source/dnode/vnode/CMakeLists.txt index 8b13d8f02b..8dc3f46ae3 100644 --- a/source/dnode/vnode/CMakeLists.txt +++ b/source/dnode/vnode/CMakeLists.txt @@ -14,7 +14,6 @@ target_sources( "src/vnd/vnodeSvr.c" "src/vnd/vnodeSync.c" "src/vnd/vnodeSnapshot.c" - "src/vnd/vnodeCompact.c" "src/vnd/vnodeRetention.c" # meta @@ -53,7 +52,6 @@ target_sources( "src/tsdb/tsdbCacheRead.c" "src/tsdb/tsdbRetention.c" "src/tsdb/tsdbDiskData.c" - "src/tsdb/tsdbCompact.c" "src/tsdb/tsdbMergeTree.c" "src/tsdb/tsdbDataIter.c" @@ -69,10 +67,20 @@ target_sources( "src/tq/tqSnapshot.c" "src/tq/tqOffsetSnapshot.c" ) + +IF (TD_VNODE_PLUGINS) + target_sources( + vnode + PRIVATE + ${TD_ENTERPRISE_DIR}/src/plugins/vnode/src/tsdbCompact.c + ${TD_ENTERPRISE_DIR}/src/plugins/vnode/src/vnodeCompact.c + ) +ENDIF () + target_include_directories( vnode PUBLIC "inc" - PRIVATE "src/inc" + PUBLIC "src/inc" PUBLIC "${TD_SOURCE_DIR}/include/libs/scalar" ) target_link_libraries( diff --git a/source/dnode/vnode/src/inc/vnd.h b/source/dnode/vnode/src/inc/vnd.h index 88cd1d99e1..134909090f 100644 --- a/source/dnode/vnode/src/inc/vnd.h +++ b/source/dnode/vnode/src/inc/vnd.h @@ -106,10 +106,6 @@ int32_t vnodeSyncCommit(SVnode* pVnode); int32_t vnodeAsyncCommit(SVnode* pVnode); bool vnodeShouldRollback(SVnode* pVnode); -// vnodeCompact.c -int32_t vnodeAsyncCompact(SVnode* pVnode); -int32_t vnodeSyncCompact(SVnode* pVnode); - // vnodeSync.c int32_t vnodeSyncOpen(SVnode* pVnode, char* path); int32_t vnodeSyncStart(SVnode* pVnode); diff --git a/source/dnode/vnode/src/tsdb/tsdbCompact.c b/source/dnode/vnode/src/tsdb/tsdbCompact.c deleted file mode 100644 index 1cd11a3039..0000000000 --- a/source/dnode/vnode/src/tsdb/tsdbCompact.c +++ /dev/null @@ -1,664 +0,0 @@ -/* - * Copyright (c) 2019 TAOS Data, Inc. - * - * This program is free software: you can use, redistribute, and/or modify - * it under the terms of the GNU Affero General Public License, version 3 - * or later ("AGPL"), as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ - -#include "tsdb.h" - -extern int32_t tsdbUpdateTableSchema(SMeta *pMeta, int64_t suid, int64_t uid, SSkmInfo *pSkmInfo); -extern int32_t tsdbWriteDataBlock(SDataFWriter *pWriter, SBlockData *pBlockData, SMapData *mDataBlk, int8_t cmprAlg); -extern int32_t tsdbWriteSttBlock(SDataFWriter *pWriter, SBlockData *pBlockData, SArray *aSttBlk, int8_t cmprAlg); - -typedef struct { - STsdb *pTsdb; - int64_t commitID; - int8_t cmprAlg; - int32_t maxRows; - int32_t minRows; - - STsdbFS fs; - - int32_t fid; - TABLEID tbid; - SSkmInfo tbSkm; - - // Tombstone - SDelFReader *pDelFReader; - SArray *aDelIdx; // SArray - SArray *aDelData; // SArray - SArray *aSkyLine; // SArray - int32_t iDelIdx; - int32_t iSkyLine; - TSDBKEY *pDKey; - TSDBKEY dKey; - - // Reader - SDataFReader *pReader; - STsdbDataIter2 *iterList; // list of iterators - STsdbDataIter2 *pIter; - SRBTree rbt; - - // Writer - SDataFWriter *pWriter; - SArray *aBlockIdx; // SArray - SMapData mDataBlk; // SMapData - SArray *aSttBlk; // SArray - SBlockData bData; - SBlockData sData; -} STsdbCompactor; - -static int32_t tsdbAbortCompact(STsdbCompactor *pCompactor) { - int32_t code = 0; - int32_t lino = 0; - - STsdb *pTsdb = pCompactor->pTsdb; - code = tsdbFSRollback(pTsdb); - TSDB_CHECK_CODE(code, lino, _exit); - -_exit: - if (code) { - tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code)); - } else { - tsdbInfo("vgId:%d %s done", TD_VID(pTsdb->pVnode), __func__); - } - return code; -} - -static int32_t tsdbCompactWriteTableDataStart(STsdbCompactor *pCompactor, TABLEID *pId) { - int32_t code = 0; - int32_t lino = 0; - - pCompactor->tbid = *pId; - - // tombstone - for (;;) { - if (pCompactor->iDelIdx >= taosArrayGetSize(pCompactor->aDelIdx)) { - pCompactor->pDKey = NULL; - break; - } - - SDelIdx *pDelIdx = (SDelIdx *)taosArrayGet(pCompactor->aDelIdx, pCompactor->iDelIdx); - int32_t c = tTABLEIDCmprFn(pDelIdx, &pCompactor->tbid); - if (c < 0) { - pCompactor->iDelIdx++; - } else if (c == 0) { - pCompactor->iDelIdx++; - - code = tsdbReadDelData(pCompactor->pDelFReader, pDelIdx, pCompactor->aDelData); - TSDB_CHECK_CODE(code, lino, _exit); - - code = tsdbBuildDeleteSkyline(pCompactor->aDelData, 0, taosArrayGetSize(pCompactor->aDelData) - 1, - pCompactor->aSkyLine); - TSDB_CHECK_CODE(code, lino, _exit); - - pCompactor->iSkyLine = 0; - if (pCompactor->iSkyLine < taosArrayGetSize(pCompactor->aSkyLine)) { - TSDBKEY *pKey = (TSDBKEY *)taosArrayGet(pCompactor->aSkyLine, pCompactor->iSkyLine); - - pCompactor->dKey.version = 0; - pCompactor->dKey.ts = pKey->ts; - pCompactor->pDKey = &pCompactor->dKey; - } else { - pCompactor->pDKey = NULL; - } - break; - } else { - pCompactor->pDKey = NULL; - break; - } - } - - // writer - code = tsdbUpdateTableSchema(pCompactor->pTsdb->pVnode->pMeta, pId->suid, pId->uid, &pCompactor->tbSkm); - TSDB_CHECK_CODE(code, lino, _exit); - - tMapDataReset(&pCompactor->mDataBlk); - - code = tBlockDataInit(&pCompactor->bData, pId, pCompactor->tbSkm.pTSchema, NULL, 0); - TSDB_CHECK_CODE(code, lino, _exit); - - if (!TABLE_SAME_SCHEMA(pCompactor->sData.suid, pCompactor->sData.uid, pId->suid, pId->uid)) { - if (pCompactor->sData.nRow > 0) { - code = tsdbWriteSttBlock(pCompactor->pWriter, &pCompactor->sData, pCompactor->aSttBlk, pCompactor->cmprAlg); - TSDB_CHECK_CODE(code, lino, _exit); - } - - TABLEID tbid = {.suid = pId->suid, .uid = pId->suid ? 0 : pId->uid}; - code = tBlockDataInit(&pCompactor->sData, &tbid, pCompactor->tbSkm.pTSchema, NULL, 0); - TSDB_CHECK_CODE(code, lino, _exit); - } - -_exit: - if (code) { - tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pCompactor->pTsdb->pVnode), __func__, lino, - tstrerror(code)); - } else { - tsdbDebug("vgId:%d %s done, suid:%" PRId64 " uid:%" PRId64, TD_VID(pCompactor->pTsdb->pVnode), __func__, pId->suid, - pId->uid); - } - return code; -} - -static int32_t tsdbCompactWriteTableDataEnd(STsdbCompactor *pCompactor) { - int32_t code = 0; - int32_t lino = 0; - - if (pCompactor->bData.nRow > 0) { - if (pCompactor->bData.nRow < pCompactor->minRows) { - for (int32_t iRow = 0; iRow < pCompactor->bData.nRow; iRow++) { - code = tBlockDataAppendRow(&pCompactor->sData, &tsdbRowFromBlockData(&pCompactor->bData, iRow), NULL, - pCompactor->tbid.uid); - TSDB_CHECK_CODE(code, lino, _exit); - - if (pCompactor->sData.nRow >= pCompactor->maxRows) { - code = tsdbWriteSttBlock(pCompactor->pWriter, &pCompactor->sData, pCompactor->aSttBlk, pCompactor->cmprAlg); - TSDB_CHECK_CODE(code, lino, _exit); - } - } - tBlockDataClear(&pCompactor->bData); - } else { - code = tsdbWriteDataBlock(pCompactor->pWriter, &pCompactor->bData, &pCompactor->mDataBlk, pCompactor->cmprAlg); - TSDB_CHECK_CODE(code, lino, _exit); - } - } - - if (pCompactor->mDataBlk.nItem > 0) { - SBlockIdx *pBlockIdx = (SBlockIdx *)taosArrayReserve(pCompactor->aBlockIdx, 1); - if (pBlockIdx == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - TSDB_CHECK_CODE(code, lino, _exit); - } - - pBlockIdx->suid = pCompactor->tbid.suid; - pBlockIdx->uid = pCompactor->tbid.uid; - - code = tsdbWriteDataBlk(pCompactor->pWriter, &pCompactor->mDataBlk, pBlockIdx); - TSDB_CHECK_CODE(code, lino, _exit); - } - -_exit: - if (code) { - tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pCompactor->pTsdb->pVnode), __func__, lino, - tstrerror(code)); - } else { - tsdbDebug("vgId:%d %s done, suid:%" PRId64 " uid:%" PRId64, TD_VID(pCompactor->pTsdb->pVnode), __func__, - pCompactor->tbid.suid, pCompactor->tbid.uid); - } - return code; -} - -static bool tsdbCompactRowIsDeleted(STsdbCompactor *pCompactor, TSDBROW *pRow) { - TSDBKEY tKey = TSDBROW_KEY(pRow); - TSDBKEY *aKey = (TSDBKEY *)TARRAY_DATA(pCompactor->aSkyLine); - int32_t nKey = TARRAY_SIZE(pCompactor->aSkyLine); - - if (tKey.ts > pCompactor->pDKey->ts) { - do { - pCompactor->pDKey->version = aKey[pCompactor->iSkyLine].version; - pCompactor->iSkyLine++; - if (pCompactor->iSkyLine < nKey) { - pCompactor->dKey.ts = aKey[pCompactor->iSkyLine].ts; - } else { - if (pCompactor->pDKey->version == 0) { - pCompactor->pDKey = NULL; - return false; - } else { - pCompactor->pDKey->ts = INT64_MAX; - } - } - } while (tKey.ts > pCompactor->pDKey->ts); - } - - if (tKey.ts < pCompactor->pDKey->ts) { - if (tKey.version > pCompactor->pDKey->version) { - return false; - } else { - return true; - } - } else if (tKey.ts == pCompactor->pDKey->ts) { - ASSERT(pCompactor->iSkyLine < nKey); - if (tKey.version > TMAX(pCompactor->pDKey->version, aKey[pCompactor->iSkyLine].version)) { - return false; - } else { - return true; - } - } - - return false; -} - -static int32_t tsdbCompactWriteTableData(STsdbCompactor *pCompactor, SRowInfo *pRowInfo) { - int32_t code = 0; - int32_t lino = 0; - - // start a new table data write if need - if (pRowInfo == NULL || pRowInfo->uid != pCompactor->tbid.uid) { - if (pCompactor->tbid.uid) { - code = tsdbCompactWriteTableDataEnd(pCompactor); - TSDB_CHECK_CODE(code, lino, _exit); - } - - if (pRowInfo == NULL) { - if (pCompactor->sData.nRow > 0) { - code = tsdbWriteSttBlock(pCompactor->pWriter, &pCompactor->sData, pCompactor->aSttBlk, pCompactor->cmprAlg); - TSDB_CHECK_CODE(code, lino, _exit); - } - return code; - } - - code = tsdbCompactWriteTableDataStart(pCompactor, (TABLEID *)pRowInfo); - TSDB_CHECK_CODE(code, lino, _exit); - } - - // check if row is deleted - if (pCompactor->pDKey && tsdbCompactRowIsDeleted(pCompactor, &pRowInfo->row)) goto _exit; - - if (tBlockDataTryUpsertRow(&pCompactor->bData, &pRowInfo->row, pRowInfo->uid) > pCompactor->maxRows) { - code = tsdbWriteDataBlock(pCompactor->pWriter, &pCompactor->bData, &pCompactor->mDataBlk, pCompactor->cmprAlg); - TSDB_CHECK_CODE(code, lino, _exit); - } - - code = tBlockDataUpsertRow(&pCompactor->bData, &pRowInfo->row, NULL, pRowInfo->uid); - TSDB_CHECK_CODE(code, lino, _exit); - -_exit: - if (code) { - tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pCompactor->pTsdb->pVnode), __func__, lino, - tstrerror(code)); - } else if (pRowInfo) { - tsdbTrace("vgId:%d %s done, suid:%" PRId64 " uid:%" PRId64 " ts:%" PRId64 " version:%" PRId64, - TD_VID(pCompactor->pTsdb->pVnode), __func__, pRowInfo->suid, pRowInfo->uid, TSDBROW_TS(&pRowInfo->row), - TSDBROW_VERSION(&pRowInfo->row)); - } - return code; -} - -static bool tsdbCompactTableIsDropped(STsdbCompactor *pCompactor) { - SMetaInfo info; - - if (pCompactor->pIter->rowInfo.uid == pCompactor->tbid.uid) return false; - if (metaGetInfo(pCompactor->pTsdb->pVnode->pMeta, pCompactor->pIter->rowInfo.uid, &info, NULL)) { - return true; - } - return false; -} -static int32_t tsdbCompactNextRow(STsdbCompactor *pCompactor, SRowInfo **ppRowInfo) { - int32_t code = 0; - int32_t lino = 0; - - for (;;) { - if (pCompactor->pIter) { - code = tsdbDataIterNext2(pCompactor->pIter, NULL); - TSDB_CHECK_CODE(code, lino, _exit); - - if (pCompactor->pIter->rowInfo.suid == 0 && pCompactor->pIter->rowInfo.uid == 0) { - pCompactor->pIter = NULL; - } else { - SRBTreeNode *pNode = tRBTreeMin(&pCompactor->rbt); - if (pNode) { - int32_t c = tsdbDataIterCmprFn(&pCompactor->pIter->rbtn, pNode); - if (c > 0) { - tRBTreePut(&pCompactor->rbt, &pCompactor->pIter->rbtn); - pCompactor->pIter = NULL; - } else if (c == 0) { - ASSERT(0); - } - } - } - } - - if (pCompactor->pIter == NULL) { - SRBTreeNode *pNode = tRBTreeDropMin(&pCompactor->rbt); - if (pNode) { - pCompactor->pIter = TSDB_RBTN_TO_DATA_ITER(pNode); - } - } - - if (pCompactor->pIter) { - if (tsdbCompactTableIsDropped(pCompactor)) { - TABLEID tbid = {.suid = pCompactor->pIter->rowInfo.suid, .uid = pCompactor->pIter->rowInfo.uid}; - tRBTreeClear(&pCompactor->rbt); - for (pCompactor->pIter = pCompactor->iterList; pCompactor->pIter; pCompactor->pIter = pCompactor->pIter->next) { - code = tsdbDataIterNext2(pCompactor->pIter, - &(STsdbFilterInfo){.flag = TSDB_FILTER_FLAG_BY_TABLEID, .tbid = tbid}); - TSDB_CHECK_CODE(code, lino, _exit); - - if (pCompactor->pIter->rowInfo.suid || pCompactor->pIter->rowInfo.uid) { - tRBTreePut(&pCompactor->rbt, &pCompactor->pIter->rbtn); - } - } - } else { - *ppRowInfo = &pCompactor->pIter->rowInfo; - break; - } - } else { - *ppRowInfo = NULL; - break; - } - } - -_exit: - if (code) { - tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pCompactor->pTsdb->pVnode), __func__, lino, - tstrerror(code)); - } - return code; -} - -static int32_t tsdbCompactFileSetStart(STsdbCompactor *pCompactor, SDFileSet *pSet) { - int32_t code = 0; - int32_t lino = 0; - - pCompactor->fid = pSet->fid; - pCompactor->tbid = (TABLEID){0}; - - /* tombstone */ - pCompactor->iDelIdx = 0; - - /* reader */ - code = tsdbDataFReaderOpen(&pCompactor->pReader, pCompactor->pTsdb, pSet); - TSDB_CHECK_CODE(code, lino, _exit); - - code = tsdbOpenDataFileDataIter(pCompactor->pReader, &pCompactor->pIter); - TSDB_CHECK_CODE(code, lino, _exit); - - tRBTreeCreate(&pCompactor->rbt, tsdbDataIterCmprFn); - if (pCompactor->pIter) { - pCompactor->pIter->next = pCompactor->iterList; - pCompactor->iterList = pCompactor->pIter; - - code = tsdbDataIterNext2(pCompactor->pIter, NULL); - TSDB_CHECK_CODE(code, lino, _exit); - - ASSERT(pCompactor->pIter->rowInfo.suid || pCompactor->pIter->rowInfo.uid); - tRBTreePut(&pCompactor->rbt, &pCompactor->pIter->rbtn); - } - - for (int32_t iStt = 0; iStt < pSet->nSttF; iStt++) { - code = tsdbOpenSttFileDataIter(pCompactor->pReader, iStt, &pCompactor->pIter); - TSDB_CHECK_CODE(code, lino, _exit); - - if (pCompactor->pIter) { - pCompactor->pIter->next = pCompactor->iterList; - pCompactor->iterList = pCompactor->pIter; - - code = tsdbDataIterNext2(pCompactor->pIter, NULL); - TSDB_CHECK_CODE(code, lino, _exit); - - ASSERT(pCompactor->pIter->rowInfo.suid || pCompactor->pIter->rowInfo.uid); - tRBTreePut(&pCompactor->rbt, &pCompactor->pIter->rbtn); - } - } - pCompactor->pIter = NULL; - - /* writer */ - code = tsdbDataFWriterOpen(&pCompactor->pWriter, pCompactor->pTsdb, - &(SDFileSet){.fid = pCompactor->fid, - .diskId = pSet->diskId, - .pHeadF = &(SHeadFile){.commitID = pCompactor->commitID}, - .pDataF = &(SDataFile){.commitID = pCompactor->commitID}, - .pSmaF = &(SSmaFile){.commitID = pCompactor->commitID}, - .nSttF = 1, - .aSttF = {&(SSttFile){.commitID = pCompactor->commitID}}}); - TSDB_CHECK_CODE(code, lino, _exit); - - if (pCompactor->aBlockIdx) { - taosArrayClear(pCompactor->aBlockIdx); - } else if ((pCompactor->aBlockIdx = taosArrayInit(0, sizeof(SBlockIdx))) == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - TSDB_CHECK_CODE(code, lino, _exit); - } - - tMapDataReset(&pCompactor->mDataBlk); - - if (pCompactor->aSttBlk) { - taosArrayClear(pCompactor->aSttBlk); - } else if ((pCompactor->aSttBlk = taosArrayInit(0, sizeof(SSttBlk))) == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - TSDB_CHECK_CODE(code, lino, _exit); - } - - tBlockDataReset(&pCompactor->bData); - tBlockDataReset(&pCompactor->sData); - -_exit: - if (code) { - tsdbError("vgId:%d %s failed at line %d since %s, fid:%d", TD_VID(pCompactor->pTsdb->pVnode), __func__, lino, - tstrerror(code), pCompactor->fid); - } else { - tsdbInfo("vgId:%d %s done, fid:%d", TD_VID(pCompactor->pTsdb->pVnode), __func__, pCompactor->fid); - } - return code; -} - -static int32_t tsdbCompactFileSetEnd(STsdbCompactor *pCompactor) { - int32_t code = 0; - int32_t lino = 0; - - ASSERT(pCompactor->bData.nRow == 0); - ASSERT(pCompactor->sData.nRow == 0); - - /* update files */ - code = tsdbWriteSttBlk(pCompactor->pWriter, pCompactor->aSttBlk); - TSDB_CHECK_CODE(code, lino, _exit); - - code = tsdbWriteBlockIdx(pCompactor->pWriter, pCompactor->aBlockIdx); - TSDB_CHECK_CODE(code, lino, _exit); - - code = tsdbUpdateDFileSetHeader(pCompactor->pWriter); - TSDB_CHECK_CODE(code, lino, _exit); - - code = tsdbFSUpsertFSet(&pCompactor->fs, &pCompactor->pWriter->wSet); - TSDB_CHECK_CODE(code, lino, _exit); - - code = tsdbDataFWriterClose(&pCompactor->pWriter, 1); - TSDB_CHECK_CODE(code, lino, _exit); - - code = tsdbDataFReaderClose(&pCompactor->pReader); - TSDB_CHECK_CODE(code, lino, _exit); - - /* do clear */ - while ((pCompactor->pIter = pCompactor->iterList) != NULL) { - pCompactor->iterList = pCompactor->pIter->next; - tsdbCloseDataIter2(pCompactor->pIter); - } - - tBlockDataReset(&pCompactor->bData); - tBlockDataReset(&pCompactor->sData); - -_exit: - if (code) { - tsdbError("vgId:%d %s failed at line %d since %s, fid:%d", TD_VID(pCompactor->pTsdb->pVnode), __func__, lino, - tstrerror(code), pCompactor->fid); - } else { - tsdbInfo("vgId:%d %s done, fid:%d", TD_VID(pCompactor->pTsdb->pVnode), __func__, pCompactor->fid); - } - return code; -} - -static int32_t tsdbCompactFileSet(STsdbCompactor *pCompactor, SDFileSet *pSet) { - int32_t code = 0; - int32_t lino = 0; - - // start compact - code = tsdbCompactFileSetStart(pCompactor, pSet); - TSDB_CHECK_CODE(code, lino, _exit); - - // do compact, end with a NULL row - SRowInfo *pRowInfo; - do { - code = tsdbCompactNextRow(pCompactor, &pRowInfo); - TSDB_CHECK_CODE(code, lino, _exit); - - code = tsdbCompactWriteTableData(pCompactor, pRowInfo); - TSDB_CHECK_CODE(code, lino, _exit); - } while (pRowInfo); - - // end compact - code = tsdbCompactFileSetEnd(pCompactor); - TSDB_CHECK_CODE(code, lino, _exit); - -_exit: - if (code) { - tsdbError("vgId:%d %s failed at line %d since %s, fid:%d", TD_VID(pCompactor->pTsdb->pVnode), __func__, lino, - tstrerror(code), pCompactor->fid); - if (pCompactor->pWriter) tsdbDataFWriterClose(&pCompactor->pWriter, 0); - while ((pCompactor->pIter = pCompactor->iterList)) { - pCompactor->iterList = pCompactor->pIter->next; - tsdbCloseDataIter2(pCompactor->pIter); - } - if (pCompactor->pReader) tsdbDataFReaderClose(&pCompactor->pReader); - } - return code; -} - -static void tsdbEndCompact(STsdbCompactor *pCompactor) { - // writer - tBlockDataDestroy(&pCompactor->sData); - tBlockDataDestroy(&pCompactor->bData); - taosArrayDestroy(pCompactor->aSttBlk); - tMapDataClear(&pCompactor->mDataBlk); - taosArrayDestroy(pCompactor->aBlockIdx); - - // reader - - // tombstone - taosArrayDestroy(pCompactor->aSkyLine); - taosArrayDestroy(pCompactor->aDelData); - taosArrayDestroy(pCompactor->aDelIdx); - - // others - tDestroyTSchema(pCompactor->tbSkm.pTSchema); - tsdbFSDestroy(&pCompactor->fs); - - tsdbInfo("vgId:%d %s done, commit ID:%" PRId64, TD_VID(pCompactor->pTsdb->pVnode), __func__, pCompactor->commitID); -} - -static int32_t tsdbBeginCompact(STsdb *pTsdb, SCompactInfo *pInfo, STsdbCompactor *pCompactor) { - int32_t code = 0; - int32_t lino = 0; - - pCompactor->pTsdb = pTsdb; - pCompactor->commitID = pInfo->commitID; - pCompactor->cmprAlg = pTsdb->pVnode->config.tsdbCfg.compression; - pCompactor->maxRows = pTsdb->pVnode->config.tsdbCfg.maxRows; - pCompactor->minRows = pTsdb->pVnode->config.tsdbCfg.minRows; - pCompactor->fid = INT32_MIN; - - code = tsdbFSCopy(pTsdb, &pCompactor->fs); - TSDB_CHECK_CODE(code, lino, _exit); - - /* tombstone */ - if (pCompactor->fs.pDelFile) { - code = tsdbDelFReaderOpen(&pCompactor->pDelFReader, pCompactor->fs.pDelFile, pTsdb); - TSDB_CHECK_CODE(code, lino, _exit); - - if ((pCompactor->aDelIdx = taosArrayInit(0, sizeof(SDelIdx))) == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - TSDB_CHECK_CODE(code, lino, _exit); - } - - if ((pCompactor->aDelData = taosArrayInit(0, sizeof(SDelData))) == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - TSDB_CHECK_CODE(code, lino, _exit); - } - - if ((pCompactor->aSkyLine = taosArrayInit(0, sizeof(TSDBKEY))) == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - TSDB_CHECK_CODE(code, lino, _exit); - } - - code = tsdbReadDelIdx(pCompactor->pDelFReader, pCompactor->aDelIdx); - TSDB_CHECK_CODE(code, lino, _exit); - } - - /* reader */ - - /* writer */ - code = tBlockDataCreate(&pCompactor->bData); - TSDB_CHECK_CODE(code, lino, _exit); - - code = tBlockDataCreate(&pCompactor->sData); - TSDB_CHECK_CODE(code, lino, _exit); - -_exit: - if (code) { - tsdbError("vgId:%d %s failed at line %d since %s, commit ID:%" PRId64, TD_VID(pTsdb->pVnode), __func__, lino, - tstrerror(code), pCompactor->commitID); - tBlockDataDestroy(&pCompactor->sData); - tBlockDataDestroy(&pCompactor->bData); - if (pCompactor->fs.pDelFile) { - taosArrayDestroy(pCompactor->aSkyLine); - taosArrayDestroy(pCompactor->aDelData); - taosArrayDestroy(pCompactor->aDelIdx); - if (pCompactor->pDelFReader) tsdbDelFReaderClose(&pCompactor->pDelFReader); - } - tsdbFSDestroy(&pCompactor->fs); - } else { - tsdbInfo("vgId:%d %s done, commit ID:%" PRId64, TD_VID(pTsdb->pVnode), __func__, pCompactor->commitID); - } - return code; -} - -int32_t tsdbCompact(STsdb *pTsdb, SCompactInfo *pInfo) { - int32_t code = 0; - - STsdbCompactor *pCompactor = &(STsdbCompactor){0}; - - if ((code = tsdbBeginCompact(pTsdb, pInfo, pCompactor))) return code; - - for (;;) { - SDFileSet *pSet = (SDFileSet *)taosArraySearch(pCompactor->fs.aDFileSet, &(SDFileSet){.fid = pCompactor->fid}, - tDFileSetCmprFn, TD_GT); - if (pSet == NULL) { - pCompactor->fid = INT32_MAX; - break; - } - - if ((code = tsdbCompactFileSet(pCompactor, pSet))) goto _exit; - } - - if ((code = tsdbFSUpsertDelFile(&pCompactor->fs, NULL))) goto _exit; - -_exit: - if (code) { - tsdbAbortCompact(pCompactor); - } else { - tsdbFSPrepareCommit(pTsdb, &pCompactor->fs); - } - tsdbEndCompact(pCompactor); - return code; -} - -int32_t tsdbCommitCompact(STsdb *pTsdb) { - int32_t code = 0; - int32_t lino = 0; - - taosThreadRwlockWrlock(&pTsdb->rwLock); - - code = tsdbFSCommit(pTsdb); - if (code) { - taosThreadRwlockUnlock(&pTsdb->rwLock); - TSDB_CHECK_CODE(code, lino, _exit); - } - - taosThreadRwlockUnlock(&pTsdb->rwLock); - -_exit: - if (code) { - tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code)); - } else { - tsdbInfo("vgId:%d %s done", TD_VID(pTsdb->pVnode), __func__); - } - return code; -} diff --git a/source/dnode/vnode/src/vnd/vnodeCompact.c b/source/dnode/vnode/src/vnd/vnodeCompact.c deleted file mode 100644 index 2b7abee99a..0000000000 --- a/source/dnode/vnode/src/vnd/vnodeCompact.c +++ /dev/null @@ -1,120 +0,0 @@ -/* - * Copyright (c) 2019 TAOS Data, Inc. - * - * This program is free software: you can use, redistribute, and/or modify - * it under the terms of the GNU Affero General Public License, version 3 - * or later ("AGPL"), as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ - -#include "vnd.h" - -extern int32_t tsdbCommitCompact(STsdb *pTsdb); - -static int32_t vnodeCompactTask(void *param) { - int32_t code = 0; - int32_t lino = 0; - - SCompactInfo *pInfo = (SCompactInfo *)param; - SVnode *pVnode = pInfo->pVnode; - - // do compact - code = tsdbCompact(pInfo->pVnode->pTsdb, pInfo); - TSDB_CHECK_CODE(code, lino, _exit); - - // end compact - char dir[TSDB_FILENAME_LEN] = {0}; - if (pVnode->pTfs) { - snprintf(dir, TSDB_FILENAME_LEN, "%s%s%s", tfsGetPrimaryPath(pVnode->pTfs), TD_DIRSEP, pVnode->path); - } else { - snprintf(dir, TSDB_FILENAME_LEN, "%s", pVnode->path); - } - - vnodeCommitInfo(dir); - - tsdbCommitCompact(pVnode->pTsdb); - -_exit: - tsem_post(&pInfo->pVnode->canCommit); - taosMemoryFree(pInfo); - return code; -} -static int32_t vnodePrepareCompact(SVnode *pVnode, SCompactInfo *pInfo) { - int32_t code = 0; - int32_t lino = 0; - - tsem_wait(&pVnode->canCommit); - - pInfo->pVnode = pVnode; - pInfo->flag = 0; - pInfo->commitID = ++pVnode->state.commitID; - - char dir[TSDB_FILENAME_LEN] = {0}; - SVnodeInfo info = {0}; - - if (pVnode->pTfs) { - snprintf(dir, TSDB_FILENAME_LEN, "%s%s%s", tfsGetPrimaryPath(pVnode->pTfs), TD_DIRSEP, pVnode->path); - } else { - snprintf(dir, TSDB_FILENAME_LEN, "%s", pVnode->path); - } - - if (vnodeLoadInfo(dir, &info) < 0) { - code = terrno; - goto _exit; - } - - info.state.commitID = pInfo->commitID; - - if (vnodeSaveInfo(dir, &info) < 0) { - code = terrno; - goto _exit; - } - -_exit: - if (code) { - vError("vgId:%d %s failed at line %d since %s, commit ID:%" PRId64, TD_VID(pVnode), __func__, lino, tstrerror(code), - pVnode->state.commitID); - } else { - vDebug("vgId:%d %s done, commit ID:%" PRId64, TD_VID(pVnode), __func__, pVnode->state.commitID); - } - return code; -} -int32_t vnodeAsyncCompact(SVnode *pVnode) { - int32_t code = 0; - int32_t lino = 0; - - SCompactInfo *pInfo = taosMemoryCalloc(1, sizeof(*pInfo)); - if (pInfo == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - TSDB_CHECK_CODE(code, lino, _exit); - } - - vnodeAsyncCommit(pVnode); - - code = vnodePrepareCompact(pVnode, pInfo); - TSDB_CHECK_CODE(code, lino, _exit); - - vnodeScheduleTask(vnodeCompactTask, pInfo); - -_exit: - if (code) { - vError("vgId:%d %s failed at line %d since %s", TD_VID(pVnode), __func__, lino, tstrerror(code)); - if (pInfo) taosMemoryFree(pInfo); - } else { - vInfo("vgId:%d %s done", TD_VID(pVnode), __func__); - } - return code; -} - -int32_t vnodeSyncCompact(SVnode *pVnode) { - vnodeAsyncCompact(pVnode); - tsem_wait(&pVnode->canCommit); - tsem_post(&pVnode->canCommit); - return 0; -} \ No newline at end of file diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index 192a3615e1..3525e696c5 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -1641,17 +1641,13 @@ static int32_t vnodeProcessDropIndexReq(SVnode *pVnode, int64_t version, void *p return TSDB_CODE_SUCCESS; } +extern int32_t vnodeProcessCompactVnodeReqImpl(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp); + static int32_t vnodeProcessCompactVnodeReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) { - SCompactVnodeReq req = {0}; - if (tDeserializeSCompactVnodeReq(pReq, len, &req) != 0) { - terrno = TSDB_CODE_INVALID_MSG; - return TSDB_CODE_INVALID_MSG; - } - vInfo("vgId:%d, compact msg will be processed, db:%s dbUid:%" PRId64 " compactStartTime:%" PRId64, TD_VID(pVnode), - req.db, req.dbUid, req.compactStartTime); - - vnodeAsyncCompact(pVnode); - vnodeBegin(pVnode); - - return 0; + return vnodeProcessCompactVnodeReqImpl(pVnode, version, pReq, len, pRsp); } + + +#ifndef TD_ENTERPRISE +int32_t vnodeProcessCompactVnodeReqImpl(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) { return 0; } +#endif diff --git a/source/util/src/terror.c b/source/util/src/terror.c index 4bb082800d..b22e2e924f 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -94,6 +94,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_NO_AVAIL_DISK, "No available disk") TAOS_DEFINE_ERROR(TSDB_CODE_NOT_FOUND, "Not found") TAOS_DEFINE_ERROR(TSDB_CODE_NO_DISKSPACE, "Out of disk space") TAOS_DEFINE_ERROR(TSDB_CODE_TIMEOUT_ERROR, "Operation timeout") +TAOS_DEFINE_ERROR(TSDB_CODE_NO_ENOUGH_DISKSPACE, "No enough disk space") TAOS_DEFINE_ERROR(TSDB_CODE_APP_IS_STARTING, "Database is starting up") TAOS_DEFINE_ERROR(TSDB_CODE_APP_IS_STOPPING, "Database is closing down")