more refact
This commit is contained in:
parent
5a8591052d
commit
c59b52855b
|
@ -263,12 +263,12 @@ struct TSDBKEY {
|
||||||
};
|
};
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
|
uint64_t suid;
|
||||||
|
uint64_t uid;
|
||||||
uint32_t len;
|
uint32_t len;
|
||||||
uint32_t offset;
|
uint32_t offset;
|
||||||
uint32_t hasLast : 2;
|
uint32_t hasLast : 2;
|
||||||
uint32_t numOfBlocks : 30;
|
uint32_t numOfBlocks : 30;
|
||||||
uint64_t suid;
|
|
||||||
uint64_t uid;
|
|
||||||
TSDBKEY maxKey;
|
TSDBKEY maxKey;
|
||||||
} SBlockIdx;
|
} SBlockIdx;
|
||||||
|
|
||||||
|
|
|
@ -20,6 +20,9 @@ typedef struct {
|
||||||
int32_t minutes;
|
int32_t minutes;
|
||||||
int8_t precision;
|
int8_t precision;
|
||||||
TSKEY nCommitKey;
|
TSKEY nCommitKey;
|
||||||
|
int32_t fid;
|
||||||
|
TSKEY minKey;
|
||||||
|
TSKEY maxKey;
|
||||||
SReadH readh;
|
SReadH readh;
|
||||||
SDFileSet wSet;
|
SDFileSet wSet;
|
||||||
SArray *aBlkIdx;
|
SArray *aBlkIdx;
|
||||||
|
@ -180,11 +183,11 @@ static int32_t tsdbCommitTable(SCommitH *pCHandle, SMemData *pMemData, SBlockIdx
|
||||||
|
|
||||||
// commit table impl
|
// commit table impl
|
||||||
if (pMemData && pBlockIdx) {
|
if (pMemData && pBlockIdx) {
|
||||||
// merge
|
// TODO
|
||||||
} else if (pMemData) {
|
} else if (pMemData) {
|
||||||
// new one
|
// TODO
|
||||||
} else {
|
} else {
|
||||||
// save old ones
|
// TODO
|
||||||
}
|
}
|
||||||
|
|
||||||
// commit table end
|
// commit table end
|
||||||
|
@ -224,9 +227,13 @@ static int32_t tsdbWriteBlockIdx(SDFile *pFile, SArray *pArray, uint8_t **ppBuf)
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t tsdbCommitFileStart(SCommitH *pCHandle, int32_t fid) {
|
static int32_t tsdbCommitFileStart(SCommitH *pCHandle) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
// TODO
|
STsdb *pTsdb = pCHandle->pMemTable->pTsdb;
|
||||||
|
SDFileSet *pSet = NULL;
|
||||||
|
|
||||||
|
taosArrayClear(pCHandle->aBlkIdx);
|
||||||
|
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -236,61 +243,50 @@ static int32_t tsdbCommitFileEnd(SCommitH *pCHandle) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t tsdbCommitFile(SCommitH *pCHandle, int32_t fid) {
|
static int32_t tsdbCommitFile(SCommitH *pCHandle) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
SMemDataIter iter = {0};
|
SMemData *pMemData;
|
||||||
TSDBROW *pRow = NULL;
|
SBlockIdx *pBlockIdx;
|
||||||
int8_t hasData = 0;
|
|
||||||
TSKEY fidSKey;
|
|
||||||
TSKEY fidEKey;
|
|
||||||
int32_t iMemData;
|
int32_t iMemData;
|
||||||
int32_t nMemData;
|
int32_t nMemData;
|
||||||
int32_t iBlockIdx;
|
int32_t iBlockIdx;
|
||||||
int32_t nBlockIdx;
|
int32_t nBlockIdx;
|
||||||
|
|
||||||
pCHandle->nCommitKey = TSKEY_MAX;
|
// commit file start
|
||||||
|
code = tsdbCommitFileStart(pCHandle);
|
||||||
// create or open the file to commit(todo)
|
|
||||||
code = tsdbCommitFileStart(pCHandle, fid);
|
|
||||||
if (code) {
|
if (code) {
|
||||||
goto _err;
|
goto _err;
|
||||||
}
|
}
|
||||||
|
|
||||||
// loop to commit each table data
|
// commit file impl
|
||||||
iMemData = 0;
|
iMemData = 0;
|
||||||
nMemData = taosArrayGetSize(pCHandle->pMemTable->aMemData);
|
nMemData = taosArrayGetSize(pCHandle->pMemTable->aMemData);
|
||||||
iBlockIdx = 0;
|
iBlockIdx = 0;
|
||||||
nBlockIdx = 0;
|
nBlockIdx = 0; // todo
|
||||||
for (;;) {
|
|
||||||
if (iBlockIdx >= nBlockIdx && iMemData >= nMemData) {
|
|
||||||
// code = tsdbWriteBlockIdx();
|
|
||||||
// if (code) {
|
|
||||||
// goto _err;
|
|
||||||
// }
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
SMemData *pMemData = NULL;
|
for (;;) {
|
||||||
SBlockIdx *pBlockIdx = NULL;
|
if (iMemData >= nMemData && iBlockIdx >= nBlockIdx) break;
|
||||||
|
|
||||||
|
pMemData = NULL;
|
||||||
|
pBlockIdx = NULL;
|
||||||
if (iMemData < nMemData) {
|
if (iMemData < nMemData) {
|
||||||
pMemData = (SMemData *)taosArrayGetP(pCHandle->pMemTable->aMemData, iBlockIdx);
|
pMemData = (SMemData *)taosArrayGetP(pCHandle->pMemTable->aMemData, iMemData);
|
||||||
}
|
}
|
||||||
if (iBlockIdx < nBlockIdx) {
|
if (iBlockIdx < nBlockIdx) {
|
||||||
// pBlockIdx
|
// pBlockIdx = ;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pMemData && pBlockIdx) {
|
if (pMemData && pBlockIdx) {
|
||||||
int32_t c = tsdbTableIdCmprFn(&(TABLEID){.suid = pMemData->suid, .uid = pMemData->uid},
|
int32_t c = tsdbTableIdCmprFn(pMemData, pBlockIdx);
|
||||||
&(TABLEID){.suid = pBlockIdx->suid, .uid = pBlockIdx->uid});
|
if (c < 0) {
|
||||||
if (c == 0) {
|
|
||||||
iMemData++;
|
iMemData++;
|
||||||
iBlockIdx++;
|
|
||||||
} else if (c < 0) {
|
|
||||||
pBlockIdx = NULL;
|
pBlockIdx = NULL;
|
||||||
|
} else if (c == 0) {
|
||||||
iMemData++;
|
iMemData++;
|
||||||
} else {
|
|
||||||
pMemData = NULL;
|
|
||||||
iBlockIdx++;
|
iBlockIdx++;
|
||||||
|
} else {
|
||||||
|
iBlockIdx++;
|
||||||
|
pMemData = NULL;
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
if (pMemData) {
|
if (pMemData) {
|
||||||
|
@ -306,7 +302,7 @@ static int32_t tsdbCommitFile(SCommitH *pCHandle, int32_t fid) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// close file
|
// commit file end
|
||||||
code = tsdbCommitFileEnd(pCHandle);
|
code = tsdbCommitFileEnd(pCHandle);
|
||||||
if (code) {
|
if (code) {
|
||||||
goto _err;
|
goto _err;
|
||||||
|
@ -328,8 +324,9 @@ static int32_t tsdbCommitData(SCommitH *pCHandle) {
|
||||||
for (;;) {
|
for (;;) {
|
||||||
if (pCHandle->nCommitKey == TSKEY_MAX) break;
|
if (pCHandle->nCommitKey == TSKEY_MAX) break;
|
||||||
|
|
||||||
fid = TSDB_KEY_FID(pCHandle->nCommitKey, pCHandle->minutes, pCHandle->precision);
|
pCHandle->fid = TSDB_KEY_FID(pCHandle->nCommitKey, pCHandle->minutes, pCHandle->precision);
|
||||||
code = tsdbCommitFile(pCHandle, fid);
|
tsdbGetFidKeyRange(pCHandle->minutes, pCHandle->precision, pCHandle->fid, &pCHandle->minKey, &pCHandle->maxKey);
|
||||||
|
code = tsdbCommitFile(pCHandle);
|
||||||
if (code) {
|
if (code) {
|
||||||
goto _err;
|
goto _err;
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue