more work

This commit is contained in:
Hongze Cheng 2022-06-28 02:35:37 +00:00
parent aac1fb643f
commit d862aa0f3b
1 changed files with 32 additions and 9 deletions

View File

@ -636,7 +636,7 @@ static int32_t tsdbMergeTableData(SCommitter *pCommitter, STbDataIter *pIter, SB
// loop to merge // loop to merge
pRow1 = tsdbTbDataIterGet(pIter); pRow1 = tsdbTbDataIterGet(pIter);
*pRow2 = tsdbRowFromBlockData(pBlockDataMerge, 0); *pRow2 = tsdbRowFromBlockData(pBlockDataMerge, 0);
ASSERT(tsdbKeyCmprFn(&TSDBROW_KEY(pRow1), &toKey) < 0); ASSERT(pRow1 && tsdbKeyCmprFn(&TSDBROW_KEY(pRow1), &toKey) < 0);
ASSERT(tsdbKeyCmprFn(&TSDBROW_KEY(pRow2), &toKey) < 0); ASSERT(tsdbKeyCmprFn(&TSDBROW_KEY(pRow2), &toKey) < 0);
code = tsdbCommitterUpdateSchema(pCommitter, pBlockIdx->suid, pBlockIdx->uid, TSDBROW_SVERSION(pRow1)); code = tsdbCommitterUpdateSchema(pCommitter, pBlockIdx->suid, pBlockIdx->uid, TSDBROW_SVERSION(pRow1));
if (code) goto _err; if (code) goto _err;
@ -668,7 +668,7 @@ static int32_t tsdbMergeTableData(SCommitter *pCommitter, STbDataIter *pIter, SB
pRow1 = NULL; pRow1 = NULL;
} }
} }
} else if (tsdbRowCmprFn(pRow1, pRow2) < 0) { } else if (tsdbRowCmprFn(pRow1, pRow2) > 0) {
*pRow = *pRow2; *pRow = *pRow2;
if (pRow2->iRow + 1 < pBlockDataMerge->nRow) { if (pRow2->iRow + 1 < pBlockDataMerge->nRow) {
@ -702,7 +702,7 @@ static int32_t tsdbMergeTableData(SCommitter *pCommitter, STbDataIter *pIter, SB
} }
} }
code = tBlockDataAppendRow(pBlockData, &row, pCommitter->pTSchema); code = tBlockDataAppendRow(pBlockData, pRow, pCommitter->pTSchema);
if (code) goto _err; if (code) goto _err;
pBlock->minVersion = TMIN(pBlock->minVersion, TSDBROW_VERSION(pRow)); pBlock->minVersion = TMIN(pBlock->minVersion, TSDBROW_VERSION(pRow));
@ -754,8 +754,9 @@ static int32_t tsdbCommitTableMemData(SCommitter *pCommitter, STbDataIter *pIter
tBlockReset(pBlock); tBlockReset(pBlock);
tBlockDataReset(pBlockData); tBlockDataReset(pBlockData);
pRow = tsdbTbDataIterGet(pIter); pRow = tsdbTbDataIterGet(pIter);
ASSERT(pRow && tsdbKeyCmprFn(&TSDBROW_KEY(pRow), &toKey) < 0);
while (true) { while (true) {
if (pRow == NULL || tsdbKeyCmprFn(&TSDBROW_KEY(pRow), &toKey) >= 0) { if (pRow == NULL) {
if (pBlockData->nRow > 0) { if (pBlockData->nRow > 0) {
goto _write_block; goto _write_block;
} else { } else {
@ -783,6 +784,7 @@ static int32_t tsdbCommitTableMemData(SCommitter *pCommitter, STbDataIter *pIter
tsdbTbDataIterNext(pIter); tsdbTbDataIterNext(pIter);
pRow = tsdbTbDataIterGet(pIter); pRow = tsdbTbDataIterGet(pIter);
if (pRow && tsdbKeyCmprFn(&TSDBROW_KEY(pRow), &toKey) >= 0) pRow = NULL;
if (pBlockData->nRow >= pCommitter->maxRow * 4 / 5) goto _write_block; if (pBlockData->nRow >= pCommitter->maxRow * 4 / 5) goto _write_block;
continue; continue;
@ -983,7 +985,7 @@ static int32_t tsdbCommitTableData(SCommitter *pCommitter, STbData *pTbData, SBl
int32_t code = 0; int32_t code = 0;
STbDataIter *pIter = &(STbDataIter){0}; STbDataIter *pIter = &(STbDataIter){0};
TSDBROW *pRow; TSDBROW *pRow;
int32_t iBlock = 0; int32_t iBlock;
int32_t nBlock; int32_t nBlock;
int64_t suid; int64_t suid;
int64_t uid; int64_t uid;
@ -991,6 +993,8 @@ static int32_t tsdbCommitTableData(SCommitter *pCommitter, STbData *pTbData, SBl
if (pTbData) { if (pTbData) {
tsdbTbDataIterOpen(pTbData, &(TSDBKEY){.ts = pCommitter->minKey, .version = VERSION_MIN}, 0, pIter); tsdbTbDataIterOpen(pTbData, &(TSDBKEY){.ts = pCommitter->minKey, .version = VERSION_MIN}, 0, pIter);
pRow = tsdbTbDataIterGet(pIter); pRow = tsdbTbDataIterGet(pIter);
if (pRow && TSDBROW_TS(pRow) > pCommitter->maxKey) pRow = NULL;
suid = pTbData->suid; suid = pTbData->suid;
uid = pTbData->uid; uid = pTbData->uid;
} else { } else {
@ -1004,18 +1008,20 @@ static int32_t tsdbCommitTableData(SCommitter *pCommitter, STbData *pTbData, SBl
nBlock = pCommitter->oBlockMap.nItem; nBlock = pCommitter->oBlockMap.nItem;
ASSERT(nBlock > 0); ASSERT(nBlock > 0);
suid = pBlockIdx->suid; suid = pBlockIdx->suid;
uid = pBlockIdx->uid; uid = pBlockIdx->uid;
} else { } else {
nBlock = 0; nBlock = 0;
} }
if ((pRow == NULL || TSDBROW_TS(pRow) > pCommitter->maxKey) && nBlock == 0) goto _exit; if (pRow == NULL && nBlock == 0) goto _exit;
// start =========== // start ===========
tMapDataReset(&pCommitter->nBlockMap); tMapDataReset(&pCommitter->nBlockMap);
SBlock *pBlock = &pCommitter->oBlock; SBlock *pBlock = &pCommitter->oBlock;
iBlock = 0;
if (iBlock < nBlock) { if (iBlock < nBlock) {
tMapDataGetItemByIdx(&pCommitter->oBlockMap, iBlock, pBlock, tGetBlock); tMapDataGetItemByIdx(&pCommitter->oBlockMap, iBlock, pBlock, tGetBlock);
} else { } else {
@ -1024,24 +1030,28 @@ static int32_t tsdbCommitTableData(SCommitter *pCommitter, STbData *pTbData, SBl
// merge =========== // merge ===========
while (true) { while (true) {
if (((pRow == NULL) || TSDBROW_TS(pRow) > pCommitter->maxKey) && pBlock == NULL) break; if (pRow == NULL && pBlock == NULL) break;
if (pRow && TSDBROW_TS(pRow) <= pCommitter->maxKey && pBlock) { if (pRow && pBlock) {
if (pBlock->last) { if (pBlock->last) {
code = tsdbMergeTableData(pCommitter, pIter, pBlock, code = tsdbMergeTableData(pCommitter, pIter, pBlock,
(TSDBKEY){.ts = pCommitter->maxKey + 1, .version = VERSION_MIN}, 0); (TSDBKEY){.ts = pCommitter->maxKey + 1, .version = VERSION_MIN}, 0);
if (code) goto _err; if (code) goto _err;
pRow = tsdbTbDataIterGet(pIter); pRow = tsdbTbDataIterGet(pIter);
if (pRow && TSDBROW_TS(pRow) > pCommitter->maxKey) pRow = NULL;
iBlock++; iBlock++;
if (iBlock < nBlock) { if (iBlock < nBlock) {
tMapDataGetItemByIdx(&pCommitter->oBlockMap, iBlock, pBlock, tGetBlock); tMapDataGetItemByIdx(&pCommitter->oBlockMap, iBlock, pBlock, tGetBlock);
} else { } else {
pBlock = NULL; pBlock = NULL;
} }
ASSERT(pRow == NULL && pBlock == NULL);
} else { } else {
int32_t c = tBlockCmprFn(&(SBlock){.maxKey = TSDBROW_KEY(pRow), .minKey = TSDBROW_KEY(pRow)}, pBlock); int32_t c = tBlockCmprFn(&(SBlock){.maxKey = TSDBROW_KEY(pRow), .minKey = TSDBROW_KEY(pRow)}, pBlock);
if (c > 0) { if (c > 0) {
// only disk data
code = tsdbCommitTableDiskData(pCommitter, pBlock, pBlockIdx); code = tsdbCommitTableDiskData(pCommitter, pBlock, pBlockIdx);
if (code) goto _err; if (code) goto _err;
@ -1052,11 +1062,14 @@ static int32_t tsdbCommitTableData(SCommitter *pCommitter, STbData *pTbData, SBl
pBlock = NULL; pBlock = NULL;
} }
} else if (c < 0) { } else if (c < 0) {
// only memory data
code = tsdbCommitTableMemData(pCommitter, pIter, pBlock->minKey, 1); code = tsdbCommitTableMemData(pCommitter, pIter, pBlock->minKey, 1);
if (code) goto _err; if (code) goto _err;
pRow = tsdbTbDataIterGet(pIter); pRow = tsdbTbDataIterGet(pIter);
if (pRow && TSDBROW_TS(pRow) > pCommitter->maxKey) pRow = NULL;
} else { } else {
// merge memory and disk
int64_t nOvlp = 0; // (todo) int64_t nOvlp = 0; // (todo)
if (pBlock->nRow + nOvlp <= pCommitter->maxRow && pBlock->nSubBlock < TSDB_MAX_SUBBLOCKS) { if (pBlock->nRow + nOvlp <= pCommitter->maxRow && pBlock->nSubBlock < TSDB_MAX_SUBBLOCKS) {
// add as a subblock // add as a subblock
@ -1071,6 +1084,15 @@ static int32_t tsdbCommitTableData(SCommitter *pCommitter, STbData *pTbData, SBl
if (code) goto _err; if (code) goto _err;
} }
} }
pRow = tsdbTbDataIterGet(pIter);
if (pRow && TSDBROW_TS(pRow) > pCommitter->maxKey) pRow = NULL;
iBlock++;
if (iBlock < nBlock) {
tMapDataGetItemByIdx(&pCommitter->oBlockMap, iBlock, pBlock, tGetBlock);
} else {
pBlock = NULL;
}
} }
} }
} else if (pBlock) { } else if (pBlock) {
@ -1089,7 +1111,8 @@ static int32_t tsdbCommitTableData(SCommitter *pCommitter, STbData *pTbData, SBl
if (code) goto _err; if (code) goto _err;
pRow = tsdbTbDataIterGet(pIter); pRow = tsdbTbDataIterGet(pIter);
ASSERT(pRow == NULL || TSDBROW_TS(pRow) > pCommitter->maxKey); if (pRow && TSDBROW_TS(pRow) > pCommitter->maxKey) pRow = NULL;
ASSERT(pRow == NULL);
} }
} }