fix read concurrency

This commit is contained in:
Hongze Cheng 2022-07-21 11:42:42 +00:00
parent bb2e923ffa
commit 684dd82358
10 changed files with 1391 additions and 1021 deletions

View File

@ -62,7 +62,6 @@ typedef struct SDelFReader SDelFReader;
typedef struct SRowIter SRowIter; typedef struct SRowIter SRowIter;
typedef struct STsdbFS STsdbFS; typedef struct STsdbFS STsdbFS;
typedef struct SRowMerger SRowMerger; typedef struct SRowMerger SRowMerger;
typedef struct STsdbFSState STsdbFSState;
typedef struct STsdbSnapHdr STsdbSnapHdr; typedef struct STsdbSnapHdr STsdbSnapHdr;
typedef struct STsdbReadSnap STsdbReadSnap; typedef struct STsdbReadSnap STsdbReadSnap;
@ -177,8 +176,6 @@ void tsdbMemTableDestroy(SMemTable *pMemTable);
void tsdbGetTbDataFromMemTable(SMemTable *pMemTable, tb_uid_t suid, tb_uid_t uid, STbData **ppTbData); void tsdbGetTbDataFromMemTable(SMemTable *pMemTable, tb_uid_t suid, tb_uid_t uid, STbData **ppTbData);
void tsdbRefMemTable(SMemTable *pMemTable); void tsdbRefMemTable(SMemTable *pMemTable);
void tsdbUnrefMemTable(SMemTable *pMemTable); void tsdbUnrefMemTable(SMemTable *pMemTable);
int32_t tsdbTakeMemSnapshot(STsdb *pTsdb, SMemTable **ppMem, SMemTable **ppIMem);
void tsdbUntakeMemSnapshot(STsdb *pTsdb, SMemTable *pMem, SMemTable *pIMem);
// STbDataIter // STbDataIter
int32_t tsdbTbDataIterCreate(STbData *pTbData, TSDBKEY *pFrom, int8_t backward, STbDataIter **ppIter); int32_t tsdbTbDataIterCreate(STbData *pTbData, TSDBKEY *pFrom, int8_t backward, STbDataIter **ppIter);
void *tsdbTbDataIterDestroy(STbDataIter *pIter); void *tsdbTbDataIterDestroy(STbDataIter *pIter);
@ -208,17 +205,20 @@ void tsdbSmaFileName(STsdb *pTsdb, SDiskID did, int32_t fid, SSmaFile *pSmaF, ch
// SDelFile // SDelFile
void tsdbDelFileName(STsdb *pTsdb, SDelFile *pFile, char fname[]); void tsdbDelFileName(STsdb *pTsdb, SDelFile *pFile, char fname[]);
// tsdbFS.c ============================================================================================== // tsdbFS.c ==============================================================================================
int32_t tsdbFSOpen(STsdb *pTsdb, STsdbFS **ppFS); int32_t tsdbFSOpen(STsdb *pTsdb);
int32_t tsdbFSClose(STsdbFS *pFS); int32_t tsdbFSClose(STsdb *pTsdb);
int32_t tsdbFSBegin(STsdbFS *pFS); int32_t tsdbFSCopy(STsdb *pTsdb, STsdbFS *pFS);
int32_t tsdbFSCommit(STsdbFS *pFS); void tsdbFSDestroy(STsdbFS *pFS);
int32_t tDFileSetCmprFn(const void *p1, const void *p2);
int32_t tsdbFSCommit1(STsdb *pTsdb, STsdbFS *pFS);
int32_t tsdbFSCommit2(STsdb *pTsdb, STsdbFS *pFS);
int32_t tsdbFSRef(STsdb *pTsdb, STsdbFS *pFS);
void tsdbFSUnref(STsdb *pTsdb, STsdbFS *pFS);
int32_t tsdbFSRollback(STsdbFS *pFS); int32_t tsdbFSRollback(STsdbFS *pFS);
int32_t tsdbFSStateUpsertDelFile(STsdbFSState *pState, SDelFile *pDelFile); int32_t tsdbFSUpsertFSet(STsdbFS *pFS, SDFileSet *pSet);
int32_t tsdbFSStateUpsertDFileSet(STsdbFSState *pState, SDFileSet *pSet); int32_t tsdbFSUpsertDelFile(STsdbFS *pFS, SDelFile *pDelFile);
void tsdbFSStateDeleteDFileSet(STsdbFSState *pState, int32_t fid);
SDelFile *tsdbFSStateGetDelFile(STsdbFSState *pState);
SDFileSet *tsdbFSStateGetDFileSet(STsdbFSState *pState, int32_t fid, int32_t flag);
// tsdbReaderWriter.c ============================================================================================== // tsdbReaderWriter.c ==============================================================================================
// SDataFWriter // SDataFWriter
int32_t tsdbDataFWriterOpen(SDataFWriter **ppWriter, STsdb *pTsdb, SDFileSet *pSet); int32_t tsdbDataFWriterOpen(SDataFWriter **ppWriter, STsdb *pTsdb, SDFileSet *pSet);
@ -285,6 +285,11 @@ typedef struct {
TSKEY minKey; TSKEY minKey;
} SRtn; } SRtn;
struct STsdbFS {
SDelFile *pDelFile;
SArray *aDFileSet; // SArray<SDFileSet>
};
struct STsdb { struct STsdb {
char *path; char *path;
SVnode *pVnode; SVnode *pVnode;
@ -292,7 +297,7 @@ struct STsdb {
TdThreadRwlock rwLock; TdThreadRwlock rwLock;
SMemTable *mem; SMemTable *mem;
SMemTable *imem; SMemTable *imem;
STsdbFS *pFS; STsdbFS fs;
SLRUCache *lruCache; SLRUCache *lruCache;
}; };
@ -540,22 +545,6 @@ struct SRowMerger {
SArray *pArray; // SArray<SColVal> SArray *pArray; // SArray<SColVal>
}; };
struct STsdbFSState {
SDelFile *pDelFile;
SArray *aDFileSet; // SArray<aDFileSet>
SDelFile delFile;
};
struct STsdbFS {
STsdb *pTsdb;
STsdbFSState *cState;
STsdbFSState *nState;
// new
SDelFile *pDelFile;
SArray aDFileSetP; // SArray<SDFileSet *>
};
struct SDelFWriter { struct SDelFWriter {
STsdb *pTsdb; STsdb *pTsdb;
SDelFile fDel; SDelFile fDel;

View File

@ -464,7 +464,7 @@ static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow) {
switch (state->state) { switch (state->state) {
case SFSNEXTROW_FS: case SFSNEXTROW_FS:
state->aDFileSet = state->pTsdb->pFS->cState->aDFileSet; // state->aDFileSet = state->pTsdb->pFS->cState->aDFileSet;
state->nFileSet = taosArrayGetSize(state->aDFileSet); state->nFileSet = taosArrayGetSize(state->aDFileSet);
state->iFileSet = state->nFileSet; state->iFileSet = state->nFileSet;
@ -793,9 +793,10 @@ typedef struct {
TSDBROW memRow, imemRow, fsRow; TSDBROW memRow, imemRow, fsRow;
TsdbNextRowState input[3]; TsdbNextRowState input[3];
SMemTable *pMemTable; // SMemTable *pMemTable;
SMemTable *pIMemTable; // SMemTable *pIMemTable;
STsdb *pTsdb; STsdbReadSnap *pReadSnap;
STsdb *pTsdb;
} CacheNextRowIter; } CacheNextRowIter;
static int32_t nextRowIterOpen(CacheNextRowIter *pIter, tb_uid_t uid, STsdb *pTsdb) { static int32_t nextRowIterOpen(CacheNextRowIter *pIter, tb_uid_t uid, STsdb *pTsdb) {
@ -803,16 +804,16 @@ static int32_t nextRowIterOpen(CacheNextRowIter *pIter, tb_uid_t uid, STsdb *pTs
tb_uid_t suid = getTableSuidByUid(uid, pTsdb); tb_uid_t suid = getTableSuidByUid(uid, pTsdb);
tsdbTakeMemSnapshot(pTsdb, &pIter->pMemTable, &pIter->pIMemTable); tsdbTakeReadSnap(pTsdb, &pIter->pReadSnap);
STbData *pMem = NULL; STbData *pMem = NULL;
if (pIter->pMemTable) { if (pIter->pReadSnap->pMem) {
tsdbGetTbDataFromMemTable(pIter->pMemTable, suid, uid, &pMem); tsdbGetTbDataFromMemTable(pIter->pReadSnap->pMem, suid, uid, &pMem);
} }
STbData *pIMem = NULL; STbData *pIMem = NULL;
if (pIter->pIMemTable) { if (pIter->pReadSnap->pIMem) {
tsdbGetTbDataFromMemTable(pIter->pIMemTable, suid, uid, &pIMem); tsdbGetTbDataFromMemTable(pIter->pReadSnap->pIMem, suid, uid, &pIMem);
} }
pIter->pTsdb = pTsdb; pIter->pTsdb = pTsdb;
@ -821,7 +822,7 @@ static int32_t nextRowIterOpen(CacheNextRowIter *pIter, tb_uid_t uid, STsdb *pTs
SDelIdx delIdx; SDelIdx delIdx;
SDelFile *pDelFile = tsdbFSStateGetDelFile(pTsdb->pFS->cState); SDelFile *pDelFile = pIter->pReadSnap->fs.pDelFile;
if (pDelFile) { if (pDelFile) {
SDelFReader *pDelFReader; SDelFReader *pDelFReader;
@ -846,6 +847,7 @@ static int32_t nextRowIterOpen(CacheNextRowIter *pIter, tb_uid_t uid, STsdb *pTs
pIter->fsState.state = SFSNEXTROW_FS; pIter->fsState.state = SFSNEXTROW_FS;
pIter->fsState.pTsdb = pTsdb; pIter->fsState.pTsdb = pTsdb;
pIter->fsState.aDFileSet = pIter->pReadSnap->fs.aDFileSet;
pIter->fsState.pBlockIdxExp = &pIter->idx; pIter->fsState.pBlockIdxExp = &pIter->idx;
pIter->input[0] = (TsdbNextRowState){&pIter->memRow, true, false, &pIter->memState, getNextRowFromMem, NULL}; pIter->input[0] = (TsdbNextRowState){&pIter->memRow, true, false, &pIter->memState, getNextRowFromMem, NULL};
@ -885,7 +887,7 @@ static int32_t nextRowIterClose(CacheNextRowIter *pIter) {
taosArrayDestroy(pIter->pSkyline); taosArrayDestroy(pIter->pSkyline);
} }
tsdbUntakeMemSnapshot(pIter->pTsdb, pIter->pMemTable, pIter->pIMemTable); tsdbUntakeReadSnap(pIter->pTsdb, pIter->pReadSnap);
return code; return code;
_err: _err:
@ -1172,480 +1174,480 @@ _err:
return code; return code;
} }
static int32_t mergeLastRow(tb_uid_t uid, STsdb *pTsdb, bool *dup, STSRow **ppRow) { // static int32_t mergeLastRow(tb_uid_t uid, STsdb *pTsdb, bool *dup, STSRow **ppRow) {
int32_t code = 0; // int32_t code = 0;
SArray *pSkyline = NULL; // SArray *pSkyline = NULL;
STSchema *pTSchema = metaGetTbTSchema(pTsdb->pVnode->pMeta, uid, -1); // STSchema *pTSchema = metaGetTbTSchema(pTsdb->pVnode->pMeta, uid, -1);
int16_t nCol = pTSchema->numOfCols; // int16_t nCol = pTSchema->numOfCols;
SArray *pColArray = taosArrayInit(nCol, sizeof(SColVal)); // SArray *pColArray = taosArrayInit(nCol, sizeof(SColVal));
tb_uid_t suid = getTableSuidByUid(uid, pTsdb); // tb_uid_t suid = getTableSuidByUid(uid, pTsdb);
STbData *pMem = NULL; // STbData *pMem = NULL;
if (pTsdb->mem) { // if (pTsdb->mem) {
tsdbGetTbDataFromMemTable(pTsdb->mem, suid, uid, &pMem); // tsdbGetTbDataFromMemTable(pTsdb->mem, suid, uid, &pMem);
} // }
STbData *pIMem = NULL; // STbData *pIMem = NULL;
if (pTsdb->imem) { // if (pTsdb->imem) {
tsdbGetTbDataFromMemTable(pTsdb->imem, suid, uid, &pIMem); // tsdbGetTbDataFromMemTable(pTsdb->imem, suid, uid, &pIMem);
} // }
*ppRow = NULL; // *ppRow = NULL;
pSkyline = taosArrayInit(32, sizeof(TSDBKEY)); // pSkyline = taosArrayInit(32, sizeof(TSDBKEY));
SDelIdx delIdx; // SDelIdx delIdx;
SDelFile *pDelFile = tsdbFSStateGetDelFile(pTsdb->pFS->cState); // SDelFile *pDelFile = tsdbFSStateGetDelFile(pTsdb->pFS->cState);
if (pDelFile) { // if (pDelFile) {
SDelFReader *pDelFReader; // SDelFReader *pDelFReader;
code = tsdbDelFReaderOpen(&pDelFReader, pDelFile, pTsdb, NULL); // code = tsdbDelFReaderOpen(&pDelFReader, pDelFile, pTsdb, NULL);
if (code) goto _err; // if (code) goto _err;
code = getTableDelIdx(pDelFReader, suid, uid, &delIdx); // code = getTableDelIdx(pDelFReader, suid, uid, &delIdx);
if (code) goto _err; // if (code) goto _err;
code = getTableDelSkyline(pMem, pIMem, pDelFReader, &delIdx, pSkyline); // code = getTableDelSkyline(pMem, pIMem, pDelFReader, &delIdx, pSkyline);
if (code) goto _err; // if (code) goto _err;
tsdbDelFReaderClose(&pDelFReader); // tsdbDelFReaderClose(&pDelFReader);
} else { // } else {
code = getTableDelSkyline(pMem, pIMem, NULL, NULL, pSkyline); // code = getTableDelSkyline(pMem, pIMem, NULL, NULL, pSkyline);
if (code) goto _err; // if (code) goto _err;
} // }
int64_t iSkyline = taosArrayGetSize(pSkyline) - 1; // int64_t iSkyline = taosArrayGetSize(pSkyline) - 1;
SBlockIdx idx = {.suid = suid, .uid = uid}; // SBlockIdx idx = {.suid = suid, .uid = uid};
SFSNextRowIter fsState = {0}; // SFSNextRowIter fsState = {0};
fsState.state = SFSNEXTROW_FS; // fsState.state = SFSNEXTROW_FS;
fsState.pTsdb = pTsdb; // fsState.pTsdb = pTsdb;
fsState.pBlockIdxExp = &idx; // fsState.pBlockIdxExp = &idx;
SMemNextRowIter memState = {0}; // SMemNextRowIter memState = {0};
SMemNextRowIter imemState = {0}; // SMemNextRowIter imemState = {0};
TSDBROW memRow, imemRow, fsRow; // TSDBROW memRow, imemRow, fsRow;
TsdbNextRowState input[3] = {{&memRow, true, false, &memState, getNextRowFromMem, NULL}, // TsdbNextRowState input[3] = {{&memRow, true, false, &memState, getNextRowFromMem, NULL},
{&imemRow, true, false, &imemState, getNextRowFromMem, NULL}, // {&imemRow, true, false, &imemState, getNextRowFromMem, NULL},
{&fsRow, false, true, &fsState, getNextRowFromFS, clearNextRowFromFS}}; // {&fsRow, false, true, &fsState, getNextRowFromFS, clearNextRowFromFS}};
if (pMem) { // if (pMem) {
memState.pMem = pMem; // memState.pMem = pMem;
memState.state = SMEMNEXTROW_ENTER; // memState.state = SMEMNEXTROW_ENTER;
input[0].stop = false; // input[0].stop = false;
input[0].next = true; // input[0].next = true;
} // }
if (pIMem) { // if (pIMem) {
imemState.pMem = pIMem; // imemState.pMem = pIMem;
imemState.state = SMEMNEXTROW_ENTER; // imemState.state = SMEMNEXTROW_ENTER;
input[1].stop = false; // input[1].stop = false;
input[1].next = true; // input[1].next = true;
} // }
int16_t nilColCount = nCol - 1; // count of null & none cols // int16_t nilColCount = nCol - 1; // count of null & none cols
int iCol = 0; // index of first nil col index from left to right // int iCol = 0; // index of first nil col index from left to right
bool setICol = false; // bool setICol = false;
do { // do {
for (int i = 0; i < 3; ++i) { // for (int i = 0; i < 3; ++i) {
if (input[i].next && !input[i].stop) { // if (input[i].next && !input[i].stop) {
if (input[i].pRow == NULL) { // if (input[i].pRow == NULL) {
code = input[i].nextRowFn(input[i].iter, &input[i].pRow); // code = input[i].nextRowFn(input[i].iter, &input[i].pRow);
if (code) goto _err; // if (code) goto _err;
if (input[i].pRow == NULL) { // if (input[i].pRow == NULL) {
input[i].stop = true; // input[i].stop = true;
input[i].next = false; // input[i].next = false;
} // }
} // }
} // }
} // }
if (input[0].stop && input[1].stop && input[2].stop) { // if (input[0].stop && input[1].stop && input[2].stop) {
break; // break;
} // }
// select maxpoint(s) from mem, imem, fs // // select maxpoint(s) from mem, imem, fs
TSDBROW *max[3] = {0}; // TSDBROW *max[3] = {0};
int iMax[3] = {-1, -1, -1}; // int iMax[3] = {-1, -1, -1};
int nMax = 0; // int nMax = 0;
TSKEY maxKey = TSKEY_MIN; // TSKEY maxKey = TSKEY_MIN;
for (int i = 0; i < 3; ++i) { // for (int i = 0; i < 3; ++i) {
if (!input[i].stop && input[i].pRow != NULL) { // if (!input[i].stop && input[i].pRow != NULL) {
TSDBKEY key = TSDBROW_KEY(input[i].pRow); // TSDBKEY key = TSDBROW_KEY(input[i].pRow);
// merging & deduplicating on client side // // merging & deduplicating on client side
if (maxKey <= key.ts) { // if (maxKey <= key.ts) {
if (maxKey < key.ts) { // if (maxKey < key.ts) {
nMax = 0; // nMax = 0;
maxKey = key.ts; // maxKey = key.ts;
} // }
iMax[nMax] = i; // iMax[nMax] = i;
max[nMax++] = input[i].pRow; // max[nMax++] = input[i].pRow;
} // }
} // }
} // }
// delete detection // // delete detection
TSDBROW *merge[3] = {0}; // TSDBROW *merge[3] = {0};
int iMerge[3] = {-1, -1, -1}; // int iMerge[3] = {-1, -1, -1};
int nMerge = 0; // int nMerge = 0;
for (int i = 0; i < nMax; ++i) { // for (int i = 0; i < nMax; ++i) {
TSDBKEY maxKey = TSDBROW_KEY(max[i]); // TSDBKEY maxKey = TSDBROW_KEY(max[i]);
bool deleted = tsdbKeyDeleted(&maxKey, pSkyline, &iSkyline); // bool deleted = tsdbKeyDeleted(&maxKey, pSkyline, &iSkyline);
if (!deleted) { // if (!deleted) {
iMerge[nMerge] = i; // iMerge[nMerge] = i;
merge[nMerge++] = max[i]; // merge[nMerge++] = max[i];
} // }
input[iMax[i]].next = deleted; // input[iMax[i]].next = deleted;
} // }
// merge if nMerge > 1 // // merge if nMerge > 1
if (nMerge > 0) { // if (nMerge > 0) {
*dup = false; // *dup = false;
if (nMerge == 1) { // if (nMerge == 1) {
code = tsRowFromTsdbRow(pTSchema, merge[nMerge - 1], ppRow); // code = tsRowFromTsdbRow(pTSchema, merge[nMerge - 1], ppRow);
if (code) goto _err; // if (code) goto _err;
} else { // } else {
// merge 2 or 3 rows // // merge 2 or 3 rows
SRowMerger merger = {0}; // SRowMerger merger = {0};
tRowMergerInit(&merger, merge[0], pTSchema); // tRowMergerInit(&merger, merge[0], pTSchema);
for (int i = 1; i < nMerge; ++i) { // for (int i = 1; i < nMerge; ++i) {
tRowMerge(&merger, merge[i]); // tRowMerge(&merger, merge[i]);
} // }
tRowMergerGetRow(&merger, ppRow); // tRowMergerGetRow(&merger, ppRow);
tRowMergerClear(&merger); // tRowMergerClear(&merger);
} // }
} // }
} while (1); // } while (1);
for (int i = 0; i < 3; ++i) { // for (int i = 0; i < 3; ++i) {
if (input[i].nextRowClearFn) { // if (input[i].nextRowClearFn) {
input[i].nextRowClearFn(input[i].iter); // input[i].nextRowClearFn(input[i].iter);
} // }
} // }
if (pSkyline) { // if (pSkyline) {
taosArrayDestroy(pSkyline); // taosArrayDestroy(pSkyline);
} // }
taosMemoryFreeClear(pTSchema); // taosMemoryFreeClear(pTSchema);
return code; // return code;
_err: // _err:
for (int i = 0; i < 3; ++i) { // for (int i = 0; i < 3; ++i) {
if (input[i].nextRowClearFn) { // if (input[i].nextRowClearFn) {
input[i].nextRowClearFn(input[i].iter); // input[i].nextRowClearFn(input[i].iter);
} // }
} // }
if (pSkyline) { // if (pSkyline) {
taosArrayDestroy(pSkyline); // taosArrayDestroy(pSkyline);
} // }
taosMemoryFreeClear(pTSchema); // taosMemoryFreeClear(pTSchema);
tsdbError("vgId:%d merge last_row failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code)); // tsdbError("vgId:%d merge last_row failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
return code; // return code;
} // }
// static int32_t mergeLast(tb_uid_t uid, STsdb *pTsdb, STSRow **ppRow) { // static int32_t mergeLast(tb_uid_t uid, STsdb *pTsdb, STSRow **ppRow) {
static int32_t mergeLast(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray) { // static int32_t mergeLast(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray) {
int32_t code = 0; // int32_t code = 0;
SArray *pSkyline = NULL; // SArray *pSkyline = NULL;
STSRow *pRow = NULL; // STSRow *pRow = NULL;
STSRow **ppRow = &pRow; // STSRow **ppRow = &pRow;
STSchema *pTSchema = metaGetTbTSchema(pTsdb->pVnode->pMeta, uid, -1); // STSchema *pTSchema = metaGetTbTSchema(pTsdb->pVnode->pMeta, uid, -1);
int16_t nCol = pTSchema->numOfCols; // int16_t nCol = pTSchema->numOfCols;
// SArray *pColArray = taosArrayInit(nCol, sizeof(SColVal)); // // SArray *pColArray = taosArrayInit(nCol, sizeof(SColVal));
SArray *pColArray = taosArrayInit(nCol, sizeof(SLastCol)); // SArray *pColArray = taosArrayInit(nCol, sizeof(SLastCol));
tb_uid_t suid = getTableSuidByUid(uid, pTsdb); // tb_uid_t suid = getTableSuidByUid(uid, pTsdb);
STbData *pMem = NULL; // STbData *pMem = NULL;
if (pTsdb->mem) { // if (pTsdb->mem) {
tsdbGetTbDataFromMemTable(pTsdb->mem, suid, uid, &pMem); // tsdbGetTbDataFromMemTable(pTsdb->mem, suid, uid, &pMem);
} // }
STbData *pIMem = NULL; // STbData *pIMem = NULL;
if (pTsdb->imem) { // if (pTsdb->imem) {
tsdbGetTbDataFromMemTable(pTsdb->imem, suid, uid, &pIMem); // tsdbGetTbDataFromMemTable(pTsdb->imem, suid, uid, &pIMem);
} // }
*ppLastArray = NULL; // *ppLastArray = NULL;
pSkyline = taosArrayInit(32, sizeof(TSDBKEY)); // pSkyline = taosArrayInit(32, sizeof(TSDBKEY));
SDelIdx delIdx; // SDelIdx delIdx;
SDelFile *pDelFile = tsdbFSStateGetDelFile(pTsdb->pFS->cState); // SDelFile *pDelFile = tsdbFSStateGetDelFile(pTsdb->pFS->cState);
if (pDelFile) { // if (pDelFile) {
SDelFReader *pDelFReader; // SDelFReader *pDelFReader;
code = tsdbDelFReaderOpen(&pDelFReader, pDelFile, pTsdb, NULL); // code = tsdbDelFReaderOpen(&pDelFReader, pDelFile, pTsdb, NULL);
if (code) goto _err; // if (code) goto _err;
code = getTableDelIdx(pDelFReader, suid, uid, &delIdx); // code = getTableDelIdx(pDelFReader, suid, uid, &delIdx);
if (code) goto _err; // if (code) goto _err;
code = getTableDelSkyline(pMem, pIMem, pDelFReader, &delIdx, pSkyline); // code = getTableDelSkyline(pMem, pIMem, pDelFReader, &delIdx, pSkyline);
if (code) goto _err; // if (code) goto _err;
tsdbDelFReaderClose(&pDelFReader); // tsdbDelFReaderClose(&pDelFReader);
} else { // } else {
code = getTableDelSkyline(pMem, pIMem, NULL, NULL, pSkyline); // code = getTableDelSkyline(pMem, pIMem, NULL, NULL, pSkyline);
if (code) goto _err; // if (code) goto _err;
} // }
int64_t iSkyline = taosArrayGetSize(pSkyline) - 1; // int64_t iSkyline = taosArrayGetSize(pSkyline) - 1;
SBlockIdx idx = {.suid = suid, .uid = uid}; // SBlockIdx idx = {.suid = suid, .uid = uid};
SFSNextRowIter fsState = {0}; // SFSNextRowIter fsState = {0};
fsState.state = SFSNEXTROW_FS; // fsState.state = SFSNEXTROW_FS;
fsState.pTsdb = pTsdb; // fsState.pTsdb = pTsdb;
fsState.pBlockIdxExp = &idx; // fsState.pBlockIdxExp = &idx;
SMemNextRowIter memState = {0}; // SMemNextRowIter memState = {0};
SMemNextRowIter imemState = {0}; // SMemNextRowIter imemState = {0};
TSDBROW memRow, imemRow, fsRow; // TSDBROW memRow, imemRow, fsRow;
TsdbNextRowState input[3] = {{&memRow, true, false, &memState, getNextRowFromMem, NULL}, // TsdbNextRowState input[3] = {{&memRow, true, false, &memState, getNextRowFromMem, NULL},
{&imemRow, true, false, &imemState, getNextRowFromMem, NULL}, // {&imemRow, true, false, &imemState, getNextRowFromMem, NULL},
{&fsRow, false, true, &fsState, getNextRowFromFS, clearNextRowFromFS}}; // {&fsRow, false, true, &fsState, getNextRowFromFS, clearNextRowFromFS}};
if (pMem) { // if (pMem) {
memState.pMem = pMem; // memState.pMem = pMem;
memState.state = SMEMNEXTROW_ENTER; // memState.state = SMEMNEXTROW_ENTER;
input[0].stop = false; // input[0].stop = false;
input[0].next = true; // input[0].next = true;
} // }
if (pIMem) { // if (pIMem) {
imemState.pMem = pIMem; // imemState.pMem = pIMem;
imemState.state = SMEMNEXTROW_ENTER; // imemState.state = SMEMNEXTROW_ENTER;
input[1].stop = false; // input[1].stop = false;
input[1].next = true; // input[1].next = true;
} // }
int16_t nilColCount = nCol - 1; // count of null & none cols // int16_t nilColCount = nCol - 1; // count of null & none cols
int iCol = 0; // index of first nil col index from left to right // int iCol = 0; // index of first nil col index from left to right
bool setICol = false; // bool setICol = false;
do { // do {
for (int i = 0; i < 3; ++i) { // for (int i = 0; i < 3; ++i) {
if (input[i].next && !input[i].stop) { // if (input[i].next && !input[i].stop) {
code = input[i].nextRowFn(input[i].iter, &input[i].pRow); // code = input[i].nextRowFn(input[i].iter, &input[i].pRow);
if (code) goto _err; // if (code) goto _err;
if (input[i].pRow == NULL) { // if (input[i].pRow == NULL) {
input[i].stop = true; // input[i].stop = true;
input[i].next = false; // input[i].next = false;
} // }
} // }
} // }
if (input[0].stop && input[1].stop && input[2].stop) { // if (input[0].stop && input[1].stop && input[2].stop) {
break; // break;
} // }
// select maxpoint(s) from mem, imem, fs // // select maxpoint(s) from mem, imem, fs
TSDBROW *max[3] = {0}; // TSDBROW *max[3] = {0};
int iMax[3] = {-1, -1, -1}; // int iMax[3] = {-1, -1, -1};
int nMax = 0; // int nMax = 0;
TSKEY maxKey = TSKEY_MIN; // TSKEY maxKey = TSKEY_MIN;
for (int i = 0; i < 3; ++i) { // for (int i = 0; i < 3; ++i) {
if (!input[i].stop && input[i].pRow != NULL) { // if (!input[i].stop && input[i].pRow != NULL) {
TSDBKEY key = TSDBROW_KEY(input[i].pRow); // TSDBKEY key = TSDBROW_KEY(input[i].pRow);
// merging & deduplicating on client side // // merging & deduplicating on client side
if (maxKey <= key.ts) { // if (maxKey <= key.ts) {
if (maxKey < key.ts) { // if (maxKey < key.ts) {
nMax = 0; // nMax = 0;
maxKey = key.ts; // maxKey = key.ts;
} // }
iMax[nMax] = i; // iMax[nMax] = i;
max[nMax++] = input[i].pRow; // max[nMax++] = input[i].pRow;
} // }
} // }
} // }
// delete detection // // delete detection
TSDBROW *merge[3] = {0}; // TSDBROW *merge[3] = {0};
int iMerge[3] = {-1, -1, -1}; // int iMerge[3] = {-1, -1, -1};
int nMerge = 0; // int nMerge = 0;
for (int i = 0; i < nMax; ++i) { // for (int i = 0; i < nMax; ++i) {
TSDBKEY maxKey = TSDBROW_KEY(max[i]); // TSDBKEY maxKey = TSDBROW_KEY(max[i]);
bool deleted = tsdbKeyDeleted(&maxKey, pSkyline, &iSkyline); // bool deleted = tsdbKeyDeleted(&maxKey, pSkyline, &iSkyline);
if (!deleted) { // if (!deleted) {
iMerge[nMerge] = iMax[i]; // iMerge[nMerge] = iMax[i];
merge[nMerge++] = max[i]; // merge[nMerge++] = max[i];
} // }
input[iMax[i]].next = deleted; // input[iMax[i]].next = deleted;
} // }
// merge if nMerge > 1 // // merge if nMerge > 1
if (nMerge > 0) { // if (nMerge > 0) {
if (nMerge == 1) { // if (nMerge == 1) {
code = tsRowFromTsdbRow(pTSchema, merge[nMerge - 1], ppRow); // code = tsRowFromTsdbRow(pTSchema, merge[nMerge - 1], ppRow);
if (code) goto _err; // if (code) goto _err;
} else { // } else {
// merge 2 or 3 rows // // merge 2 or 3 rows
SRowMerger merger = {0}; // SRowMerger merger = {0};
tRowMergerInit(&merger, merge[0], pTSchema); // tRowMergerInit(&merger, merge[0], pTSchema);
for (int i = 1; i < nMerge; ++i) { // for (int i = 1; i < nMerge; ++i) {
tRowMerge(&merger, merge[i]); // tRowMerge(&merger, merge[i]);
} // }
tRowMergerGetRow(&merger, ppRow); // tRowMergerGetRow(&merger, ppRow);
tRowMergerClear(&merger); // tRowMergerClear(&merger);
} // }
} else { // } else {
/* *ppRow = NULL; */ // /* *ppRow = NULL; */
/* return code; */ // /* return code; */
continue; // continue;
} // }
if (iCol == 0) { // if (iCol == 0) {
STColumn *pTColumn = &pTSchema->columns[0]; // STColumn *pTColumn = &pTSchema->columns[0];
SColVal *pColVal = &(SColVal){0}; // SColVal *pColVal = &(SColVal){0};
*pColVal = COL_VAL_VALUE(pTColumn->colId, pTColumn->type, (SValue){.ts = maxKey}); // *pColVal = COL_VAL_VALUE(pTColumn->colId, pTColumn->type, (SValue){.ts = maxKey});
// if (taosArrayPush(pColArray, pColVal) == NULL) { // // if (taosArrayPush(pColArray, pColVal) == NULL) {
if (taosArrayPush(pColArray, &(SLastCol){.ts = maxKey, .colVal = *pColVal}) == NULL) { // if (taosArrayPush(pColArray, &(SLastCol){.ts = maxKey, .colVal = *pColVal}) == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY; // code = TSDB_CODE_OUT_OF_MEMORY;
goto _err; // goto _err;
} // }
++iCol; // ++iCol;
setICol = false; // setICol = false;
for (int16_t i = iCol; i < nCol; ++i) { // for (int16_t i = iCol; i < nCol; ++i) {
// tsdbRowGetColVal(*ppRow, pTSchema, i, pColVal); // // tsdbRowGetColVal(*ppRow, pTSchema, i, pColVal);
tTSRowGetVal(*ppRow, pTSchema, i, pColVal); // tTSRowGetVal(*ppRow, pTSchema, i, pColVal);
// if (taosArrayPush(pColArray, pColVal) == NULL) { // // if (taosArrayPush(pColArray, pColVal) == NULL) {
if (taosArrayPush(pColArray, &(SLastCol){.ts = maxKey, .colVal = *pColVal}) == NULL) { // if (taosArrayPush(pColArray, &(SLastCol){.ts = maxKey, .colVal = *pColVal}) == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY; // code = TSDB_CODE_OUT_OF_MEMORY;
goto _err; // goto _err;
} // }
if (pColVal->isNull || pColVal->isNone) { // if (pColVal->isNull || pColVal->isNone) {
for (int j = 0; j < nMerge; ++j) { // for (int j = 0; j < nMerge; ++j) {
SColVal jColVal = {0}; // SColVal jColVal = {0};
tsdbRowGetColVal(merge[j], pTSchema, i, &jColVal); // tsdbRowGetColVal(merge[j], pTSchema, i, &jColVal);
if (jColVal.isNull || jColVal.isNone) { // if (jColVal.isNull || jColVal.isNone) {
input[iMerge[j]].next = true; // input[iMerge[j]].next = true;
} // }
} // }
if (!setICol) { // if (!setICol) {
iCol = i; // iCol = i;
setICol = true; // setICol = true;
} // }
} else { // } else {
--nilColCount; // --nilColCount;
} // }
} // }
if (*ppRow) { // if (*ppRow) {
taosMemoryFreeClear(*ppRow); // taosMemoryFreeClear(*ppRow);
} // }
continue; // continue;
} // }
setICol = false; // setICol = false;
for (int16_t i = iCol; i < nCol; ++i) { // for (int16_t i = iCol; i < nCol; ++i) {
SColVal colVal = {0}; // SColVal colVal = {0};
tTSRowGetVal(*ppRow, pTSchema, i, &colVal); // tTSRowGetVal(*ppRow, pTSchema, i, &colVal);
TSKEY rowTs = (*ppRow)->ts; // TSKEY rowTs = (*ppRow)->ts;
// SColVal *tColVal = (SColVal *)taosArrayGet(pColArray, i); // // SColVal *tColVal = (SColVal *)taosArrayGet(pColArray, i);
SLastCol *tTsVal = (SLastCol *)taosArrayGet(pColArray, i); // SLastCol *tTsVal = (SLastCol *)taosArrayGet(pColArray, i);
SColVal *tColVal = &tTsVal->colVal; // SColVal *tColVal = &tTsVal->colVal;
if (!colVal.isNone && !colVal.isNull) { // if (!colVal.isNone && !colVal.isNull) {
if (tColVal->isNull || tColVal->isNone) { // if (tColVal->isNull || tColVal->isNone) {
// taosArraySet(pColArray, i, &colVal); // // taosArraySet(pColArray, i, &colVal);
taosArraySet(pColArray, i, &(SLastCol){.ts = rowTs, .colVal = colVal}); // taosArraySet(pColArray, i, &(SLastCol){.ts = rowTs, .colVal = colVal});
--nilColCount; // --nilColCount;
} // }
} else { // } else {
if ((tColVal->isNull || tColVal->isNone) && !setICol) { // if ((tColVal->isNull || tColVal->isNone) && !setICol) {
iCol = i; // iCol = i;
setICol = true; // setICol = true;
for (int j = 0; j < nMerge; ++j) { // for (int j = 0; j < nMerge; ++j) {
SColVal jColVal = {0}; // SColVal jColVal = {0};
tsdbRowGetColVal(merge[j], pTSchema, i, &jColVal); // tsdbRowGetColVal(merge[j], pTSchema, i, &jColVal);
if (jColVal.isNull || jColVal.isNone) { // if (jColVal.isNull || jColVal.isNone) {
input[iMerge[j]].next = true; // input[iMerge[j]].next = true;
} // }
} // }
} // }
} // }
} // }
if (*ppRow) { // if (*ppRow) {
taosMemoryFreeClear(*ppRow); // taosMemoryFreeClear(*ppRow);
} // }
} while (nilColCount > 0); // } while (nilColCount > 0);
// if () new ts row from pColArray if non empty // // if () new ts row from pColArray if non empty
/* if (taosArrayGetSize(pColArray) == nCol) { */ // /* if (taosArrayGetSize(pColArray) == nCol) { */
/* code = tdSTSRowNew(pColArray, pTSchema, ppRow); */ // /* code = tdSTSRowNew(pColArray, pTSchema, ppRow); */
/* if (code) goto _err; */ // /* if (code) goto _err; */
/* } */ // /* } */
/* taosArrayDestroy(pColArray); */ // /* taosArrayDestroy(pColArray); */
if (taosArrayGetSize(pColArray) <= 0) { // if (taosArrayGetSize(pColArray) <= 0) {
*ppLastArray = NULL; // *ppLastArray = NULL;
taosArrayDestroy(pColArray); // taosArrayDestroy(pColArray);
} else { // } else {
*ppLastArray = pColArray; // *ppLastArray = pColArray;
} // }
if (*ppRow) { // if (*ppRow) {
taosMemoryFreeClear(*ppRow); // taosMemoryFreeClear(*ppRow);
} // }
for (int i = 0; i < 3; ++i) { // for (int i = 0; i < 3; ++i) {
if (input[i].nextRowClearFn) { // if (input[i].nextRowClearFn) {
input[i].nextRowClearFn(input[i].iter); // input[i].nextRowClearFn(input[i].iter);
} // }
} // }
if (pSkyline) { // if (pSkyline) {
taosArrayDestroy(pSkyline); // taosArrayDestroy(pSkyline);
} // }
taosMemoryFreeClear(pTSchema); // taosMemoryFreeClear(pTSchema);
return code; // return code;
_err: // _err:
taosArrayDestroy(pColArray); // taosArrayDestroy(pColArray);
if (*ppRow) { // if (*ppRow) {
taosMemoryFreeClear(*ppRow); // taosMemoryFreeClear(*ppRow);
} // }
for (int i = 0; i < 3; ++i) { // for (int i = 0; i < 3; ++i) {
if (input[i].nextRowClearFn) { // if (input[i].nextRowClearFn) {
input[i].nextRowClearFn(input[i].iter); // input[i].nextRowClearFn(input[i].iter);
} // }
} // }
if (pSkyline) { // if (pSkyline) {
taosArrayDestroy(pSkyline); // taosArrayDestroy(pSkyline);
} // }
taosMemoryFreeClear(pTSchema); // taosMemoryFreeClear(pTSchema);
tsdbError("vgId:%d merge last_row failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code)); // tsdbError("vgId:%d merge last_row failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
return code; // return code;
} // }
int32_t tsdbCacheGetLastrowH(SLRUCache *pCache, tb_uid_t uid, STsdb *pTsdb, LRUHandle **handle) { int32_t tsdbCacheGetLastrowH(SLRUCache *pCache, tb_uid_t uid, STsdb *pTsdb, LRUHandle **handle) {
int32_t code = 0; int32_t code = 0;

View File

@ -29,6 +29,7 @@ typedef struct {
int32_t minRow; int32_t minRow;
int32_t maxRow; int32_t maxRow;
int8_t cmprAlg; int8_t cmprAlg;
STsdbFS fs;
// -------------- // --------------
TSKEY nextKey; // reset by each table commit TSKEY nextKey; // reset by each table commit
int32_t commitFid; int32_t commitFid;
@ -119,9 +120,6 @@ int32_t tsdbCommit(STsdb *pTsdb) {
code = tsdbCommitDel(&commith); code = tsdbCommitDel(&commith);
if (code) goto _err; if (code) goto _err;
code = tsdbCommitCache(&commith);
if (code) goto _err;
// end commit // end commit
code = tsdbEndCommit(&commith, 0); code = tsdbEndCommit(&commith, 0);
if (code) goto _err; if (code) goto _err;
@ -158,7 +156,7 @@ static int32_t tsdbCommitDelStart(SCommitter *pCommitter) {
goto _err; goto _err;
} }
SDelFile *pDelFileR = pTsdb->pFS->nState->pDelFile; SDelFile *pDelFileR = pCommitter->fs.pDelFile;
if (pDelFileR) { if (pDelFileR) {
code = tsdbDelFReaderOpen(&pCommitter->pDelFReader, pDelFileR, pTsdb, NULL); code = tsdbDelFReaderOpen(&pCommitter->pDelFReader, pDelFileR, pTsdb, NULL);
if (code) goto _err; if (code) goto _err;
@ -247,7 +245,7 @@ static int32_t tsdbCommitDelEnd(SCommitter *pCommitter) {
code = tsdbUpdateDelFileHdr(pCommitter->pDelFWriter); code = tsdbUpdateDelFileHdr(pCommitter->pDelFWriter);
if (code) goto _err; if (code) goto _err;
code = tsdbFSStateUpsertDelFile(pTsdb->pFS->nState, &pCommitter->pDelFWriter->fDel); code = tsdbFSUpsertDelFile(&pCommitter->fs, &pCommitter->pDelFWriter->fDel);
if (code) goto _err; if (code) goto _err;
code = tsdbDelFWriterClose(&pCommitter->pDelFWriter, 1); code = tsdbDelFWriterClose(&pCommitter->pDelFWriter, 1);
@ -281,7 +279,8 @@ static int32_t tsdbCommitFileDataStart(SCommitter *pCommitter) {
taosArrayClear(pCommitter->aBlockIdx); taosArrayClear(pCommitter->aBlockIdx);
tMapDataReset(&pCommitter->oBlockMap); tMapDataReset(&pCommitter->oBlockMap);
tBlockDataReset(&pCommitter->oBlockData); tBlockDataReset(&pCommitter->oBlockData);
pRSet = tsdbFSStateGetDFileSet(pTsdb->pFS->nState, pCommitter->commitFid, TD_EQ); pRSet = (SDFileSet *)taosArraySearch(pCommitter->fs.aDFileSet, &(SDFileSet){.fid = pCommitter->commitFid},
tDFileSetCmprFn, TD_EQ);
if (pRSet) { if (pRSet) {
code = tsdbDataFReaderOpen(&pCommitter->pReader, pTsdb, pRSet); code = tsdbDataFReaderOpen(&pCommitter->pReader, pTsdb, pRSet);
if (code) goto _err; if (code) goto _err;
@ -860,7 +859,7 @@ static int32_t tsdbCommitFileDataEnd(SCommitter *pCommitter) {
if (code) goto _err; if (code) goto _err;
// upsert SDFileSet // upsert SDFileSet
code = tsdbFSStateUpsertDFileSet(pCommitter->pTsdb->pFS->nState, &pCommitter->pWriter->wSet); code = tsdbFSUpsertFSet(&pCommitter->fs, &pCommitter->pWriter->wSet);
if (code) goto _err; if (code) goto _err;
// close and sync // close and sync
@ -978,7 +977,7 @@ static int32_t tsdbStartCommit(STsdb *pTsdb, SCommitter *pCommitter) {
pCommitter->maxRow = pTsdb->pVnode->config.tsdbCfg.maxRows; pCommitter->maxRow = pTsdb->pVnode->config.tsdbCfg.maxRows;
pCommitter->cmprAlg = pTsdb->pVnode->config.tsdbCfg.compression; pCommitter->cmprAlg = pTsdb->pVnode->config.tsdbCfg.compression;
code = tsdbFSBegin(pTsdb->pFS); code = tsdbFSCopy(pTsdb, &pCommitter->fs);
if (code) goto _err; if (code) goto _err;
return code; return code;
@ -1147,28 +1146,33 @@ _err:
return code; return code;
} }
static int32_t tsdbCommitCache(SCommitter *pCommitter) {
int32_t code = 0;
// TODO
return code;
}
static int32_t tsdbEndCommit(SCommitter *pCommitter, int32_t eno) { static int32_t tsdbEndCommit(SCommitter *pCommitter, int32_t eno) {
int32_t code = 0; int32_t code = 0;
STsdb *pTsdb = pCommitter->pTsdb; STsdb *pTsdb = pCommitter->pTsdb;
SMemTable *pMemTable = pTsdb->imem; SMemTable *pMemTable = pTsdb->imem;
if (eno == 0) { ASSERT(eno);
code = tsdbFSCommit(pTsdb->pFS);
} else { code = tsdbFSCommit1(pTsdb, &pCommitter->fs);
code = tsdbFSRollback(pTsdb->pFS); if (code) goto _err;
// lock
taosThreadRwlockWrlock(&pTsdb->rwLock);
// commit or rollback
code = tsdbFSCommit2(pTsdb, &pCommitter->fs);
if (code) {
taosThreadRwlockUnlock(&pTsdb->rwLock);
goto _err;
} }
taosThreadRwlockWrlock(&pTsdb->rwLock);
pTsdb->imem = NULL; pTsdb->imem = NULL;
// unlock
taosThreadRwlockUnlock(&pTsdb->rwLock); taosThreadRwlockUnlock(&pTsdb->rwLock);
tsdbUnrefMemTable(pMemTable); tsdbUnrefMemTable(pMemTable);
tsdbFSDestroy(&pCommitter->fs);
tsdbInfo("vgId:%d tsdb end commit", TD_VID(pTsdb->pVnode)); tsdbInfo("vgId:%d tsdb end commit", TD_VID(pTsdb->pVnode));
return code; return code;

File diff suppressed because it is too large Load Diff

View File

@ -122,21 +122,11 @@ int32_t tsdbDFileRollback(STsdb *pTsdb, SDFileSet *pSet, EDataFileT ftype) {
// truncate // truncate
switch (ftype) { switch (ftype) {
case TSDB_HEAD_FILE:
size = pSet->pHeadF->size;
tsdbHeadFileName(pTsdb, pSet->diskId, pSet->fid, pSet->pHeadF, fname);
tPutHeadFile(hdr, pSet->pHeadF);
break;
case TSDB_DATA_FILE: case TSDB_DATA_FILE:
size = pSet->pDataF->size; size = pSet->pDataF->size;
tsdbDataFileName(pTsdb, pSet->diskId, pSet->fid, pSet->pDataF, fname); tsdbDataFileName(pTsdb, pSet->diskId, pSet->fid, pSet->pDataF, fname);
tPutDataFile(hdr, pSet->pDataF); tPutDataFile(hdr, pSet->pDataF);
break; break;
case TSDB_LAST_FILE:
size = pSet->pLastF->size;
tsdbLastFileName(pTsdb, pSet->diskId, pSet->fid, pSet->pLastF, fname);
tPutLastFile(hdr, pSet->pLastF);
break;
case TSDB_SMA_FILE: case TSDB_SMA_FILE:
size = pSet->pSmaF->size; size = pSet->pSmaF->size;
tsdbSmaFileName(pTsdb, pSet->diskId, pSet->fid, pSet->pSmaF, fname); tsdbSmaFileName(pTsdb, pSet->diskId, pSet->fid, pSet->pSmaF, fname);
@ -186,6 +176,7 @@ int32_t tsdbDFileRollback(STsdb *pTsdb, SDFileSet *pSet, EDataFileT ftype) {
return code; return code;
_err: _err:
tsdbError("vgId:%d tsdb rollback file failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
return code; return code;
} }
@ -219,10 +210,8 @@ int32_t tGetDFileSet(uint8_t *p, SDFileSet *pSet) {
// SDelFile =============================================== // SDelFile ===============================================
void tsdbDelFileName(STsdb *pTsdb, SDelFile *pFile, char fname[]) { void tsdbDelFileName(STsdb *pTsdb, SDelFile *pFile, char fname[]) {
STfs *pTfs = pTsdb->pVnode->pTfs; snprintf(fname, TSDB_FILENAME_LEN - 1, "%s%s%s%sv%dver%" PRId64 "%s", tfsGetPrimaryPath(pTsdb->pVnode->pTfs),
TD_DIRSEP, pTsdb->path, TD_DIRSEP, TD_VID(pTsdb->pVnode), pFile->commitID, ".del");
snprintf(fname, TSDB_FILENAME_LEN - 1, "%s%s%s%sv%dver%" PRId64 "%s", tfsGetPrimaryPath(pTfs), TD_DIRSEP, pTsdb->path,
TD_DIRSEP, TD_VID(pTsdb->pVnode), pFile->commitID, ".del");
} }
int32_t tPutDelFile(uint8_t *p, SDelFile *pDelFile) { int32_t tPutDelFile(uint8_t *p, SDelFile *pDelFile) {

View File

@ -605,48 +605,3 @@ void tsdbUnrefMemTable(SMemTable *pMemTable) {
tsdbMemTableDestroy(pMemTable); tsdbMemTableDestroy(pMemTable);
} }
} }
int32_t tsdbTakeMemSnapshot(STsdb *pTsdb, SMemTable **ppMem, SMemTable **ppIMem) {
ASSERT(0);
int32_t code = 0;
// lock
code = taosThreadRwlockRdlock(&pTsdb->rwLock);
if (code) {
code = TAOS_SYSTEM_ERROR(code);
goto _exit;
}
// take snapshot
*ppMem = pTsdb->mem;
*ppIMem = pTsdb->imem;
if (*ppMem) {
tsdbRefMemTable(*ppMem);
}
if (*ppIMem) {
tsdbRefMemTable(*ppIMem);
}
// unlock
code = taosThreadRwlockUnlock(&pTsdb->rwLock);
if (code) {
code = TAOS_SYSTEM_ERROR(code);
goto _exit;
}
_exit:
return code;
}
void tsdbUntakeMemSnapshot(STsdb *pTsdb, SMemTable *pMem, SMemTable *pIMem) {
ASSERT(0);
if (pMem) {
tsdbUnrefMemTable(pMem);
}
if (pIMem) {
tsdbUnrefMemTable(pIMem);
}
}

View File

@ -66,7 +66,7 @@ int tsdbOpen(SVnode *pVnode, STsdb **ppTsdb, const char *dir, STsdbKeepCfg *pKee
tfsMkdir(pVnode->pTfs, pTsdb->path); tfsMkdir(pVnode->pTfs, pTsdb->path);
// open tsdb // open tsdb
if (tsdbFSOpen(pTsdb, &pTsdb->pFS) < 0) { if (tsdbFSOpen(pTsdb) < 0) {
goto _err; goto _err;
} }
@ -88,7 +88,7 @@ _err:
int tsdbClose(STsdb **pTsdb) { int tsdbClose(STsdb **pTsdb) {
if (*pTsdb) { if (*pTsdb) {
taosThreadRwlockDestroy(&(*pTsdb)->rwLock); taosThreadRwlockDestroy(&(*pTsdb)->rwLock);
tsdbFSClose((*pTsdb)->pFS); tsdbFSClose(*pTsdb);
tsdbCloseCache((*pTsdb)->lruCache); tsdbCloseCache((*pTsdb)->lruCache);
taosMemoryFreeClear(*pTsdb); taosMemoryFreeClear(*pTsdb);
} }

View File

@ -118,8 +118,7 @@ struct STsdbReader {
char* idStr; // query info handle, for debug purpose char* idStr; // query info handle, for debug purpose
int32_t type; // query type: 1. retrieve all data blocks, 2. retrieve direct prev|next rows int32_t type; // query type: 1. retrieve all data blocks, 2. retrieve direct prev|next rows
SBlockLoadSuppInfo suppInfo; SBlockLoadSuppInfo suppInfo;
SMemTable* pMem; STsdbReadSnap* pReadSnap;
SMemTable* pIMem;
SIOCostSummary cost; SIOCostSummary cost;
STSchema* pSchema; STSchema* pSchema;
@ -275,12 +274,12 @@ static void limitOutputBufferSize(const SQueryTableDataCond* pCond, int32_t* cap
} }
// init file iterator // init file iterator
static int32_t initFilesetIterator(SFilesetIter* pIter, const STsdbFSState* pFState, int32_t order, const char* idstr) { static int32_t initFilesetIterator(SFilesetIter* pIter, SArray* aDFileSet, int32_t order, const char* idstr) {
size_t numOfFileset = taosArrayGetSize(pFState->aDFileSet); size_t numOfFileset = taosArrayGetSize(aDFileSet);
pIter->index = ASCENDING_TRAVERSE(order) ? -1 : numOfFileset; pIter->index = ASCENDING_TRAVERSE(order) ? -1 : numOfFileset;
pIter->order = order; pIter->order = order;
pIter->pFileList = taosArrayDup(pFState->aDFileSet); pIter->pFileList = aDFileSet;
pIter->numOfFiles = numOfFileset; pIter->numOfFiles = numOfFileset;
tsdbDebug("init fileset iterator, total files:%d %s", pIter->numOfFiles, idstr); tsdbDebug("init fileset iterator, total files:%d %s", pIter->numOfFiles, idstr);
@ -1881,8 +1880,8 @@ static int32_t initMemDataIterator(STableBlockScanInfo* pBlockScanInfo, STsdbRea
int32_t backward = (!ASCENDING_TRAVERSE(pReader->order)); int32_t backward = (!ASCENDING_TRAVERSE(pReader->order));
STbData* d = NULL; STbData* d = NULL;
if (pReader->pMem != NULL) { if (pReader->pReadSnap->pMem != NULL) {
tsdbGetTbDataFromMemTable(pReader->pMem, pReader->suid, pBlockScanInfo->uid, &d); tsdbGetTbDataFromMemTable(pReader->pReadSnap->pMem, pReader->suid, pBlockScanInfo->uid, &d);
if (d != NULL) { if (d != NULL) {
code = tsdbTbDataIterCreate(d, &startKey, backward, &pBlockScanInfo->iter.iter); code = tsdbTbDataIterCreate(d, &startKey, backward, &pBlockScanInfo->iter.iter);
if (code == TSDB_CODE_SUCCESS) { if (code == TSDB_CODE_SUCCESS) {
@ -1902,8 +1901,8 @@ static int32_t initMemDataIterator(STableBlockScanInfo* pBlockScanInfo, STsdbRea
} }
STbData* di = NULL; STbData* di = NULL;
if (pReader->pIMem != NULL) { if (pReader->pReadSnap->pIMem != NULL) {
tsdbGetTbDataFromMemTable(pReader->pIMem, pReader->suid, pBlockScanInfo->uid, &di); tsdbGetTbDataFromMemTable(pReader->pReadSnap->pIMem, pReader->suid, pBlockScanInfo->uid, &di);
if (di != NULL) { if (di != NULL) {
code = tsdbTbDataIterCreate(di, &startKey, backward, &pBlockScanInfo->iiter.iter); code = tsdbTbDataIterCreate(di, &startKey, backward, &pBlockScanInfo->iiter.iter);
if (code == TSDB_CODE_SUCCESS) { if (code == TSDB_CODE_SUCCESS) {
@ -1939,7 +1938,7 @@ int32_t initDelSkylineIterator(STableBlockScanInfo* pBlockScanInfo, STsdbReader*
SArray* pDelData = taosArrayInit(4, sizeof(SDelData)); SArray* pDelData = taosArrayInit(4, sizeof(SDelData));
SDelFile* pDelFile = tsdbFSStateGetDelFile(pTsdb->pFS->cState); SDelFile* pDelFile = pReader->pReadSnap->fs.pDelFile;
if (pDelFile) { if (pDelFile) {
SDelFReader* pDelFReader = NULL; SDelFReader* pDelFReader = NULL;
code = tsdbDelFReaderOpen(&pDelFReader, pDelFile, pTsdb, NULL); code = tsdbDelFReaderOpen(&pDelFReader, pDelFile, pTsdb, NULL);
@ -2830,8 +2829,7 @@ int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, SArray* pTabl
SDataBlockIter* pBlockIter = &pReader->status.blockIter; SDataBlockIter* pBlockIter = &pReader->status.blockIter;
STsdbFSState* pFState = pReader->pTsdb->pFS->cState; initFilesetIterator(&pReader->status.fileIter, (*ppReader)->pReadSnap->fs.aDFileSet, pReader->order, pReader->idStr);
initFilesetIterator(&pReader->status.fileIter, pFState, pReader->order, pReader->idStr);
resetDataBlockIterator(&pReader->status.blockIter, pReader->order); resetDataBlockIterator(&pReader->status.blockIter, pReader->order);
// no data in files, let's try buffer in memory // no data in files, let's try buffer in memory
@ -2844,7 +2842,8 @@ int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, SArray* pTabl
} }
} }
tsdbTakeMemSnapshot(pReader->pTsdb, &pReader->pMem, &pReader->pIMem); code = tsdbTakeReadSnap(pVnode->pTsdb, &pReader->pReadSnap);
if (code) goto _err;
tsdbDebug("%p total numOfTable:%d in this query %s", pReader, numOfTables, pReader->idStr); tsdbDebug("%p total numOfTable:%d in this query %s", pReader, numOfTables, pReader->idStr);
return code; return code;
@ -2861,7 +2860,7 @@ void tsdbReaderClose(STsdbReader* pReader) {
SBlockLoadSuppInfo* pSupInfo = &pReader->suppInfo; SBlockLoadSuppInfo* pSupInfo = &pReader->suppInfo;
tsdbUntakeMemSnapshot(pReader->pTsdb, pReader->pMem, pReader->pIMem); tsdbUntakeReadSnap(pReader->pTsdb, pReader->pReadSnap);
taosMemoryFreeClear(pSupInfo->plist); taosMemoryFreeClear(pSupInfo->plist);
taosMemoryFree(pSupInfo->colIds); taosMemoryFree(pSupInfo->colIds);
@ -3081,8 +3080,7 @@ int32_t tsdbReaderReset(STsdbReader* pReader, SQueryTableDataCond* pCond) {
tsdbDataFReaderClose(&pReader->pFileReader); tsdbDataFReaderClose(&pReader->pFileReader);
STsdbFSState* pFState = pReader->pTsdb->pFS->cState; initFilesetIterator(&pReader->status.fileIter, pReader->pReadSnap->fs.aDFileSet, pReader->order, pReader->idStr);
initFilesetIterator(&pReader->status.fileIter, pFState, pReader->order, pReader->idStr);
resetDataBlockIterator(&pReader->status.blockIter, pReader->order); resetDataBlockIterator(&pReader->status.blockIter, pReader->order);
resetDataBlockScanInfo(pReader->status.pTableMap); resetDataBlockScanInfo(pReader->status.pTableMap);
@ -3275,6 +3273,11 @@ int32_t tsdbTakeReadSnap(STsdb* pTsdb, STsdbReadSnap** ppSnap) {
} }
// fs (todo) // fs (todo)
code = tsdbFSRef(pTsdb, &(*ppSnap)->fs);
if (code) {
taosThreadRwlockUnlock(&pTsdb->rwLock);
goto _exit;
}
// unlock // unlock
code = taosThreadRwlockUnlock(&pTsdb->rwLock); code = taosThreadRwlockUnlock(&pTsdb->rwLock);
@ -3297,6 +3300,6 @@ void tsdbUntakeReadSnap(STsdb* pTsdb, STsdbReadSnap* pSnap) {
tsdbUnrefMemTable(pSnap->pIMem); tsdbUnrefMemTable(pSnap->pIMem);
} }
// fs (todo) tsdbFSUnref(pTsdb, &pSnap->fs);
} }
} }

View File

@ -16,7 +16,8 @@
#include "tsdb.h" #include "tsdb.h"
static int32_t tsdbDoRetentionImpl(STsdb *pTsdb, int64_t now, int8_t try, int8_t *canDo) { static int32_t tsdbDoRetentionImpl(STsdb *pTsdb, int64_t now, int8_t try, int8_t *canDo) {
int32_t code = 0; int32_t code = 0;
#if 0
STsdbFSState *pState; STsdbFSState *pState;
if (try) { if (try) {
@ -64,18 +65,20 @@ static int32_t tsdbDoRetentionImpl(STsdb *pTsdb, int64_t now, int8_t try, int8_t
code = tsdbDFileSetCopy(pTsdb, pDFileSet, &nDFileSet); code = tsdbDFileSetCopy(pTsdb, pDFileSet, &nDFileSet);
if (code) goto _exit; if (code) goto _exit;
code = tsdbFSStateUpsertDFileSet(pState, &nDFileSet); code = tsdbFSUpsertFSet(pState, &nDFileSet);
if (code) goto _exit; if (code) goto _exit;
} }
} }
} }
#endif
_exit: _exit:
return code; return code;
} }
int32_t tsdbDoRetention(STsdb *pTsdb, int64_t now) { int32_t tsdbDoRetention(STsdb *pTsdb, int64_t now) {
int32_t code = 0; int32_t code = 0;
#if 0
int8_t canDo; int8_t canDo;
// try // try
@ -100,5 +103,6 @@ _exit:
_err: _err:
tsdbError("vgId:%d tsdb do retention failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code)); tsdbError("vgId:%d tsdb do retention failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
tsdbFSRollback(pTsdb->pFS); tsdbFSRollback(pTsdb->pFS);
#endif
return code; return code;
} }

View File

@ -20,6 +20,7 @@ struct STsdbSnapReader {
STsdb* pTsdb; STsdb* pTsdb;
int64_t sver; int64_t sver;
int64_t ever; int64_t ever;
STsdbFS fs;
// for data file // for data file
int8_t dataDone; int8_t dataDone;
int32_t fid; int32_t fid;
@ -45,7 +46,8 @@ static int32_t tsdbSnapReadData(STsdbSnapReader* pReader, uint8_t** ppData) {
while (true) { while (true) {
if (pReader->pDataFReader == NULL) { if (pReader->pDataFReader == NULL) {
SDFileSet* pSet = tsdbFSStateGetDFileSet(pTsdb->pFS->cState, pReader->fid, TD_GT); SDFileSet* pSet =
taosArraySearch(pReader->fs.aDFileSet, &(SDFileSet){.fid = pReader->fid}, tDFileSetCmprFn, TD_GT);
if (pSet == NULL) goto _exit; if (pSet == NULL) goto _exit;
@ -159,7 +161,7 @@ _err:
static int32_t tsdbSnapReadDel(STsdbSnapReader* pReader, uint8_t** ppData) { static int32_t tsdbSnapReadDel(STsdbSnapReader* pReader, uint8_t** ppData) {
int32_t code = 0; int32_t code = 0;
STsdb* pTsdb = pReader->pTsdb; STsdb* pTsdb = pReader->pTsdb;
SDelFile* pDelFile = pTsdb->pFS->cState->pDelFile; SDelFile* pDelFile = pReader->fs.pDelFile;
if (pReader->pDelFReader == NULL) { if (pReader->pDelFReader == NULL) {
if (pDelFile == NULL) { if (pDelFile == NULL) {
@ -254,6 +256,24 @@ int32_t tsdbSnapReaderOpen(STsdb* pTsdb, int64_t sver, int64_t ever, STsdbSnapRe
pReader->sver = sver; pReader->sver = sver;
pReader->ever = ever; pReader->ever = ever;
code = taosThreadRwlockRdlock(&pTsdb->rwLock);
if (code) {
code = TAOS_SYSTEM_ERROR(code);
goto _err;
}
code = tsdbFSRef(pTsdb, &pReader->fs);
if (code) {
taosThreadRwlockUnlock(&pTsdb->rwLock);
goto _err;
}
code = taosThreadRwlockUnlock(&pTsdb->rwLock);
if (code) {
code = TAOS_SYSTEM_ERROR(code);
goto _err;
}
pReader->fid = INT32_MIN; pReader->fid = INT32_MIN;
pReader->aBlockIdx = taosArrayInit(0, sizeof(SBlockIdx)); pReader->aBlockIdx = taosArrayInit(0, sizeof(SBlockIdx));
if (pReader->aBlockIdx == NULL) { if (pReader->aBlockIdx == NULL) {
@ -305,6 +325,8 @@ int32_t tsdbSnapReaderClose(STsdbSnapReader** ppReader) {
taosArrayDestroy(pReader->aDelIdx); taosArrayDestroy(pReader->aDelIdx);
taosArrayDestroy(pReader->aDelData); taosArrayDestroy(pReader->aDelData);
tsdbFSUnref(pReader->pTsdb, &pReader->fs);
tsdbInfo("vgId:%d vnode snapshot tsdb reader closed", TD_VID(pReader->pTsdb->pVnode)); tsdbInfo("vgId:%d vnode snapshot tsdb reader closed", TD_VID(pReader->pTsdb->pVnode));
taosMemoryFree(pReader); taosMemoryFree(pReader);
@ -358,6 +380,7 @@ struct STsdbSnapWriter {
STsdb* pTsdb; STsdb* pTsdb;
int64_t sver; int64_t sver;
int64_t ever; int64_t ever;
STsdbFS fs;
// config // config
int32_t minutes; int32_t minutes;
@ -798,7 +821,7 @@ static int32_t tsdbSnapWriteDataEnd(STsdbSnapWriter* pWriter) {
code = tsdbWriteBlockIdx(pWriter->pDataFWriter, pWriter->aBlockIdxW, NULL); code = tsdbWriteBlockIdx(pWriter->pDataFWriter, pWriter->aBlockIdxW, NULL);
if (code) goto _err; if (code) goto _err;
code = tsdbFSStateUpsertDFileSet(pTsdb->pFS->nState, &pWriter->pDataFWriter->wSet); code = tsdbFSUpsertFSet(&pWriter->fs, &pWriter->pDataFWriter->wSet);
if (code) goto _err; if (code) goto _err;
code = tsdbDataFWriterClose(&pWriter->pDataFWriter, 1); code = tsdbDataFWriterClose(&pWriter->pDataFWriter, 1);
@ -843,7 +866,7 @@ static int32_t tsdbSnapWriteData(STsdbSnapWriter* pWriter, uint8_t* pData, uint3
pWriter->fid = fid; pWriter->fid = fid;
// read // read
SDFileSet* pSet = tsdbFSStateGetDFileSet(pTsdb->pFS->nState, fid, TD_EQ); SDFileSet* pSet = taosArraySearch(pWriter->fs.aDFileSet, &(SDFileSet){.fid = fid}, tDFileSetCmprFn, TD_EQ);
if (pSet) { if (pSet) {
code = tsdbDataFReaderOpen(&pWriter->pDataFReader, pTsdb, pSet); code = tsdbDataFReaderOpen(&pWriter->pDataFReader, pTsdb, pSet);
if (code) goto _err; if (code) goto _err;
@ -911,7 +934,7 @@ static int32_t tsdbSnapWriteDel(STsdbSnapWriter* pWriter, uint8_t* pData, uint32
STsdb* pTsdb = pWriter->pTsdb; STsdb* pTsdb = pWriter->pTsdb;
if (pWriter->pDelFWriter == NULL) { if (pWriter->pDelFWriter == NULL) {
SDelFile* pDelFile = tsdbFSStateGetDelFile(pTsdb->pFS->nState); SDelFile* pDelFile = pWriter->fs.pDelFile;
// reader // reader
if (pDelFile) { if (pDelFile) {
@ -1021,7 +1044,7 @@ static int32_t tsdbSnapWriteDelEnd(STsdbSnapWriter* pWriter) {
code = tsdbUpdateDelFileHdr(pWriter->pDelFWriter); code = tsdbUpdateDelFileHdr(pWriter->pDelFWriter);
if (code) goto _err; if (code) goto _err;
code = tsdbFSStateUpsertDelFile(pTsdb->pFS->nState, &pWriter->pDelFWriter->fDel); code = tsdbFSUpsertDelFile(&pWriter->fs, &pWriter->pDelFWriter->fDel);
if (code) goto _err; if (code) goto _err;
code = tsdbDelFWriterClose(&pWriter->pDelFWriter, 1); code = tsdbDelFWriterClose(&pWriter->pDelFWriter, 1);
@ -1055,6 +1078,9 @@ int32_t tsdbSnapWriterOpen(STsdb* pTsdb, int64_t sver, int64_t ever, STsdbSnapWr
pWriter->sver = sver; pWriter->sver = sver;
pWriter->ever = ever; pWriter->ever = ever;
code = tsdbFSCopy(pTsdb, &pWriter->fs);
if (code) goto _err;
// config // config
pWriter->minutes = pTsdb->keepCfg.days; pWriter->minutes = pTsdb->keepCfg.days;
pWriter->precision = pTsdb->keepCfg.precision; pWriter->precision = pTsdb->keepCfg.precision;
@ -1100,9 +1126,6 @@ int32_t tsdbSnapWriterOpen(STsdb* pTsdb, int64_t sver, int64_t ever, STsdbSnapWr
goto _err; goto _err;
} }
code = tsdbFSBegin(pTsdb->pFS);
if (code) goto _err;
*ppWriter = pWriter; *ppWriter = pWriter;
return code; return code;
@ -1117,8 +1140,9 @@ int32_t tsdbSnapWriterClose(STsdbSnapWriter** ppWriter, int8_t rollback) {
STsdbSnapWriter* pWriter = *ppWriter; STsdbSnapWriter* pWriter = *ppWriter;
if (rollback) { if (rollback) {
code = tsdbFSRollback(pWriter->pTsdb->pFS); ASSERT(0);
if (code) goto _err; // code = tsdbFSRollback(pWriter->pTsdb->pFS);
// if (code) goto _err;
} else { } else {
code = tsdbSnapWriteDataEnd(pWriter); code = tsdbSnapWriteDataEnd(pWriter);
if (code) goto _err; if (code) goto _err;
@ -1126,7 +1150,10 @@ int32_t tsdbSnapWriterClose(STsdbSnapWriter** ppWriter, int8_t rollback) {
code = tsdbSnapWriteDelEnd(pWriter); code = tsdbSnapWriteDelEnd(pWriter);
if (code) goto _err; if (code) goto _err;
code = tsdbFSCommit(pWriter->pTsdb->pFS); code = tsdbFSCommit1(pWriter->pTsdb, &pWriter->fs);
if (code) goto _err;
code = tsdbFSCommit2(pWriter->pTsdb, &pWriter->fs);
if (code) goto _err; if (code) goto _err;
} }