feat: retention should not block data commit
This commit is contained in:
parent
bd3db36a40
commit
274d310f33
|
@ -49,7 +49,7 @@ target_sources(
|
||||||
"src/tsdb/tsdbUtil.c"
|
"src/tsdb/tsdbUtil.c"
|
||||||
"src/tsdb/tsdbSnapshot.c"
|
"src/tsdb/tsdbSnapshot.c"
|
||||||
"src/tsdb/tsdbCacheRead.c"
|
"src/tsdb/tsdbCacheRead.c"
|
||||||
"src/tsdb/tsdbRetention.c"
|
"src/tsdb/tsdbRetention2.c"
|
||||||
"src/tsdb/tsdbDiskData.c"
|
"src/tsdb/tsdbDiskData.c"
|
||||||
"src/tsdb/tsdbCompress.c"
|
"src/tsdb/tsdbCompress.c"
|
||||||
"src/tsdb/tsdbCompact.c"
|
"src/tsdb/tsdbCompact.c"
|
||||||
|
|
|
@ -58,6 +58,7 @@ typedef struct STQ STQ;
|
||||||
typedef struct SVState SVState;
|
typedef struct SVState SVState;
|
||||||
typedef struct SVBufPool SVBufPool;
|
typedef struct SVBufPool SVBufPool;
|
||||||
typedef struct SQWorker SQHandle;
|
typedef struct SQWorker SQHandle;
|
||||||
|
typedef struct STrimDbHandle STrimDbHandle;
|
||||||
typedef struct STsdbKeepCfg STsdbKeepCfg;
|
typedef struct STsdbKeepCfg STsdbKeepCfg;
|
||||||
typedef struct SMetaSnapReader SMetaSnapReader;
|
typedef struct SMetaSnapReader SMetaSnapReader;
|
||||||
typedef struct SMetaSnapWriter SMetaSnapWriter;
|
typedef struct SMetaSnapWriter SMetaSnapWriter;
|
||||||
|
@ -301,6 +302,9 @@ struct STsdbKeepCfg {
|
||||||
int32_t keep1;
|
int32_t keep1;
|
||||||
int32_t keep2;
|
int32_t keep2;
|
||||||
};
|
};
|
||||||
|
/**
 * Flag block embedded in each SVnode that records whether a trim
 * (retention) task is currently in flight for that vnode.
 */
struct STrimDbHandle {
  /* 0: no trim task is running; 1: a trim task is running. */
  volatile int8_t state;
};
|
||||||
|
|
||||||
struct SVnode {
|
struct SVnode {
|
||||||
char* path;
|
char* path;
|
||||||
|
@ -325,6 +329,7 @@ struct SVnode {
|
||||||
bool restored;
|
bool restored;
|
||||||
tsem_t syncSem;
|
tsem_t syncSem;
|
||||||
SQHandle* pQuery;
|
SQHandle* pQuery;
|
||||||
|
STrimDbHandle trimDbH;
|
||||||
};
|
};
|
||||||
|
|
||||||
#define TD_VID(PVNODE) ((PVNODE)->config.vgId)
|
#define TD_VID(PVNODE) ((PVNODE)->config.vgId)
|
||||||
|
|
|
@ -1045,7 +1045,7 @@ _err:
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tsdbFSRef(STsdb *pTsdb, STsdbFS *pFS) {
|
int32_t tsdbFSRef(STsdb *pTsdb, STsdbFS *pFS) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
int32_t nRef;
|
int32_t nRef;
|
||||||
|
|
||||||
|
|
|
@ -16,10 +16,28 @@
|
||||||
#include "tsdb.h"
|
#include "tsdb.h"
|
||||||
|
|
||||||
static bool tsdbShouldDoRetention(STsdb *pTsdb, int64_t now) {
|
static bool tsdbShouldDoRetention(STsdb *pTsdb, int64_t now) {
|
||||||
|
if (taosArrayGetSize(pTsdb->fs.aDFileSet) == 0) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
SDFileSet *pSet = (SDFileSet *)taosArrayGet(pTsdb->fs.aDFileSet, 0);
|
||||||
|
if (tsdbFidLevel(pSet->fid, &pTsdb->keepCfg, now) < 0) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (tfsGetLevel(pTsdb->pVnode->pTfs) < 2) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
STsdbKeepCfg *keepCfg = &pTsdb->keepCfg;
|
||||||
|
if (keepCfg->keep0 == keepCfg->keep1 && keepCfg->keep1 == keepCfg->keep2) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
for (int32_t iSet = 0; iSet < taosArrayGetSize(pTsdb->fs.aDFileSet); iSet++) {
|
for (int32_t iSet = 0; iSet < taosArrayGetSize(pTsdb->fs.aDFileSet); iSet++) {
|
||||||
SDFileSet *pSet = (SDFileSet *)taosArrayGet(pTsdb->fs.aDFileSet, iSet);
|
pSet = (SDFileSet *)taosArrayGet(pTsdb->fs.aDFileSet, iSet);
|
||||||
int32_t expLevel = tsdbFidLevel(pSet->fid, &pTsdb->keepCfg, now);
|
int32_t expLevel = tsdbFidLevel(pSet->fid, keepCfg, now);
|
||||||
SDiskID did;
|
SDiskID did;
|
||||||
|
|
||||||
if (expLevel == pSet->diskId.level) continue;
|
if (expLevel == pSet->diskId.level) continue;
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,140 @@
|
||||||
|
/*
|
||||||
|
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||||
|
*
|
||||||
|
* This program is free software: you can use, redistribute, and/or modify
|
||||||
|
* it under the terms of the GNU Affero General Public License, version 3
|
||||||
|
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||||
|
*
|
||||||
|
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||||
|
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||||
|
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||||
|
*
|
||||||
|
* You should have received a copy of the GNU Affero General Public License
|
||||||
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
*/
|
||||||
|
|
||||||
|
#include "tsdb.h"
|
||||||
|
|
||||||
|
/**
 * Decide whether a retention pass has any work to do.
 *
 * Returns true as soon as one file set is found that either has a negative
 * expected level (such sets are removed by tsdbDoRetention) or would be placed
 * on a disk whose level differs from the one it currently sits on.
 *
 * @param pTsdb tsdb instance whose file sets are inspected
 * @param now   reference time handed to tsdbFidLevel
 * @return true when retention should run, false otherwise
 */
static bool tsdbShouldDoRetention(STsdb *pTsdb, int64_t now) {
  STsdbKeepCfg *pKeep = &pTsdb->keepCfg;

  // Nothing on disk, nothing to trim.
  if (taosArrayGetSize(pTsdb->fs.aDFileSet) == 0) return false;

  // Fast path: only the first file set is probed for a negative level here.
  // NOTE(review): presumably the array is ordered by fid so index 0 is the oldest set -- confirm.
  SDFileSet *pFirst = (SDFileSet *)taosArrayGet(pTsdb->fs.aDFileSet, 0);
  if (tsdbFidLevel(pFirst->fid, pKeep, now) < 0) return true;

  // Migration is only considered when tfsGetLevel reports at least 2.
  if (tfsGetLevel(pTsdb->pVnode->pTfs) < 2) return false;

  // All three keep values equal: no migration check is performed.
  if (pKeep->keep0 == pKeep->keep1 && pKeep->keep1 == pKeep->keep2) return false;

  for (int32_t idx = 0; idx < taosArrayGetSize(pTsdb->fs.aDFileSet); idx++) {
    SDFileSet *pCur = (SDFileSet *)taosArrayGet(pTsdb->fs.aDFileSet, idx);
    int32_t    wantLevel = tsdbFidLevel(pCur->fid, pKeep, now);
    SDiskID    target;

    // Already on the expected level.
    if (wantLevel == pCur->diskId.level) continue;

    if (wantLevel < 0) return true;

    // No disk could be allocated on the expected level: report "no work".
    if (tfsAllocDisk(pTsdb->pVnode->pTfs, wantLevel, &target) < 0) return false;

    // The allocated disk sits on another level, so this set would move.
    if (target.level != pCur->diskId.level) return true;
  }

  return false;
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @brief Data migration between multi-tier storage, including remove expired data.
|
||||||
|
* 1) firstly, remove expired DFileSet;
|
||||||
|
* 2) partition the tsdbFS by the expLevel and fileSize(e.g. 500G, configurable), and migrate DFileSet groups between multi-tier storage;
|
||||||
|
* 3) update the tsdbFS and CURRENT in the same transaction;
|
||||||
|
* 4) finish
|
||||||
|
* @param pTsdb
|
||||||
|
* @param now
|
||||||
|
* @return int32_t
|
||||||
|
*/
|
||||||
|
int32_t tsdbDoRetention(STsdb *pTsdb, int64_t now) {
|
||||||
|
int32_t code = 0;
|
||||||
|
|
||||||
|
if (!tsdbShouldDoRetention(pTsdb, now)) {
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
// do retention
|
||||||
|
STsdbFS fs;
|
||||||
|
|
||||||
|
code = tsdbFSCopy(pTsdb, &fs);
|
||||||
|
if (code) goto _err;
|
||||||
|
|
||||||
|
for (int32_t iSet = 0; iSet < taosArrayGetSize(fs.aDFileSet); iSet++) {
|
||||||
|
SDFileSet *pSet = (SDFileSet *)taosArrayGet(fs.aDFileSet, iSet);
|
||||||
|
int32_t expLevel = tsdbFidLevel(pSet->fid, &pTsdb->keepCfg, now);
|
||||||
|
SDiskID did;
|
||||||
|
|
||||||
|
if (expLevel < 0) {
|
||||||
|
taosMemoryFree(pSet->pHeadF);
|
||||||
|
taosMemoryFree(pSet->pDataF);
|
||||||
|
taosMemoryFree(pSet->aSttF[0]);
|
||||||
|
taosMemoryFree(pSet->pSmaF);
|
||||||
|
taosArrayRemove(fs.aDFileSet, iSet);
|
||||||
|
iSet--;
|
||||||
|
} else {
|
||||||
|
if (expLevel == 0) continue;
|
||||||
|
if (tfsAllocDisk(pTsdb->pVnode->pTfs, expLevel, &did) < 0) {
|
||||||
|
code = terrno;
|
||||||
|
goto _exit;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (did.level == pSet->diskId.level) continue;
|
||||||
|
|
||||||
|
// copy file to new disk (todo)
|
||||||
|
SDFileSet fSet = *pSet;
|
||||||
|
fSet.diskId = did;
|
||||||
|
|
||||||
|
code = tsdbDFileSetCopy(pTsdb, pSet, &fSet);
|
||||||
|
if (code) goto _err;
|
||||||
|
|
||||||
|
code = tsdbFSUpsertFSet(&fs, &fSet);
|
||||||
|
if (code) goto _err;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// do change fs
|
||||||
|
code = tsdbFSCommit1(pTsdb, &fs);
|
||||||
|
if (code) goto _err;
|
||||||
|
|
||||||
|
taosThreadRwlockWrlock(&pTsdb->rwLock);
|
||||||
|
|
||||||
|
code = tsdbFSCommit2(pTsdb, &fs);
|
||||||
|
if (code) {
|
||||||
|
taosThreadRwlockUnlock(&pTsdb->rwLock);
|
||||||
|
goto _err;
|
||||||
|
}
|
||||||
|
|
||||||
|
taosThreadRwlockUnlock(&pTsdb->rwLock);
|
||||||
|
|
||||||
|
tsdbFSDestroy(&fs);
|
||||||
|
|
||||||
|
_exit:
|
||||||
|
return code;
|
||||||
|
|
||||||
|
_err:
|
||||||
|
tsdbError("vgId:%d, tsdb do retention failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
|
||||||
|
ASSERT(0);
|
||||||
|
// tsdbFSRollback(pTsdb->pFS);
|
||||||
|
return code;
|
||||||
|
}
|
|
@ -186,6 +186,17 @@ void vnodePreClose(SVnode *pVnode) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
 * Block until the vnode's trim flag (trimDbH.state) reads 0.
 *
 * The flag is polled in a tight loop; every time more than 1000 consecutive
 * polls have seen it non-zero, a trace line is emitted, the CPU is yielded
 * once and the poll counter restarts.
 *
 * NOTE(review): in the visible part of vnodeClose this is called after
 * smaClose/metaClose/vnodeCloseBufPool, while the trim thread dereferences
 * pVnode->pTsdb and pVnode->pSma -- confirm those handles are still valid
 * for as long as the trim thread may be running.
 *
 * @param pVnode vnode being closed
 */
static void vnodeTrimDbClose(SVnode *pVnode) {
  int32_t polls = 0;

  for (;;) {
    if (atomic_load_8(&pVnode->trimDbH.state) == 0) break;

    polls++;
    if (polls <= 1000) continue;

    vTrace("vgId:%d, wait for trimDb task to finish", TD_VID(pVnode));
    sched_yield();
    polls = 0;
  }
}
|
||||||
|
|
||||||
void vnodeClose(SVnode *pVnode) {
|
void vnodeClose(SVnode *pVnode) {
|
||||||
if (pVnode) {
|
if (pVnode) {
|
||||||
vnodeCommit(pVnode);
|
vnodeCommit(pVnode);
|
||||||
|
@ -197,6 +208,7 @@ void vnodeClose(SVnode *pVnode) {
|
||||||
smaClose(pVnode->pSma);
|
smaClose(pVnode->pSma);
|
||||||
metaClose(pVnode->pMeta);
|
metaClose(pVnode->pMeta);
|
||||||
vnodeCloseBufPool(pVnode);
|
vnodeCloseBufPool(pVnode);
|
||||||
|
vnodeTrimDbClose(pVnode);
|
||||||
// destroy handle
|
// destroy handle
|
||||||
tsem_destroy(&(pVnode->canCommit));
|
tsem_destroy(&(pVnode->canCommit));
|
||||||
tsem_destroy(&pVnode->syncSem);
|
tsem_destroy(&pVnode->syncSem);
|
||||||
|
|
|
@ -378,6 +378,7 @@ void vnodeUpdateMetaRsp(SVnode *pVnode, STableMetaRsp *pMetaRsp) {
|
||||||
pMetaRsp->precision = pVnode->config.tsdbCfg.precision;
|
pMetaRsp->precision = pVnode->config.tsdbCfg.precision;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#if 0
|
||||||
static int32_t vnodeProcessTrimReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
|
static int32_t vnodeProcessTrimReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
SVTrimDbReq trimReq = {0};
|
SVTrimDbReq trimReq = {0};
|
||||||
|
@ -400,6 +401,85 @@ static int32_t vnodeProcessTrimReq(SVnode *pVnode, int64_t version, void *pReq,
|
||||||
_exit:
|
_exit:
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
#endif
|
||||||
|
|
||||||
|
typedef struct {
|
||||||
|
SVnode *pVnode;
|
||||||
|
SVTrimDbReq trimReq;
|
||||||
|
} SVndTrimDbReq;
|
||||||
|
|
||||||
|
void *vnodeProcessTrimReqFunc(void *param) {
|
||||||
|
int32_t code = 0;
|
||||||
|
int8_t oldVal = 0;
|
||||||
|
SVndTrimDbReq *pReq = (SVndTrimDbReq *)param;
|
||||||
|
SVnode *pVnode = pReq->pVnode;
|
||||||
|
|
||||||
|
setThreadName("vnode-trim");
|
||||||
|
|
||||||
|
// process
|
||||||
|
code = tsdbDoRetention(pVnode->pTsdb, pReq->trimReq.timestamp);
|
||||||
|
if (code) goto _exit;
|
||||||
|
|
||||||
|
code = smaDoRetention(pVnode->pSma, pReq->trimReq.timestamp);
|
||||||
|
if (code) goto _exit;
|
||||||
|
|
||||||
|
_exit:
|
||||||
|
vInfo("vgId:%d, trim vnode thread finished, time:%d", TD_VID(pVnode), pReq->trimReq.timestamp);
|
||||||
|
oldVal = atomic_val_compare_exchange_8(&pVnode->trimDbH.state, 1, 0);
|
||||||
|
ASSERT(oldVal == 1);
|
||||||
|
taosMemoryFree(pReq);
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
 * Handle a trim request by handing the work to a detached background thread.
 *
 * The request is deserialized into a heap-allocated SVndTrimDbReq. If the
 * vnode's trim flag can be switched from 0 to 1, a detached thread running
 * vnodeProcessTrimReqFunc is created and takes ownership of that allocation.
 * If the flag is already set, the request is logged as a duplicate and
 * dropped, and 0 is returned.
 *
 * @param pVnode  target vnode
 * @param version not used in this function
 * @param pReq    serialized SVTrimDbReq
 * @param len     length of pReq in bytes
 * @param pRsp    not used in this function
 * @return 0 when the thread was started or the request was a duplicate;
 *         TSDB_CODE_OUT_OF_MEMORY, TSDB_CODE_INVALID_MSG or a system error
 *         code otherwise. The same value is stored in terrno.
 */
static int32_t vnodeProcessTrimReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
  int32_t        result = 0;
  STrimDbHandle *pTrimH = &pVnode->trimDbH;
  SVndTrimDbReq *pTask = taosMemoryMalloc(sizeof(SVndTrimDbReq));

  do {
    if (pTask == NULL) {
      result = TSDB_CODE_OUT_OF_MEMORY;
      break;
    }

    pTask->pVnode = pVnode;

    if (tDeserializeSVTrimDbReq(pReq, len, &pTask->trimReq) != 0) {
      taosMemoryFree(pTask);
      result = TSDB_CODE_INVALID_MSG;
      break;
    }

    // Claim the trim slot; a non-zero previous value means a trim is already running.
    if (atomic_val_compare_exchange_8(&pTrimH->state, 0, 1) != 0) {
      vInfo("vgId:%d, trim vnode request will not be processed since duplicated req, time:%d", TD_VID(pVnode),
            pTask->trimReq.timestamp);
      taosMemoryFree(pTask);
      break;
    }

    vInfo("vgId:%d, trim vnode request will be processed, time:%d", TD_VID(pVnode), pTask->trimReq.timestamp);

    TdThreadAttr attr = {0};
    TdThread     thread;

    taosThreadAttrInit(&attr);
    taosThreadAttrSetDetachState(&attr, PTHREAD_CREATE_DETACHED);

    if (taosThreadCreate(&thread, &attr, vnodeProcessTrimReqFunc, (void *)pTask) != 0) {
      // NOTE(review): errno is read here; if taosThreadCreate returns the error
      // number the way pthread_create does, that return value should be used -- confirm.
      result = TAOS_SYSTEM_ERROR(errno);
      taosMemoryFree(pTask);
      taosThreadAttrDestroy(&attr);

      // No thread will clear the flag, so give the trim slot back here.
      int8_t prev = atomic_val_compare_exchange_8(&pTrimH->state, 1, 0);
      ASSERT(prev == 1);

      vError("vgId:%d, failed to create pthread for trim vnode since %s", TD_VID(pVnode), tstrerror(result));
      break;
    }

    vDebug("vgId:%d, success to create pthread for trim vnode", TD_VID(pVnode));
    taosThreadAttrDestroy(&attr);
  } while (0);

  terrno = result;
  return result;
}
|
||||||
|
|
||||||
static int32_t vnodeProcessDropTtlTbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
|
static int32_t vnodeProcessDropTtlTbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
|
||||||
SArray *tbUids = taosArrayInit(8, sizeof(int64_t));
|
SArray *tbUids = taosArrayInit(8, sizeof(int64_t));
|
||||||
|
@ -921,7 +1001,8 @@ static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq
|
||||||
}
|
}
|
||||||
|
|
||||||
if (taosArrayGetSize(newTbUids) > 0) {
|
if (taosArrayGetSize(newTbUids) > 0) {
|
||||||
vDebug("vgId:%d, add %d table into query table list in handling submit", TD_VID(pVnode), (int32_t)taosArrayGetSize(newTbUids));
|
vDebug("vgId:%d, add %d table into query table list in handling submit", TD_VID(pVnode),
|
||||||
|
(int32_t)taosArrayGetSize(newTbUids));
|
||||||
}
|
}
|
||||||
|
|
||||||
tqUpdateTbUidList(pVnode->pTq, newTbUids, true);
|
tqUpdateTbUidList(pVnode->pTq, newTbUids, true);
|
||||||
|
|
Loading…
Reference in New Issue