tsdb r/w concurrency
This commit is contained in:
parent
007c5eb0a5
commit
58135a1aeb
|
@ -174,6 +174,10 @@ void tsdbCalcColDataSMA(SColData *pColData, SColumnDataAgg *pColAgg);
|
||||||
int32_t tsdbMemTableCreate(STsdb *pTsdb, SMemTable **ppMemTable);
|
int32_t tsdbMemTableCreate(STsdb *pTsdb, SMemTable **ppMemTable);
|
||||||
void tsdbMemTableDestroy(SMemTable *pMemTable);
|
void tsdbMemTableDestroy(SMemTable *pMemTable);
|
||||||
void tsdbGetTbDataFromMemTable(SMemTable *pMemTable, tb_uid_t suid, tb_uid_t uid, STbData **ppTbData);
|
void tsdbGetTbDataFromMemTable(SMemTable *pMemTable, tb_uid_t suid, tb_uid_t uid, STbData **ppTbData);
|
||||||
|
void tsdbRefMemTable(SMemTable *pMemTable);
|
||||||
|
void tsdbUnrefMemTable(SMemTable *pMemTable);
|
||||||
|
int32_t tsdbTakeMemSnapshot(STsdb *pTsdb, SMemTable **ppMem, SMemTable **ppIMem);
|
||||||
|
void tsdbUntakeMemSnapshot(STsdb *pTsdb, SMemTable *pMem, SMemTable *pIMem);
|
||||||
// STbDataIter
|
// STbDataIter
|
||||||
int32_t tsdbTbDataIterCreate(STbData *pTbData, TSDBKEY *pFrom, int8_t backward, STbDataIter **ppIter);
|
int32_t tsdbTbDataIterCreate(STbData *pTbData, TSDBKEY *pFrom, int8_t backward, STbDataIter **ppIter);
|
||||||
void *tsdbTbDataIterDestroy(STbDataIter *pIter);
|
void *tsdbTbDataIterDestroy(STbDataIter *pIter);
|
||||||
|
@ -273,16 +277,14 @@ typedef struct {
|
||||||
} SRtn;
|
} SRtn;
|
||||||
|
|
||||||
struct STsdb {
|
struct STsdb {
|
||||||
char *path;
|
char *path;
|
||||||
SVnode *pVnode;
|
SVnode *pVnode;
|
||||||
TdThreadMutex mutex;
|
TdThreadRwlock rwLock;
|
||||||
bool repoLocked;
|
STsdbKeepCfg keepCfg;
|
||||||
STsdbKeepCfg keepCfg;
|
SMemTable *mem;
|
||||||
SMemTable *mem;
|
SMemTable *imem;
|
||||||
SMemTable *imem;
|
STsdbFS *pFS;
|
||||||
SRtn rtn;
|
SLRUCache *lruCache;
|
||||||
STsdbFS *fs;
|
|
||||||
SLRUCache *lruCache;
|
|
||||||
};
|
};
|
||||||
|
|
||||||
struct STable {
|
struct STable {
|
||||||
|
@ -330,21 +332,19 @@ struct STbData {
|
||||||
};
|
};
|
||||||
|
|
||||||
struct SMemTable {
|
struct SMemTable {
|
||||||
SRWLatch latch;
|
SRWLatch latch;
|
||||||
STsdb *pTsdb;
|
STsdb *pTsdb;
|
||||||
int32_t nRef;
|
SVBufPool *pPool;
|
||||||
TSKEY minKey;
|
volatile int32_t nRef;
|
||||||
TSKEY maxKey;
|
TSKEY minKey;
|
||||||
int64_t minVersion;
|
TSKEY maxKey;
|
||||||
int64_t maxVersion;
|
int64_t minVersion;
|
||||||
int64_t nRow;
|
int64_t maxVersion;
|
||||||
int64_t nDel;
|
int64_t nRow;
|
||||||
SArray *aTbData; // SArray<STbData*>
|
int64_t nDel;
|
||||||
|
SArray *aTbData; // SArray<STbData*>
|
||||||
};
|
};
|
||||||
|
|
||||||
int tsdbLockRepo(STsdb *pTsdb);
|
|
||||||
int tsdbUnlockRepo(STsdb *pTsdb);
|
|
||||||
|
|
||||||
struct TSDBROW {
|
struct TSDBROW {
|
||||||
int8_t type; // 0 for row from tsRow, 1 for row from block data
|
int8_t type; // 0 for row from tsRow, 1 for row from block data
|
||||||
union {
|
union {
|
||||||
|
|
|
@ -464,7 +464,7 @@ static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow) {
|
||||||
|
|
||||||
switch (state->state) {
|
switch (state->state) {
|
||||||
case SFSNEXTROW_FS:
|
case SFSNEXTROW_FS:
|
||||||
state->aDFileSet = state->pTsdb->fs->cState->aDFileSet;
|
state->aDFileSet = state->pTsdb->pFS->cState->aDFileSet;
|
||||||
state->nFileSet = taosArrayGetSize(state->aDFileSet);
|
state->nFileSet = taosArrayGetSize(state->aDFileSet);
|
||||||
state->iFileSet = state->nFileSet;
|
state->iFileSet = state->nFileSet;
|
||||||
|
|
||||||
|
@ -814,7 +814,7 @@ static int32_t nextRowIterOpen(CacheNextRowIter *pIter, tb_uid_t uid, STsdb *pTs
|
||||||
|
|
||||||
SDelIdx delIdx;
|
SDelIdx delIdx;
|
||||||
|
|
||||||
SDelFile *pDelFile = tsdbFSStateGetDelFile(pTsdb->fs->cState);
|
SDelFile *pDelFile = tsdbFSStateGetDelFile(pTsdb->pFS->cState);
|
||||||
if (pDelFile) {
|
if (pDelFile) {
|
||||||
SDelFReader *pDelFReader;
|
SDelFReader *pDelFReader;
|
||||||
|
|
||||||
|
@ -1189,7 +1189,7 @@ static int32_t mergeLastRow(tb_uid_t uid, STsdb *pTsdb, bool *dup, STSRow **ppRo
|
||||||
|
|
||||||
SDelIdx delIdx;
|
SDelIdx delIdx;
|
||||||
|
|
||||||
SDelFile *pDelFile = tsdbFSStateGetDelFile(pTsdb->fs->cState);
|
SDelFile *pDelFile = tsdbFSStateGetDelFile(pTsdb->pFS->cState);
|
||||||
if (pDelFile) {
|
if (pDelFile) {
|
||||||
SDelFReader *pDelFReader;
|
SDelFReader *pDelFReader;
|
||||||
|
|
||||||
|
@ -1377,7 +1377,7 @@ static int32_t mergeLast(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray) {
|
||||||
|
|
||||||
SDelIdx delIdx;
|
SDelIdx delIdx;
|
||||||
|
|
||||||
SDelFile *pDelFile = tsdbFSStateGetDelFile(pTsdb->fs->cState);
|
SDelFile *pDelFile = tsdbFSStateGetDelFile(pTsdb->pFS->cState);
|
||||||
if (pDelFile) {
|
if (pDelFile) {
|
||||||
SDelFReader *pDelFReader;
|
SDelFReader *pDelFReader;
|
||||||
|
|
||||||
|
|
|
@ -64,9 +64,26 @@ int32_t tsdbBegin(STsdb *pTsdb) {
|
||||||
|
|
||||||
if (!pTsdb) return code;
|
if (!pTsdb) return code;
|
||||||
|
|
||||||
code = tsdbMemTableCreate(pTsdb, &pTsdb->mem);
|
SMemTable *pMemTable;
|
||||||
|
code = tsdbMemTableCreate(pTsdb, &pMemTable);
|
||||||
if (code) goto _err;
|
if (code) goto _err;
|
||||||
|
|
||||||
|
// lock
|
||||||
|
code = taosThreadRwlockWrlock(&pTsdb->rwLock);
|
||||||
|
if (code) {
|
||||||
|
code = TAOS_SYSTEM_ERROR(code);
|
||||||
|
goto _err;
|
||||||
|
}
|
||||||
|
|
||||||
|
pTsdb->mem = pMemTable;
|
||||||
|
|
||||||
|
// unlock
|
||||||
|
code = taosThreadRwlockUnlock(&pTsdb->rwLock);
|
||||||
|
if (code) {
|
||||||
|
code = TAOS_SYSTEM_ERROR(code);
|
||||||
|
goto _err;
|
||||||
|
}
|
||||||
|
|
||||||
return code;
|
return code;
|
||||||
|
|
||||||
_err:
|
_err:
|
||||||
|
@ -83,9 +100,11 @@ int32_t tsdbCommit(STsdb *pTsdb) {
|
||||||
|
|
||||||
// check
|
// check
|
||||||
if (pMemTable->nRow == 0 && pMemTable->nDel == 0) {
|
if (pMemTable->nRow == 0 && pMemTable->nDel == 0) {
|
||||||
// TODO: lock?
|
taosThreadRwlockWrlock(&pTsdb->rwLock);
|
||||||
pTsdb->mem = NULL;
|
pTsdb->mem = NULL;
|
||||||
tsdbMemTableDestroy(pMemTable);
|
taosThreadRwlockUnlock(&pTsdb->rwLock);
|
||||||
|
|
||||||
|
tsdbUnrefMemTable(pMemTable);
|
||||||
goto _exit;
|
goto _exit;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -139,7 +158,7 @@ static int32_t tsdbCommitDelStart(SCommitter *pCommitter) {
|
||||||
goto _err;
|
goto _err;
|
||||||
}
|
}
|
||||||
|
|
||||||
SDelFile *pDelFileR = pTsdb->fs->nState->pDelFile;
|
SDelFile *pDelFileR = pTsdb->pFS->nState->pDelFile;
|
||||||
if (pDelFileR) {
|
if (pDelFileR) {
|
||||||
code = tsdbDelFReaderOpen(&pCommitter->pDelFReader, pDelFileR, pTsdb, NULL);
|
code = tsdbDelFReaderOpen(&pCommitter->pDelFReader, pDelFileR, pTsdb, NULL);
|
||||||
if (code) goto _err;
|
if (code) goto _err;
|
||||||
|
@ -228,7 +247,7 @@ static int32_t tsdbCommitDelEnd(SCommitter *pCommitter) {
|
||||||
code = tsdbUpdateDelFileHdr(pCommitter->pDelFWriter);
|
code = tsdbUpdateDelFileHdr(pCommitter->pDelFWriter);
|
||||||
if (code) goto _err;
|
if (code) goto _err;
|
||||||
|
|
||||||
code = tsdbFSStateUpsertDelFile(pTsdb->fs->nState, &pCommitter->pDelFWriter->fDel);
|
code = tsdbFSStateUpsertDelFile(pTsdb->pFS->nState, &pCommitter->pDelFWriter->fDel);
|
||||||
if (code) goto _err;
|
if (code) goto _err;
|
||||||
|
|
||||||
code = tsdbDelFWriterClose(&pCommitter->pDelFWriter, 1);
|
code = tsdbDelFWriterClose(&pCommitter->pDelFWriter, 1);
|
||||||
|
@ -263,7 +282,7 @@ static int32_t tsdbCommitFileDataStart(SCommitter *pCommitter) {
|
||||||
taosArrayClear(pCommitter->aBlockIdx);
|
taosArrayClear(pCommitter->aBlockIdx);
|
||||||
tMapDataReset(&pCommitter->oBlockMap);
|
tMapDataReset(&pCommitter->oBlockMap);
|
||||||
tBlockDataReset(&pCommitter->oBlockData);
|
tBlockDataReset(&pCommitter->oBlockData);
|
||||||
pRSet = tsdbFSStateGetDFileSet(pTsdb->fs->nState, pCommitter->commitFid, TD_EQ);
|
pRSet = tsdbFSStateGetDFileSet(pTsdb->pFS->nState, pCommitter->commitFid, TD_EQ);
|
||||||
if (pRSet) {
|
if (pRSet) {
|
||||||
code = tsdbDataFReaderOpen(&pCommitter->pReader, pTsdb, pRSet);
|
code = tsdbDataFReaderOpen(&pCommitter->pReader, pTsdb, pRSet);
|
||||||
if (code) goto _err;
|
if (code) goto _err;
|
||||||
|
@ -836,7 +855,7 @@ static int32_t tsdbCommitFileDataEnd(SCommitter *pCommitter) {
|
||||||
if (code) goto _err;
|
if (code) goto _err;
|
||||||
|
|
||||||
// upsert SDFileSet
|
// upsert SDFileSet
|
||||||
code = tsdbFSStateUpsertDFileSet(pCommitter->pTsdb->fs->nState, tsdbDataFWriterGetWSet(pCommitter->pWriter));
|
code = tsdbFSStateUpsertDFileSet(pCommitter->pTsdb->pFS->nState, tsdbDataFWriterGetWSet(pCommitter->pWriter));
|
||||||
if (code) goto _err;
|
if (code) goto _err;
|
||||||
|
|
||||||
// close and sync
|
// close and sync
|
||||||
|
@ -954,7 +973,7 @@ static int32_t tsdbStartCommit(STsdb *pTsdb, SCommitter *pCommitter) {
|
||||||
pCommitter->maxRow = pTsdb->pVnode->config.tsdbCfg.maxRows;
|
pCommitter->maxRow = pTsdb->pVnode->config.tsdbCfg.maxRows;
|
||||||
pCommitter->cmprAlg = pTsdb->pVnode->config.tsdbCfg.compression;
|
pCommitter->cmprAlg = pTsdb->pVnode->config.tsdbCfg.compression;
|
||||||
|
|
||||||
code = tsdbFSBegin(pTsdb->fs);
|
code = tsdbFSBegin(pTsdb->pFS);
|
||||||
if (code) goto _err;
|
if (code) goto _err;
|
||||||
|
|
||||||
return code;
|
return code;
|
||||||
|
@ -1135,13 +1154,16 @@ static int32_t tsdbEndCommit(SCommitter *pCommitter, int32_t eno) {
|
||||||
SMemTable *pMemTable = pTsdb->imem;
|
SMemTable *pMemTable = pTsdb->imem;
|
||||||
|
|
||||||
if (eno == 0) {
|
if (eno == 0) {
|
||||||
code = tsdbFSCommit(pTsdb->fs);
|
code = tsdbFSCommit(pTsdb->pFS);
|
||||||
} else {
|
} else {
|
||||||
code = tsdbFSRollback(pTsdb->fs);
|
code = tsdbFSRollback(pTsdb->pFS);
|
||||||
}
|
}
|
||||||
|
|
||||||
tsdbMemTableDestroy(pMemTable);
|
taosThreadRwlockWrlock(&pTsdb->rwLock);
|
||||||
pTsdb->imem = NULL;
|
pTsdb->imem = NULL;
|
||||||
|
taosThreadRwlockUnlock(&pTsdb->rwLock);
|
||||||
|
|
||||||
|
tsdbUnrefMemTable(pMemTable);
|
||||||
|
|
||||||
tsdbInfo("vgId:%d tsdb end commit", TD_VID(pTsdb->pVnode));
|
tsdbInfo("vgId:%d tsdb end commit", TD_VID(pTsdb->pVnode));
|
||||||
return code;
|
return code;
|
||||||
|
|
|
@ -41,6 +41,7 @@ int32_t tsdbMemTableCreate(STsdb *pTsdb, SMemTable **ppMemTable) {
|
||||||
}
|
}
|
||||||
taosInitRWLatch(&pMemTable->latch);
|
taosInitRWLatch(&pMemTable->latch);
|
||||||
pMemTable->pTsdb = pTsdb;
|
pMemTable->pTsdb = pTsdb;
|
||||||
|
pMemTable->pPool = pTsdb->pVnode->inUse;
|
||||||
pMemTable->nRef = 1;
|
pMemTable->nRef = 1;
|
||||||
pMemTable->minKey = TSKEY_MAX;
|
pMemTable->minKey = TSKEY_MAX;
|
||||||
pMemTable->maxKey = TSKEY_MIN;
|
pMemTable->maxKey = TSKEY_MIN;
|
||||||
|
@ -590,3 +591,58 @@ _err:
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tsdbGetNRowsInTbData(STbData *pTbData) { return pTbData->sl.size; }
|
int32_t tsdbGetNRowsInTbData(STbData *pTbData) { return pTbData->sl.size; }
|
||||||
|
|
||||||
|
void tsdbRefMemTable(SMemTable *pMemTable) {
|
||||||
|
int32_t nRef = atomic_fetch_add_32(&pMemTable->nRef, 1);
|
||||||
|
ASSERT(nRef > 0);
|
||||||
|
}
|
||||||
|
|
||||||
|
void tsdbUnrefMemTable(SMemTable *pMemTable) {
|
||||||
|
int32_t nRef = atomic_sub_fetch_32(&pMemTable->nRef, 1);
|
||||||
|
if (nRef == 0) {
|
||||||
|
tsdbMemTableDestroy(pMemTable);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t tsdbTakeMemSnapshot(STsdb *pTsdb, SMemTable **ppMem, SMemTable **ppIMem) {
|
||||||
|
int32_t code = 0;
|
||||||
|
|
||||||
|
// lock
|
||||||
|
code = taosThreadRwlockRdlock(&pTsdb->rwLock);
|
||||||
|
if (code) {
|
||||||
|
code = TAOS_SYSTEM_ERROR(code);
|
||||||
|
goto _exit;
|
||||||
|
}
|
||||||
|
|
||||||
|
// take snapshot
|
||||||
|
*ppMem = pTsdb->mem;
|
||||||
|
*ppIMem = pTsdb->imem;
|
||||||
|
|
||||||
|
if (*ppMem) {
|
||||||
|
tsdbRefMemTable(*ppMem);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (*ppIMem) {
|
||||||
|
tsdbRefMemTable(*ppIMem);
|
||||||
|
}
|
||||||
|
|
||||||
|
// unlock
|
||||||
|
code = taosThreadRwlockUnlock(&pTsdb->rwLock);
|
||||||
|
if (code) {
|
||||||
|
code = TAOS_SYSTEM_ERROR(code);
|
||||||
|
goto _exit;
|
||||||
|
}
|
||||||
|
|
||||||
|
_exit:
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
void tsdbUntakeMemSnapshot(STsdb *pTsdb, SMemTable *pMem, SMemTable *pIMem) {
|
||||||
|
if (pMem) {
|
||||||
|
tsdbUnrefMemTable(pMem);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (pIMem) {
|
||||||
|
tsdbUnrefMemTable(pIMem);
|
||||||
|
}
|
||||||
|
}
|
|
@ -54,8 +54,7 @@ int tsdbOpen(SVnode *pVnode, STsdb **ppTsdb, const char *dir, STsdbKeepCfg *pKee
|
||||||
sprintf(pTsdb->path, "%s%s%s", pVnode->path, TD_DIRSEP, dir);
|
sprintf(pTsdb->path, "%s%s%s", pVnode->path, TD_DIRSEP, dir);
|
||||||
taosRealPath(pTsdb->path, NULL, slen);
|
taosRealPath(pTsdb->path, NULL, slen);
|
||||||
pTsdb->pVnode = pVnode;
|
pTsdb->pVnode = pVnode;
|
||||||
pTsdb->repoLocked = false;
|
taosThreadRwlockInit(&pTsdb->rwLock, NULL);
|
||||||
taosThreadMutexInit(&pTsdb->mutex, NULL);
|
|
||||||
if (!pKeepCfg) {
|
if (!pKeepCfg) {
|
||||||
tsdbSetKeepCfg(&pTsdb->keepCfg, &pVnode->config.tsdbCfg);
|
tsdbSetKeepCfg(&pTsdb->keepCfg, &pVnode->config.tsdbCfg);
|
||||||
} else {
|
} else {
|
||||||
|
@ -67,7 +66,7 @@ int tsdbOpen(SVnode *pVnode, STsdb **ppTsdb, const char *dir, STsdbKeepCfg *pKee
|
||||||
tfsMkdir(pVnode->pTfs, pTsdb->path);
|
tfsMkdir(pVnode->pTfs, pTsdb->path);
|
||||||
|
|
||||||
// open tsdb
|
// open tsdb
|
||||||
if (tsdbFSOpen(pTsdb, &pTsdb->fs) < 0) {
|
if (tsdbFSOpen(pTsdb, &pTsdb->pFS) < 0) {
|
||||||
goto _err;
|
goto _err;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -88,33 +87,10 @@ _err:
|
||||||
|
|
||||||
int tsdbClose(STsdb **pTsdb) {
|
int tsdbClose(STsdb **pTsdb) {
|
||||||
if (*pTsdb) {
|
if (*pTsdb) {
|
||||||
taosThreadMutexDestroy(&(*pTsdb)->mutex);
|
taosThreadRwlockDestroy(&(*pTsdb)->rwLock);
|
||||||
tsdbFSClose((*pTsdb)->fs);
|
tsdbFSClose((*pTsdb)->pFS);
|
||||||
tsdbCloseCache((*pTsdb)->lruCache);
|
tsdbCloseCache((*pTsdb)->lruCache);
|
||||||
taosMemoryFreeClear(*pTsdb);
|
taosMemoryFreeClear(*pTsdb);
|
||||||
}
|
}
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int tsdbLockRepo(STsdb *pTsdb) {
|
|
||||||
int code = taosThreadMutexLock(&pTsdb->mutex);
|
|
||||||
if (code != 0) {
|
|
||||||
tsdbError("vgId:%d, failed to lock tsdb since %s", TD_VID(pTsdb->pVnode), strerror(errno));
|
|
||||||
terrno = TAOS_SYSTEM_ERROR(code);
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
pTsdb->repoLocked = true;
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
int tsdbUnlockRepo(STsdb *pTsdb) {
|
|
||||||
// ASSERT(IS_REPO_LOCKED(pTsdb));
|
|
||||||
pTsdb->repoLocked = false;
|
|
||||||
int code = taosThreadMutexUnlock(&pTsdb->mutex);
|
|
||||||
if (code != 0) {
|
|
||||||
tsdbError("vgId:%d, failed to unlock tsdb since %s", TD_VID(pTsdb->pVnode), strerror(errno));
|
|
||||||
terrno = TAOS_SYSTEM_ERROR(code);
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
|
@ -1905,7 +1905,7 @@ int32_t initDelSkylineIterator(STableBlockScanInfo* pBlockScanInfo, STsdbReader*
|
||||||
|
|
||||||
SArray* pDelData = taosArrayInit(4, sizeof(SDelData));
|
SArray* pDelData = taosArrayInit(4, sizeof(SDelData));
|
||||||
|
|
||||||
SDelFile* pDelFile = tsdbFSStateGetDelFile(pTsdb->fs->cState);
|
SDelFile* pDelFile = tsdbFSStateGetDelFile(pTsdb->pFS->cState);
|
||||||
if (pDelFile) {
|
if (pDelFile) {
|
||||||
SDelFReader* pDelFReader = NULL;
|
SDelFReader* pDelFReader = NULL;
|
||||||
code = tsdbDelFReaderOpen(&pDelFReader, pDelFile, pTsdb, NULL);
|
code = tsdbDelFReaderOpen(&pDelFReader, pDelFile, pTsdb, NULL);
|
||||||
|
@ -2795,7 +2795,7 @@ int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, SArray* pTabl
|
||||||
|
|
||||||
SDataBlockIter* pBlockIter = &pReader->status.blockIter;
|
SDataBlockIter* pBlockIter = &pReader->status.blockIter;
|
||||||
|
|
||||||
STsdbFSState* pFState = pReader->pTsdb->fs->cState;
|
STsdbFSState* pFState = pReader->pTsdb->pFS->cState;
|
||||||
initFilesetIterator(&pReader->status.fileIter, pFState, pReader->order, pReader->idStr);
|
initFilesetIterator(&pReader->status.fileIter, pFState, pReader->order, pReader->idStr);
|
||||||
resetDataBlockIterator(&pReader->status.blockIter, pReader->order);
|
resetDataBlockIterator(&pReader->status.blockIter, pReader->order);
|
||||||
|
|
||||||
|
@ -3042,7 +3042,7 @@ int32_t tsdbReaderReset(STsdbReader* pReader, SQueryTableDataCond* pCond) {
|
||||||
|
|
||||||
tsdbDataFReaderClose(&pReader->pFileReader);
|
tsdbDataFReaderClose(&pReader->pFileReader);
|
||||||
|
|
||||||
STsdbFSState* pFState = pReader->pTsdb->fs->cState;
|
STsdbFSState* pFState = pReader->pTsdb->pFS->cState;
|
||||||
initFilesetIterator(&pReader->status.fileIter, pFState, pReader->order, pReader->idStr);
|
initFilesetIterator(&pReader->status.fileIter, pFState, pReader->order, pReader->idStr);
|
||||||
resetDataBlockIterator(&pReader->status.blockIter, pReader->order);
|
resetDataBlockIterator(&pReader->status.blockIter, pReader->order);
|
||||||
resetDataBlockScanInfo(pReader->status.pTableMap);
|
resetDataBlockScanInfo(pReader->status.pTableMap);
|
||||||
|
|
|
@ -20,10 +20,10 @@ static int32_t tsdbDoRetentionImpl(STsdb *pTsdb, int64_t now, int8_t try, int8_t
|
||||||
STsdbFSState *pState;
|
STsdbFSState *pState;
|
||||||
|
|
||||||
if (try) {
|
if (try) {
|
||||||
pState = pTsdb->fs->cState;
|
pState = pTsdb->pFS->cState;
|
||||||
*canDo = 0;
|
*canDo = 0;
|
||||||
} else {
|
} else {
|
||||||
pState = pTsdb->fs->nState;
|
pState = pTsdb->pFS->nState;
|
||||||
}
|
}
|
||||||
|
|
||||||
for (int32_t iSet = 0; iSet < taosArrayGetSize(pState->aDFileSet); iSet++) {
|
for (int32_t iSet = 0; iSet < taosArrayGetSize(pState->aDFileSet); iSet++) {
|
||||||
|
@ -83,7 +83,7 @@ int32_t tsdbDoRetention(STsdb *pTsdb, int64_t now) {
|
||||||
if (!canDo) goto _exit;
|
if (!canDo) goto _exit;
|
||||||
|
|
||||||
// begin
|
// begin
|
||||||
code = tsdbFSBegin(pTsdb->fs);
|
code = tsdbFSBegin(pTsdb->pFS);
|
||||||
if (code) goto _err;
|
if (code) goto _err;
|
||||||
|
|
||||||
// do retention
|
// do retention
|
||||||
|
@ -91,7 +91,7 @@ int32_t tsdbDoRetention(STsdb *pTsdb, int64_t now) {
|
||||||
if (code) goto _err;
|
if (code) goto _err;
|
||||||
|
|
||||||
// commit
|
// commit
|
||||||
code = tsdbFSCommit(pTsdb->fs);
|
code = tsdbFSCommit(pTsdb->pFS);
|
||||||
if (code) goto _err;
|
if (code) goto _err;
|
||||||
|
|
||||||
_exit:
|
_exit:
|
||||||
|
@ -99,6 +99,6 @@ _exit:
|
||||||
|
|
||||||
_err:
|
_err:
|
||||||
tsdbError("vgId:%d tsdb do retention failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
|
tsdbError("vgId:%d tsdb do retention failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
|
||||||
tsdbFSRollback(pTsdb->fs);
|
tsdbFSRollback(pTsdb->pFS);
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
|
@ -45,7 +45,7 @@ static int32_t tsdbSnapReadData(STsdbSnapReader* pReader, uint8_t** ppData) {
|
||||||
|
|
||||||
while (true) {
|
while (true) {
|
||||||
if (pReader->pDataFReader == NULL) {
|
if (pReader->pDataFReader == NULL) {
|
||||||
SDFileSet* pSet = tsdbFSStateGetDFileSet(pTsdb->fs->cState, pReader->fid, TD_GT);
|
SDFileSet* pSet = tsdbFSStateGetDFileSet(pTsdb->pFS->cState, pReader->fid, TD_GT);
|
||||||
|
|
||||||
if (pSet == NULL) goto _exit;
|
if (pSet == NULL) goto _exit;
|
||||||
|
|
||||||
|
@ -159,7 +159,7 @@ _err:
|
||||||
static int32_t tsdbSnapReadDel(STsdbSnapReader* pReader, uint8_t** ppData) {
|
static int32_t tsdbSnapReadDel(STsdbSnapReader* pReader, uint8_t** ppData) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
STsdb* pTsdb = pReader->pTsdb;
|
STsdb* pTsdb = pReader->pTsdb;
|
||||||
SDelFile* pDelFile = pTsdb->fs->cState->pDelFile;
|
SDelFile* pDelFile = pTsdb->pFS->cState->pDelFile;
|
||||||
|
|
||||||
if (pReader->pDelFReader == NULL) {
|
if (pReader->pDelFReader == NULL) {
|
||||||
if (pDelFile == NULL) {
|
if (pDelFile == NULL) {
|
||||||
|
@ -798,7 +798,7 @@ static int32_t tsdbSnapWriteDataEnd(STsdbSnapWriter* pWriter) {
|
||||||
code = tsdbWriteBlockIdx(pWriter->pDataFWriter, pWriter->aBlockIdxW, NULL);
|
code = tsdbWriteBlockIdx(pWriter->pDataFWriter, pWriter->aBlockIdxW, NULL);
|
||||||
if (code) goto _err;
|
if (code) goto _err;
|
||||||
|
|
||||||
code = tsdbFSStateUpsertDFileSet(pTsdb->fs->nState, tsdbDataFWriterGetWSet(pWriter->pDataFWriter));
|
code = tsdbFSStateUpsertDFileSet(pTsdb->pFS->nState, tsdbDataFWriterGetWSet(pWriter->pDataFWriter));
|
||||||
if (code) goto _err;
|
if (code) goto _err;
|
||||||
|
|
||||||
code = tsdbDataFWriterClose(&pWriter->pDataFWriter, 1);
|
code = tsdbDataFWriterClose(&pWriter->pDataFWriter, 1);
|
||||||
|
@ -843,7 +843,7 @@ static int32_t tsdbSnapWriteData(STsdbSnapWriter* pWriter, uint8_t* pData, uint3
|
||||||
pWriter->fid = fid;
|
pWriter->fid = fid;
|
||||||
|
|
||||||
// read
|
// read
|
||||||
SDFileSet* pSet = tsdbFSStateGetDFileSet(pTsdb->fs->nState, fid, TD_EQ);
|
SDFileSet* pSet = tsdbFSStateGetDFileSet(pTsdb->pFS->nState, fid, TD_EQ);
|
||||||
if (pSet) {
|
if (pSet) {
|
||||||
code = tsdbDataFReaderOpen(&pWriter->pDataFReader, pTsdb, pSet);
|
code = tsdbDataFReaderOpen(&pWriter->pDataFReader, pTsdb, pSet);
|
||||||
if (code) goto _err;
|
if (code) goto _err;
|
||||||
|
@ -907,7 +907,7 @@ static int32_t tsdbSnapWriteDel(STsdbSnapWriter* pWriter, uint8_t* pData, uint32
|
||||||
STsdb* pTsdb = pWriter->pTsdb;
|
STsdb* pTsdb = pWriter->pTsdb;
|
||||||
|
|
||||||
if (pWriter->pDelFWriter == NULL) {
|
if (pWriter->pDelFWriter == NULL) {
|
||||||
SDelFile* pDelFile = tsdbFSStateGetDelFile(pTsdb->fs->nState);
|
SDelFile* pDelFile = tsdbFSStateGetDelFile(pTsdb->pFS->nState);
|
||||||
|
|
||||||
// reader
|
// reader
|
||||||
if (pDelFile) {
|
if (pDelFile) {
|
||||||
|
@ -1017,7 +1017,7 @@ static int32_t tsdbSnapWriteDelEnd(STsdbSnapWriter* pWriter) {
|
||||||
code = tsdbUpdateDelFileHdr(pWriter->pDelFWriter);
|
code = tsdbUpdateDelFileHdr(pWriter->pDelFWriter);
|
||||||
if (code) goto _err;
|
if (code) goto _err;
|
||||||
|
|
||||||
code = tsdbFSStateUpsertDelFile(pTsdb->fs->nState, &pWriter->pDelFWriter->fDel);
|
code = tsdbFSStateUpsertDelFile(pTsdb->pFS->nState, &pWriter->pDelFWriter->fDel);
|
||||||
if (code) goto _err;
|
if (code) goto _err;
|
||||||
|
|
||||||
code = tsdbDelFWriterClose(&pWriter->pDelFWriter, 1);
|
code = tsdbDelFWriterClose(&pWriter->pDelFWriter, 1);
|
||||||
|
@ -1096,7 +1096,7 @@ int32_t tsdbSnapWriterOpen(STsdb* pTsdb, int64_t sver, int64_t ever, STsdbSnapWr
|
||||||
goto _err;
|
goto _err;
|
||||||
}
|
}
|
||||||
|
|
||||||
code = tsdbFSBegin(pTsdb->fs);
|
code = tsdbFSBegin(pTsdb->pFS);
|
||||||
if (code) goto _err;
|
if (code) goto _err;
|
||||||
|
|
||||||
*ppWriter = pWriter;
|
*ppWriter = pWriter;
|
||||||
|
@ -1113,7 +1113,7 @@ int32_t tsdbSnapWriterClose(STsdbSnapWriter** ppWriter, int8_t rollback) {
|
||||||
STsdbSnapWriter* pWriter = *ppWriter;
|
STsdbSnapWriter* pWriter = *ppWriter;
|
||||||
|
|
||||||
if (rollback) {
|
if (rollback) {
|
||||||
code = tsdbFSRollback(pWriter->pTsdb->fs);
|
code = tsdbFSRollback(pWriter->pTsdb->pFS);
|
||||||
if (code) goto _err;
|
if (code) goto _err;
|
||||||
} else {
|
} else {
|
||||||
code = tsdbSnapWriteDataEnd(pWriter);
|
code = tsdbSnapWriteDataEnd(pWriter);
|
||||||
|
@ -1122,7 +1122,7 @@ int32_t tsdbSnapWriterClose(STsdbSnapWriter** ppWriter, int8_t rollback) {
|
||||||
code = tsdbSnapWriteDelEnd(pWriter);
|
code = tsdbSnapWriteDelEnd(pWriter);
|
||||||
if (code) goto _err;
|
if (code) goto _err;
|
||||||
|
|
||||||
code = tsdbFSCommit(pWriter->pTsdb->fs);
|
code = tsdbFSCommit(pWriter->pTsdb->pFS);
|
||||||
if (code) goto _err;
|
if (code) goto _err;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue