more work

This commit is contained in:
Hongze Cheng 2022-06-24 13:10:39 +00:00
parent bbf92eeba6
commit 21062f72d1
6 changed files with 287 additions and 268 deletions

View File

@ -164,8 +164,6 @@ int32_t tGetMapData(uint8_t *p, SMapData *pMapData);
int32_t tsdbKeyFid(TSKEY key, int32_t minutes, int8_t precision); int32_t tsdbKeyFid(TSKEY key, int32_t minutes, int8_t precision);
void tsdbFidKeyRange(int32_t fid, int32_t minutes, int8_t precision, TSKEY *minKey, TSKEY *maxKey); void tsdbFidKeyRange(int32_t fid, int32_t minutes, int8_t precision, TSKEY *minKey, TSKEY *maxKey);
int32_t tsdbBuildDeleteSkyline(SArray *aDelData, int32_t sidx, int32_t eidx, SArray *aSkyline); int32_t tsdbBuildDeleteSkyline(SArray *aDelData, int32_t sidx, int32_t eidx, SArray *aSkyline);
int32_t tPutDelFileHdr(uint8_t *p, SDelFile *pDelFile);
int32_t tGetDelFileHdr(uint8_t *p, SDelFile *pDelFile);
// tsdbMemTable ============================================================================================== // tsdbMemTable ==============================================================================================
// SMemTable // SMemTable
int32_t tsdbMemTableCreate(STsdb *pTsdb, SMemTable **ppMemTable); int32_t tsdbMemTableCreate(STsdb *pTsdb, SMemTable **ppMemTable);
@ -181,10 +179,11 @@ bool tsdbTbDataIterNext(STbDataIter *pIter);
typedef enum { TSDB_HEAD_FILE = 0, TSDB_DATA_FILE, TSDB_LAST_FILE, TSDB_SMA_FILE } EDataFileT; typedef enum { TSDB_HEAD_FILE = 0, TSDB_DATA_FILE, TSDB_LAST_FILE, TSDB_SMA_FILE } EDataFileT;
void tsdbDataFileName(STsdb *pTsdb, SDFileSet *pDFileSet, EDataFileT ftype, char fname[]); void tsdbDataFileName(STsdb *pTsdb, SDFileSet *pDFileSet, EDataFileT ftype, char fname[]);
int32_t tPutDataFileHdr(uint8_t *p, SDFileSet *pSet, EDataFileT ftype); int32_t tPutDataFileHdr(uint8_t *p, SDFileSet *pSet, EDataFileT ftype);
int32_t tPutDelFile(uint8_t *p, SDelFile *pDelFile);
int32_t tGetDelFile(uint8_t *p, SDelFile *pDelFile);
int32_t tPutDFileSet(uint8_t *p, SDFileSet *pSet);
int32_t tGetDFileSet(uint8_t *p, SDFileSet *pSet);
// SDelFile // SDelFile
#define tsdbDelFileCreate() \
((SDelFile){ \
.maxKey = TSKEY_MIN, .minKey = TSKEY_MAX, .maxVersion = -1, .minVersion = INT64_MAX, .size = 0, .offset = 0})
void tsdbDelFileName(STsdb *pTsdb, SDelFile *pFile, char fname[]); void tsdbDelFileName(STsdb *pTsdb, SDelFile *pFile, char fname[]);
// tsdbFS.c ============================================================================================== // tsdbFS.c ==============================================================================================
int32_t tsdbFSOpen(STsdb *pTsdb, STsdbFS **ppFS); int32_t tsdbFSOpen(STsdb *pTsdb, STsdbFS **ppFS);

View File

@ -816,6 +816,50 @@ _err:
return code; return code;
} }
static int32_t tsdbMergeMemDisk(SCommitter *pCommitter, STbData *pTbData, SBlockIdx *oBlockIdx) {
int32_t code = 0;
STbDataIter *pIter = &(STbDataIter){0};
TSDBROW *pRow;
// create iter
tsdbTbDataIterOpen(pTbData, &(TSDBKEY){.ts = pCommitter->minKey, .version = VERSION_MIN}, 0, pIter);
pRow == tsdbTbDataIterGet(pIter);
if (pRow == NULL || TSDBROW_TS(pRow) > pCommitter->maxKey) {
return tsdbCommitDiskData(pCommitter, oBlockIdx);
}
// read
code = tsdbReadBlock(pCommitter->pReader, oBlockIdx, &pCommitter->oBlockMap, NULL);
if (code) goto _err;
// loop to merge
SBlockData *pBlockData = &pCommitter->nBlockData;
int32_t iBlock = 0;
int32_t nBlock = pCommitter->oBlockMap.nItem;
tBlockDataReset(pBlockData);
while (true) {
if ((pRow == NULL || TSDBROW_TS(pRow) > pCommitter->maxKey) && iBlock >= nBlock) break;
}
while (iBlock < nBlock) {
/* code */
}
//
while (pRow && TSDBROW_TS(pRow) <= pCommitter->maxKey) {
/* code */
}
_exit:
if (pRow) pCommitter->nextKey = TMIN(pCommitter->nextKey, TSDBROW_TS(pRow));
return code;
_err:
tsdbError("vgId:%d tsdb merge mem disk data failed since %s", TD_VID(pCommitter->pTsdb->pVnode), tstrerror(code));
return code;
}
static int32_t tsdbCommitFileDataImpl(SCommitter *pCommitter) { static int32_t tsdbCommitFileDataImpl(SCommitter *pCommitter) {
int32_t code = 0; int32_t code = 0;
int32_t c; int32_t c;
@ -843,8 +887,8 @@ static int32_t tsdbCommitFileDataImpl(SCommitter *pCommitter) {
if (c == 0) { if (c == 0) {
// merge commit // merge commit
// code = tsdbMergeCommit(pCommitter, pTbData, pBlockIdx); code = tsdbMergeMemDisk(pCommitter, pTbData, pBlockIdx);
// if (code) goto _err; if (code) goto _err;
iTbData++; iTbData++;
iBlockIdx++; iBlockIdx++;
@ -1026,14 +1070,14 @@ _exit:
} }
static void tsdbCommitDataEnd(SCommitter *pCommitter) { static void tsdbCommitDataEnd(SCommitter *pCommitter) {
tMapDataClear(&pCommitter->oBlockIdxMap); // tMapDataClear(&pCommitter->oBlockIdxMap);
tMapDataClear(&pCommitter->oBlockMap); // tMapDataClear(&pCommitter->oBlockMap);
tBlockClear(&pCommitter->oBlock); // tBlockClear(&pCommitter->oBlock);
tBlockDataClear(&pCommitter->oBlockData); // tBlockDataClear(&pCommitter->oBlockData);
tMapDataClear(&pCommitter->nBlockIdxMap); // tMapDataClear(&pCommitter->nBlockIdxMap);
tMapDataClear(&pCommitter->nBlockMap); // tMapDataClear(&pCommitter->nBlockMap);
tBlockClear(&pCommitter->nBlock); // tBlockClear(&pCommitter->nBlock);
tBlockDataClear(&pCommitter->nBlockData); // tBlockDataClear(&pCommitter->nBlockData);
} }
static int32_t tsdbCommitData(SCommitter *pCommitter) { static int32_t tsdbCommitData(SCommitter *pCommitter) {
@ -1115,6 +1159,22 @@ static int32_t tsdbCommitCache(SCommitter *pCommitter) {
static int32_t tsdbEndCommit(SCommitter *pCommitter, int32_t eno) { static int32_t tsdbEndCommit(SCommitter *pCommitter, int32_t eno) {
int32_t code = 0; int32_t code = 0;
// TODO STsdb *pTsdb = pCommitter->pTsdb;
SMemTable *pMemTable = pTsdb->imem;
if (eno == 0) {
code = tsdbFSCommit(pTsdb->fs);
} else {
code = tsdbFSRollback(pTsdb->fs);
}
tsdbMemTableDestroy(pMemTable);
pTsdb->imem = NULL;
tsdbInfo("vgId:%d tsdb end commit", TD_VID(pTsdb->pVnode));
return code;
_err:
tsdbError("vgId:%d tsdb end commit failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
return code; return code;
} }

View File

@ -16,158 +16,69 @@
#include "tsdb.h" #include "tsdb.h"
// ================================================================================================= // =================================================================================================
static int32_t tsdbDelFileToJson(const void *pObj, SJson *pJson) { static int32_t tPutFSState(uint8_t *p, STsdbFSState *pState) {
int32_t code = 0; int32_t n = 0;
SDelFile *pDelFile = (SDelFile *)pObj; int8_t hasDel = pState->pDelFile ? 1 : 0;
uint32_t nDFileSet = taosArrayGetSize(pState->aDFileSet);
if (tjsonAddIntegerToObject(pJson, "minKey", pDelFile->minKey) < 0) goto _err; // SDelFile
if (tjsonAddIntegerToObject(pJson, "maxKey", pDelFile->maxKey) < 0) goto _err; n += tPutI8(p ? p + n : p, hasDel);
if (tjsonAddIntegerToObject(pJson, "minVer", pDelFile->minVersion) < 0) goto _err; if (hasDel) {
if (tjsonAddIntegerToObject(pJson, "maxVer", pDelFile->maxVersion) < 0) goto _err; n += tPutDelFile(p ? p + n : p, pState->pDelFile);
return code;
_err:
code = TSDB_CODE_OUT_OF_MEMORY;
return code;
} }
static int32_t tsdbHeadFileToJson(const void *pObj, SJson *pJson) { // SArray<SDFileSet>
int32_t code = 0; n += tPutU32v(p ? p + n : p, nDFileSet);
SHeadFile *pHeadFile = (SHeadFile *)pObj; for (uint32_t iDFileSet = 0; iDFileSet < nDFileSet; iDFileSet++) {
n += tPutDFileSet(p ? p + n : p, (SDFileSet *)taosArrayGet(pState->aDFileSet, iDFileSet));
if (tjsonAddIntegerToObject(pJson, "size", pHeadFile->size) < 0) goto _err;
if (tjsonAddIntegerToObject(pJson, "offset", pHeadFile->offset) < 0) goto _err;
return code;
_err:
code = TSDB_CODE_OUT_OF_MEMORY;
return code;
} }
static int32_t tsdbDataFileToJson(const void *pObj, SJson *pJson) { return n;
int32_t code = 0;
SDataFile *pDataFile = (SDataFile *)pObj;
if (tjsonAddIntegerToObject(pJson, "size", pDataFile->size) < 0) goto _err;
return code;
_err:
code = TSDB_CODE_OUT_OF_MEMORY;
return code;
} }
static int32_t tsdbLastFileToJson(const void *pObj, SJson *pJson) { static int32_t tGetFSState(uint8_t *p, STsdbFSState *pState) {
int32_t code = 0; int32_t n = 0;
SLastFile *pLastFile = (SLastFile *)pObj; int8_t hasDel;
uint32_t nDFileSet;
SDFileSet *pSet = &(SDFileSet){0};
if (tjsonAddIntegerToObject(pJson, "size", pLastFile->size) < 0) goto _err; // SDelFile
n += tGetI8(p + n, &hasDel);
return code; if (hasDel) {
pState->pDelFile = &pState->delFile;
_err: n += tGetDelFile(p + n, pState->pDelFile);
code = TSDB_CODE_OUT_OF_MEMORY; } else {
return code; pState->pDelFile = NULL;
} }
static int32_t tsdbSmaFileToJson(const void *pObj, SJson *pJson) { // SArray<SDFileSet>
int32_t code = 0; taosArrayClear(pState->aDFileSet);
SSmaFile *pSmaFile = (SSmaFile *)pObj; n += tGetU32v(p + n, &nDFileSet);
for (uint32_t iDFileSet = 0; iDFileSet < nDFileSet; iDFileSet++) {
if (tjsonAddIntegerToObject(pJson, "size", pSmaFile->size) < 0) goto _err; n += tGetDFileSet(p + n, pSet);
taosArrayPush(pState->aDFileSet, pSet);
return code;
_err:
code = TSDB_CODE_OUT_OF_MEMORY;
return code;
} }
static int32_t tsdbDFileSetToJson(const void *pObj, SJson *pJson) { return n;
int32_t code = 0;
SDFileSet *pDFileSet = (SDFileSet *)pObj;
if (tjsonAddIntegerToObject(pJson, "level", pDFileSet->diskId.level) < 0) goto _err;
if (tjsonAddIntegerToObject(pJson, "id", pDFileSet->diskId.id) < 0) goto _err;
if (tjsonAddIntegerToObject(pJson, "fid", pDFileSet->fid) < 0) goto _err;
// if (tjsonAddObject(pJson, "head", tsdbHeadFileToJson, pDFileSet->pHeadFile) < 0) goto _err;
// if (tjsonAddObject(pJson, "data", tsdbDataFileToJson, pDFileSet->pDataFile) < 0) goto _err;
// if (tjsonAddObject(pJson, "last", tsdbLastFileToJson, pDFileSet->pLastFile) < 0) goto _err;
// if (tjsonAddObject(pJson, "sma", tsdbSmaFileToJson, pDFileSet->pSmaFile) < 0) goto _err;
return code;
_err:
code = TSDB_CODE_OUT_OF_MEMORY;
return code;
} }
static int32_t tsdbFSStateToJsonStr(STsdbFSState *pState, char **ppData) { static int32_t tsdbGnrtCurrent(const char *fname, STsdbFSState *pState) {
int32_t code = 0;
SJson *pJson = NULL;
pJson = tjsonCreateObject();
if (pJson == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}
if (tjsonAddObject(pJson, "DelFile", tsdbDelFileToJson, pState->pDelFile) < 0) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}
if (tjsonAddTArray(pJson, "DFileSet", tsdbDFileSetToJson, pState->aDFileSet) < 0) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}
*ppData = tjsonToString(pJson);
if (*ppData == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}
tjsonDelete(pJson);
return code;
_err:
return code;
}
static int32_t tsdbJsonStrToFSState(char *pData, STsdbFSState *pState) {
int32_t code = 0;
SJson *pJson = NULL;
pJson = tjsonParse(pData);
if (pJson == NULL) goto _err;
// if (tjsonToObject(pJson, "DelFile", tsdbJsonToDelFile, &pState->pDelFile) < 0) goto _err;
// if (tjsonToTArray(pJson, "DFIleSet", tsdbJsonToDFileSet, ) < 0) goto _err;
ASSERT(0);
tjsonDelete(pJson);
return code;
_err:
code = TSDB_CODE_OUT_OF_MEMORY;
return code;
}
static int32_t tsdbCreateEmptyCurrent(const char *fname, STsdbFSState *pState) {
int32_t code = 0; int32_t code = 0;
int64_t n; int64_t n;
int64_t size; int64_t size;
char *pData = NULL; uint8_t *pData;
TdFilePtr pFD = NULL; TdFilePtr pFD = NULL;
// to json str // to binary
code = tsdbFSStateToJsonStr(pState, &pData); size = tPutFSState(NULL, pState) + sizeof(TSCKSUM);
if (code) goto _err; pData = taosMemoryMalloc(size);
if (pData == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}
n = tPutFSState(pData, pState);
ASSERT(n + sizeof(TSCKSUM) == size);
taosCalcChecksumAppend(0, pData, size);
// create and write // create and write
pFD = taosOpenFile(fname, TD_FILE_WRITE | TD_FILE_CREATE); pFD = taosOpenFile(fname, TD_FILE_WRITE | TD_FILE_CREATE);
@ -176,7 +87,6 @@ static int32_t tsdbCreateEmptyCurrent(const char *fname, STsdbFSState *pState) {
goto _err; goto _err;
} }
size = strlen(pData);
n = taosWriteFile(pFD, pData, size); n = taosWriteFile(pFD, pData, size);
if (n < 0) { if (n < 0) {
code = TAOS_SYSTEM_ERROR(errno); code = TAOS_SYSTEM_ERROR(errno);
@ -194,60 +104,7 @@ static int32_t tsdbCreateEmptyCurrent(const char *fname, STsdbFSState *pState) {
return code; return code;
_err: _err:
tsdbError("create empry current failed since %s", tstrerror(code)); tsdbError("tsdb gnrt current failed since %s", tstrerror(code));
if (pData) taosMemoryFree(pData);
return code;
}
static int32_t tsdbSaveCurrentState(STsdbFS *pFS, STsdbFSState *pState) {
int32_t code = 0;
int64_t n;
int64_t size;
char tfname[TSDB_FILENAME_LEN];
char fname[TSDB_FILENAME_LEN];
char *pData = NULL;
TdFilePtr pFD = NULL;
snprintf(tfname, TSDB_FILENAME_LEN - 1, "%s/CURRENT.t", pFS->pTsdb->path);
snprintf(fname, TSDB_FILENAME_LEN - 1, "%s/CURRENT", pFS->pTsdb->path);
// encode
code = tsdbFSStateToJsonStr(pState, &pData);
if (code) goto _err;
// create and write tfname
pFD = taosOpenFile(tfname, TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_TRUNC);
if (pFD == NULL) {
code = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
size = strlen(pData);
n = taosWriteFile(pFD, pData, size);
if (n < 0) {
code = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
if (taosFsyncFile(pFD) < 0) {
code = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
taosCloseFile(&pFD);
// rename
code = taosRenameFile(tfname, fname);
if (code) {
code = TAOS_SYSTEM_ERROR(code);
goto _err;
}
if (pData) taosMemoryFree(pData);
return code;
_err:
tsdbError("vgId:%d tsdb save current state failed since %s", TD_VID(pFS->pTsdb->pVnode), tstrerror(code));
if (pData) taosMemoryFree(pData); if (pData) taosMemoryFree(pData);
return code; return code;
} }
@ -257,7 +114,7 @@ static int32_t tsdbLoadCurrentState(STsdbFS *pFS, STsdbFSState *pState) {
int64_t size; int64_t size;
int64_t n; int64_t n;
char fname[TSDB_FILENAME_LEN]; char fname[TSDB_FILENAME_LEN];
char *pData = NULL; uint8_t *pData = NULL;
TdFilePtr pFD; TdFilePtr pFD;
snprintf(fname, TSDB_FILENAME_LEN - 1, "%s%s%s%sCURRENT", tfsGetPrimaryPath(pFS->pTsdb->pVnode->pTfs), TD_DIRSEP, snprintf(fname, TSDB_FILENAME_LEN - 1, "%s%s%s%sCURRENT", tfsGetPrimaryPath(pFS->pTsdb->pVnode->pTfs), TD_DIRSEP,
@ -265,7 +122,7 @@ static int32_t tsdbLoadCurrentState(STsdbFS *pFS, STsdbFSState *pState) {
if (!taosCheckExistFile(fname)) { if (!taosCheckExistFile(fname)) {
// create an empry CURRENT file if not exists // create an empry CURRENT file if not exists
code = tsdbCreateEmptyCurrent(fname, pState); code = tsdbGnrtCurrent(fname, pState);
if (code) goto _err; if (code) goto _err;
} else { } else {
// open the file and load // open the file and load
@ -280,12 +137,11 @@ static int32_t tsdbLoadCurrentState(STsdbFS *pFS, STsdbFSState *pState) {
goto _err; goto _err;
} }
pData = taosMemoryMalloc(size + 1); pData = taosMemoryMalloc(size);
if (pData == NULL) { if (pData == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
goto _err; goto _err;
} }
pData[size] = '\0';
n = taosReadFile(pFD, pData, size); n = taosReadFile(pFD, pData, size);
if (n < 0) { if (n < 0) {
@ -293,11 +149,15 @@ static int32_t tsdbLoadCurrentState(STsdbFS *pFS, STsdbFSState *pState) {
goto _err; goto _err;
} }
if (!taosCheckChecksumWhole(pData, size)) {
code = TSDB_CODE_FILE_CORRUPTED;
goto _err;
}
taosCloseFile(&pFD); taosCloseFile(&pFD);
// decode // decode
code = tsdbJsonStrToFSState(pData, pState); tGetFSState(pData, pState);
if (code) goto _err;
} }
if (pData) taosMemoryFree(pData); if (pData) taosMemoryFree(pData);
@ -679,15 +539,29 @@ _err:
int32_t tsdbFSCommit(STsdbFS *pFS) { int32_t tsdbFSCommit(STsdbFS *pFS) {
int32_t code = 0; int32_t code = 0;
STsdbFSState *pState = pFS->nState; STsdbFSState *pState = pFS->nState;
char tfname[TSDB_FILENAME_LEN];
char fname[TSDB_FILENAME_LEN];
// need lock (todo) // need lock (todo)
pFS->nState = pFS->cState; pFS->nState = pFS->cState;
pFS->cState = pState; pFS->cState = pState;
// save snprintf(tfname, TSDB_FILENAME_LEN - 1, "%s%s%s%sCURRENT.t", tfsGetPrimaryPath(pFS->pTsdb->pVnode->pTfs), TD_DIRSEP,
code = tsdbSaveCurrentState(pFS, pFS->cState); pFS->pTsdb->path, TD_DIRSEP);
snprintf(fname, TSDB_FILENAME_LEN - 1, "%s%s%s%sCURRENT", tfsGetPrimaryPath(pFS->pTsdb->pVnode->pTfs), TD_DIRSEP,
pFS->pTsdb->path, TD_DIRSEP);
// gnrt CURRENT.t
code = tsdbGnrtCurrent(tfname, pFS->cState);
if (code) goto _err; if (code) goto _err;
// rename
code = taosRenameFile(tfname, fname);
if (code) {
code = TAOS_SYSTEM_ERROR(code);
goto _err;
}
// apply commit on disk // apply commit on disk
code = tsdbFSApplyDiskChange(pFS, pFS->nState, pFS->cState); code = tsdbFSApplyDiskChange(pFS, pFS->nState, pFS->cState);
if (code) goto _err; if (code) goto _err;

View File

@ -15,6 +15,81 @@
#include "tsdb.h" #include "tsdb.h"
static int32_t tPutHeadFile(uint8_t *p, SHeadFile *pHeadFile) {
int32_t n = 0;
n += tPutI64v(p ? p + n : p, pHeadFile->commitID);
n += tPutI64v(p ? p + n : p, pHeadFile->size);
n += tPutI64v(p ? p + n : p, pHeadFile->offset);
return n;
}
static int32_t tGetHeadFile(uint8_t *p, SHeadFile *pHeadFile) {
int32_t n = 0;
n += tGetI64v(p + n, &pHeadFile->commitID);
n += tGetI64v(p + n, &pHeadFile->size);
n += tGetI64v(p + n, &pHeadFile->offset);
return n;
}
static int32_t tPutDataFile(uint8_t *p, SDataFile *pDataFile) {
int32_t n = 0;
n += tPutI64v(p ? p + n : p, pDataFile->commitID);
n += tPutI64v(p ? p + n : p, pDataFile->size);
return n;
}
static int32_t tGetDataFile(uint8_t *p, SDataFile *pDataFile) {
int32_t n = 0;
n += tGetI64v(p + n, &pDataFile->commitID);
n += tGetI64v(p + n, &pDataFile->size);
return n;
}
static int32_t tPutLastFile(uint8_t *p, SLastFile *pLastFile) {
int32_t n = 0;
n += tPutI64v(p ? p + n : p, pLastFile->commitID);
n += tPutI64v(p ? p + n : p, pLastFile->size);
return n;
}
static int32_t tGetLastFile(uint8_t *p, SLastFile *pLastFile) {
int32_t n = 0;
n += tGetI64v(p + n, &pLastFile->commitID);
n += tGetI64v(p + n, &pLastFile->size);
return n;
}
static int32_t tPutSmaFile(uint8_t *p, SSmaFile *pSmaFile) {
int32_t n = 0;
n += tPutI64v(p ? p + n : p, pSmaFile->commitID);
n += tPutI64v(p ? p + n : p, pSmaFile->size);
return n;
}
static int32_t tGetSmaFile(uint8_t *p, SSmaFile *pSmaFile) {
int32_t n = 0;
n += tGetI64v(p + n, &pSmaFile->commitID);
n += tGetI64v(p + n, &pSmaFile->size);
return n;
}
// EXPOSED APIS ==================================================
void tsdbDataFileName(STsdb *pTsdb, SDFileSet *pDFileSet, EDataFileT ftype, char fname[]) { void tsdbDataFileName(STsdb *pTsdb, SDFileSet *pDFileSet, EDataFileT ftype, char fname[]) {
STfs *pTfs = pTsdb->pVnode->pTfs; STfs *pTfs = pTsdb->pVnode->pTfs;
@ -49,27 +124,18 @@ int32_t tPutDataFileHdr(uint8_t *p, SDFileSet *pSet, EDataFileT ftype) {
int32_t n = 0; int32_t n = 0;
switch (ftype) { switch (ftype) {
case TSDB_HEAD_FILE: { case TSDB_HEAD_FILE:
SHeadFile *pHeadFile = &pSet->fHead; n += tPutHeadFile(p ? p + n : p, &pSet->fHead);
n += tPutI64(p + n, pHeadFile->commitID); break;
n += tPutI64(p + n, pHeadFile->size); case TSDB_DATA_FILE:
n += tPutI64(p + n, pHeadFile->offset); n += tPutDataFile(p ? p + n : p, &pSet->fData);
} break; break;
case TSDB_DATA_FILE: { case TSDB_LAST_FILE:
SDataFile *pDataFile = &pSet->fData; n += tPutLastFile(p ? p + n : p, &pSet->fLast);
n += tPutI64(p + n, pDataFile->commitID); break;
n += tPutI64(p + n, pDataFile->size); case TSDB_SMA_FILE:
} break; n += tPutSmaFile(p ? p + n : p, &pSet->fSma);
case TSDB_LAST_FILE: { break;
SLastFile *pLastFile = &pSet->fLast;
n += tPutI64(p + n, pLastFile->commitID);
n += tPutI64(p + n, pLastFile->size);
} break;
case TSDB_SMA_FILE: {
SSmaFile *pSmaFile = &pSet->fSma;
n += tPutI64(p + n, pSmaFile->commitID);
n += tPutI64(p + n, pSmaFile->size);
} break;
default: default:
ASSERT(0); ASSERT(0);
} }
@ -77,13 +143,33 @@ int32_t tPutDataFileHdr(uint8_t *p, SDFileSet *pSet, EDataFileT ftype) {
return n; return n;
} }
// SHeadFile =============================================== int32_t tPutDFileSet(uint8_t *p, SDFileSet *pSet) {
int32_t n = 0;
// SDataFile =============================================== n += tPutI32v(p ? p + n : p, pSet->diskId.level);
n += tPutI32v(p ? p + n : p, pSet->diskId.id);
n += tPutI32v(p ? p + n : p, pSet->fid);
n += tPutHeadFile(p ? p + n : p, &pSet->fHead);
n += tPutDataFile(p ? p + n : p, &pSet->fData);
n += tPutLastFile(p ? p + n : p, &pSet->fLast);
n += tPutSmaFile(p ? p + n : p, &pSet->fSma);
// SLastFile =============================================== return n;
}
// SSmaFile =============================================== int32_t tGetDFileSet(uint8_t *p, SDFileSet *pSet) {
int32_t n = 0;
n += tGetI32v(p + n, &pSet->diskId.level);
n += tGetI32v(p + n, &pSet->diskId.id);
n += tGetI32v(p + n, &pSet->fid);
n += tGetHeadFile(p + n, &pSet->fHead);
n += tGetDataFile(p + n, &pSet->fData);
n += tGetLastFile(p + n, &pSet->fLast);
n += tGetSmaFile(p + n, &pSet->fSma);
return n;
}
// SDelFile =============================================== // SDelFile ===============================================
void tsdbDelFileName(STsdb *pTsdb, SDelFile *pFile, char fname[]) { void tsdbDelFileName(STsdb *pTsdb, SDelFile *pFile, char fname[]) {
@ -92,3 +178,29 @@ void tsdbDelFileName(STsdb *pTsdb, SDelFile *pFile, char fname[]) {
snprintf(fname, TSDB_FILENAME_LEN - 1, "%s%s%s%sv%dver%" PRId64 "%s", tfsGetPrimaryPath(pTfs), TD_DIRSEP, pTsdb->path, snprintf(fname, TSDB_FILENAME_LEN - 1, "%s%s%s%sv%dver%" PRId64 "%s", tfsGetPrimaryPath(pTfs), TD_DIRSEP, pTsdb->path,
TD_DIRSEP, TD_VID(pTsdb->pVnode), pFile->commitID, ".del"); TD_DIRSEP, TD_VID(pTsdb->pVnode), pFile->commitID, ".del");
} }
int32_t tPutDelFile(uint8_t *p, SDelFile *pDelFile) {
int32_t n = 0;
n += tPutI64(p ? p + n : p, pDelFile->minKey);
n += tPutI64(p ? p + n : p, pDelFile->maxKey);
n += tPutI64v(p ? p + n : p, pDelFile->minVersion);
n += tPutI64v(p ? p + n : p, pDelFile->maxVersion);
n += tPutI64v(p ? p + n : p, pDelFile->size);
n += tPutI64v(p ? p + n : p, pDelFile->offset);
return n;
}
int32_t tGetDelFile(uint8_t *p, SDelFile *pDelFile) {
int32_t n = 0;
n += tGetI64(p + n, &pDelFile->minKey);
n += tGetI64(p + n, &pDelFile->maxKey);
n += tGetI64v(p + n, &pDelFile->minVersion);
n += tGetI64v(p + n, &pDelFile->maxVersion);
n += tGetI64v(p + n, &pDelFile->size);
n += tGetI64v(p + n, &pDelFile->offset);
return n;
}

View File

@ -187,7 +187,7 @@ int32_t tsdbUpdateDelFileHdr(SDelFWriter *pWriter, uint8_t **ppBuf) {
// build // build
memset(*ppBuf, 0, size); memset(*ppBuf, 0, size);
n = tPutDelFileHdr(*ppBuf, pWriter->pFile); n = tPutDelFile(*ppBuf, pWriter->pFile);
taosCalcChecksumAppend(0, *ppBuf, size); taosCalcChecksumAppend(0, *ppBuf, size);
ASSERT(n <= size - sizeof(TSCKSUM)); ASSERT(n <= size - sizeof(TSCKSUM));

View File

@ -518,32 +518,6 @@ int32_t tGetDelData(uint8_t *p, void *ph) {
return n; return n;
} }
int32_t tPutDelFileHdr(uint8_t *p, SDelFile *pDelFile) {
int32_t n = 0;
n += tPutI64(p ? p + n : p, pDelFile->minKey);
n += tPutI64(p ? p + n : p, pDelFile->maxKey);
n += tPutI64v(p ? p + n : p, pDelFile->minVersion);
n += tPutI64v(p ? p + n : p, pDelFile->maxVersion);
n += tPutI64v(p ? p + n : p, pDelFile->size);
n += tPutI64v(p ? p + n : p, pDelFile->offset);
return n;
}
int32_t tGetDelFileHdr(uint8_t *p, SDelFile *pDelFile) {
int32_t n = 0;
n += tGetI64(p + n, &pDelFile->minKey);
n += tGetI64(p + n, &pDelFile->maxKey);
n += tGetI64v(p + n, &pDelFile->minVersion);
n += tGetI64v(p + n, &pDelFile->maxVersion);
n += tGetI64v(p + n, &pDelFile->size);
n += tGetI64v(p + n, &pDelFile->offset);
return n;
}
int32_t tsdbKeyFid(TSKEY key, int32_t minutes, int8_t precision) { int32_t tsdbKeyFid(TSKEY key, int32_t minutes, int8_t precision) {
if (key < 0) { if (key < 0) {
return (int)((key + 1) / tsTickPerMin[precision] / minutes - 1); return (int)((key + 1) / tsTickPerMin[precision] / minutes - 1);