more work

This commit is contained in:
Hongze Cheng 2022-06-15 11:13:55 +00:00
parent c268437c7f
commit 63802f06a9
1 changed files with 42 additions and 61 deletions

View File

@ -346,75 +346,56 @@ _err:
return code; return code;
} }
static FORCE_INLINE bool tsdbCommitIterEnd(SCommitter *pCommitter, STbDataIter *pIter) {
TSDBROW *pRow = tsdbTbDataIterGet(pIter);
return ((pRow == NULL) || (pRow->pTSRow->ts <= pCommitter->maxKey));
}
static int32_t tsdbCommitTableDataImpl(SCommitter *pCommitter, STbDataIter *pIter, SBlockIdx *pBlockIdx) {
int32_t code = 0;
TSDBROW *pRow;
int32_t iBlock;
int32_t nBlock;
SBlock *pBlock;
SBlock block;
// start(todo)
// impl
pRow = tsdbTbDataIterGet(pIter);
if (iBlock < nBlock) {
pBlock = &block;
tMapDataGetItemByIdx(&pCommitter->oBlock, iBlock, pBlock, tGetBlock);
}
while (true) {
if ((pRow == NULL || pRow->pTSRow->ts > pCommitter->maxKey) && pBlock == NULL) break;
}
// end
return code;
_err:
return code;
}
static int32_t tsdbCommitTableData(SCommitter *pCommitter, STbData *pTbData, SBlockIdx *pBlockIdx) { static int32_t tsdbCommitTableData(SCommitter *pCommitter, STbData *pTbData, SBlockIdx *pBlockIdx) {
int32_t code = 0; int32_t code = 0;
STbDataIter *pIter = NULL; STbDataIter *pIter = NULL;
STbDataIter iter; STbDataIter iter;
TSDBROW *pRow = NULL; TSDBROW *pRow;
SBlockIdx blockIdx;
int32_t iBlock;
int32_t nBlock;
SBlock *pBlock;
SBlock block;
// check // create iter if can
if (pTbData) { if (pTbData) {
pIter = &iter; pIter = &iter;
tsdbTbDataIterOpen(pTbData, &(TSDBKEY){.ts = pCommitter->minKey, .version = 0}, 0, pIter); tsdbTbDataIterOpen(pTbData, &(TSDBKEY){.ts = pCommitter->minKey, .version = 0}, 0, pIter);
pRow = tsdbTbDataIterGet(pIter);
if ((pRow == NULL || pRow->pTSRow->ts > pCommitter->maxKey) && pBlockIdx == NULL) goto _exit;
} }
// start // check
tMapDataReset(&pCommitter->oBlock); if (tsdbCommitIterEnd(pCommitter, pIter) && pBlockIdx == NULL) goto _exit;
tMapDataReset(&pCommitter->nBlock);
if (pBlockIdx) {
code = tsdbReadBlock(pCommitter->pReader, pBlockIdx, &pCommitter->oBlock, NULL);
if (code) goto _err;
}
if (pTbData) { // impl
blockIdx.suid = pTbData->suid; code = tsdbCommitTableDataImpl(pCommitter, pIter, pBlockIdx);
blockIdx.uid = pTbData->uid;
} else {
blockIdx.suid = pBlockIdx->suid;
blockIdx.uid = pBlockIdx->uid;
}
blockIdx.minKey.version = INT64_MAX;
blockIdx.minKey.ts = TSKEY_MAX;
blockIdx.maxKey.version = 0;
blockIdx.maxKey.ts = TSKEY_MIN;
blockIdx.minVersion = INT64_MAX;
blockIdx.maxVersion = INT64_MIN;
blockIdx.offset = -1;
blockIdx.size = -1;
// impl (todo)
iBlock = 0;
nBlock = pCommitter->oBlock.nItem;
do {
pRow = tsdbTbDataIterGet(pIter);
if (iBlock < nBlock) {
pBlock = &block;
code = tMapDataGetItemByIdx(&pCommitter->oBlock, iBlock, pBlock, tGetBlock);
if (code) goto _err;
} else {
pBlock == NULL;
}
if ((pRow == NULL || pRow->pTSRow->ts > pCommitter->maxKey) && pBlock == NULL) break;
code = tsdbMergeData(pCommitter, pIter, pBlock);
if (code) goto _err;
if (iBlock < nBlock) iBlock++;
} while (true);
// end
code = tsdbWriteBlock(pCommitter->pWriter, &pCommitter->nBlock, NULL, &blockIdx);
if (code) goto _err;
code = tMapDataPutItem(&pCommitter->nBlockIdx, &blockIdx, tPutBlockIdx);
if (code) goto _err; if (code) goto _err;
_exit: _exit:
@ -485,7 +466,7 @@ static int32_t tsdbCommitFileDataImpl(SCommitter *pCommitter) {
pTbData = (STbData *)taosArrayGetP(pMemTable->aTbData, iTbData); pTbData = (STbData *)taosArrayGetP(pMemTable->aTbData, iTbData);
if (iBlockIdx < nBlockIdx) { if (iBlockIdx < nBlockIdx) {
pBlockIdx = &blockIdx; pBlockIdx = &blockIdx;
code = tMapDataGetItemByIdx(&pCommitter->oBlockIdx, iBlockIdx, &blockIdx, tGetBlockIdx); code = tMapDataGetItemByIdx(&pCommitter->oBlockIdx, iBlockIdx, pBlockIdx, tGetBlockIdx);
if (code) goto _err; if (code) goto _err;
} else { } else {
pBlockIdx = NULL; pBlockIdx = NULL;
@ -529,7 +510,7 @@ static int32_t tsdbCommitFileDataImpl(SCommitter *pCommitter) {
iBlockIdx++; iBlockIdx++;
if (iBlockIdx < nBlockIdx) { if (iBlockIdx < nBlockIdx) {
pBlockIdx = &blockIdx; pBlockIdx = &blockIdx;
code = tMapDataGetItemByIdx(&pCommitter->oBlockIdx, iBlockIdx, &blockIdx, tGetBlockIdx); code = tMapDataGetItemByIdx(&pCommitter->oBlockIdx, iBlockIdx, pBlockIdx, tGetBlockIdx);
if (code) goto _err; if (code) goto _err;
} else { } else {
pBlockIdx = NULL; pBlockIdx = NULL;
@ -549,7 +530,7 @@ static int32_t tsdbCommitFileDataImpl(SCommitter *pCommitter) {
} }
if (iBlockIdx < nBlockIdx) { if (iBlockIdx < nBlockIdx) {
pBlockIdx = &blockIdx; pBlockIdx = &blockIdx;
code = tMapDataGetItemByIdx(&pCommitter->oBlockIdx, iBlockIdx, &blockIdx, tGetBlockIdx); code = tMapDataGetItemByIdx(&pCommitter->oBlockIdx, iBlockIdx, pBlockIdx, tGetBlockIdx);
if (code) goto _err; if (code) goto _err;
} else { } else {
pBlockIdx = NULL; pBlockIdx = NULL;