more code

This commit is contained in:
Hongze Cheng 2022-08-23 17:11:17 +08:00
parent cc109000e0
commit f43bae0df0
1 changed files with 64 additions and 64 deletions

View File

@ -327,53 +327,53 @@ _exit:
return code; return code;
} }
static int32_t tsdbCommitterNextLastRow(SCommitter *pCommitter) { // static int32_t tsdbCommitterNextLastRow(SCommitter *pCommitter) {
int32_t code = 0; // int32_t code = 0;
ASSERT(pCommitter->dReader.pReader); // ASSERT(pCommitter->dReader.pReader);
ASSERT(pCommitter->dReader.pRowInfo); // ASSERT(pCommitter->dReader.pRowInfo);
SBlockData *pBlockDatal = &pCommitter->dReader.bDatal; // SBlockData *pBlockDatal = &pCommitter->dReader.bDatal;
pCommitter->dReader.iRow++; // pCommitter->dReader.iRow++;
if (pCommitter->dReader.iRow < pBlockDatal->nRow) { // if (pCommitter->dReader.iRow < pBlockDatal->nRow) {
if (pBlockDatal->uid) { // if (pBlockDatal->uid) {
pCommitter->dReader.pRowInfo->uid = pBlockDatal->uid; // pCommitter->dReader.pRowInfo->uid = pBlockDatal->uid;
} else { // } else {
pCommitter->dReader.pRowInfo->uid = pBlockDatal->aUid[pCommitter->dReader.iRow]; // pCommitter->dReader.pRowInfo->uid = pBlockDatal->aUid[pCommitter->dReader.iRow];
} // }
pCommitter->dReader.pRowInfo->row = tsdbRowFromBlockData(pBlockDatal, pCommitter->dReader.iRow); // pCommitter->dReader.pRowInfo->row = tsdbRowFromBlockData(pBlockDatal, pCommitter->dReader.iRow);
} else { // } else {
pCommitter->dReader.iBlockL++; // pCommitter->dReader.iBlockL++;
if (pCommitter->dReader.iBlockL < taosArrayGetSize(pCommitter->dReader.aBlockL)) { // if (pCommitter->dReader.iBlockL < taosArrayGetSize(pCommitter->dReader.aBlockL)) {
SBlockL *pBlockL = (SBlockL *)taosArrayGet(pCommitter->dReader.aBlockL, pCommitter->dReader.iBlockL); // SBlockL *pBlockL = (SBlockL *)taosArrayGet(pCommitter->dReader.aBlockL, pCommitter->dReader.iBlockL);
int64_t suid = pBlockL->suid; // int64_t suid = pBlockL->suid;
int64_t uid = pBlockL->maxUid; // int64_t uid = pBlockL->maxUid;
code = tsdbCommitterUpdateTableSchema(pCommitter, suid, uid); // code = tsdbCommitterUpdateTableSchema(pCommitter, suid, uid);
if (code) goto _exit; // if (code) goto _exit;
code = tBlockDataInit(pBlockDatal, suid, suid ? 0 : uid, pCommitter->skmTable.pTSchema); // code = tBlockDataInit(pBlockDatal, suid, suid ? 0 : uid, pCommitter->skmTable.pTSchema);
if (code) goto _exit; // if (code) goto _exit;
code = tsdbReadLastBlock(pCommitter->dReader.pReader, pBlockL, pBlockDatal); // code = tsdbReadLastBlock(pCommitter->dReader.pReader, pBlockL, pBlockDatal);
if (code) goto _exit; // if (code) goto _exit;
pCommitter->dReader.iRow = 0; // pCommitter->dReader.iRow = 0;
pCommitter->dReader.pRowInfo->suid = pBlockDatal->suid; // pCommitter->dReader.pRowInfo->suid = pBlockDatal->suid;
if (pBlockDatal->uid) { // if (pBlockDatal->uid) {
pCommitter->dReader.pRowInfo->uid = pBlockDatal->uid; // pCommitter->dReader.pRowInfo->uid = pBlockDatal->uid;
} else { // } else {
pCommitter->dReader.pRowInfo->uid = pBlockDatal->aUid[0]; // pCommitter->dReader.pRowInfo->uid = pBlockDatal->aUid[0];
} // }
pCommitter->dReader.pRowInfo->row = tsdbRowFromBlockData(pBlockDatal, pCommitter->dReader.iRow); // pCommitter->dReader.pRowInfo->row = tsdbRowFromBlockData(pBlockDatal, pCommitter->dReader.iRow);
} else { // } else {
pCommitter->dReader.pRowInfo = NULL; // pCommitter->dReader.pRowInfo = NULL;
} // }
} // }
_exit: // _exit:
return code; // return code;
} // }
static int32_t tsdbCommitterNextTableData(SCommitter *pCommitter) { static int32_t tsdbCommitterNextTableData(SCommitter *pCommitter) {
int32_t code = 0; int32_t code = 0;
@ -780,6 +780,7 @@ _err:
static int32_t tsdbMergeCommitLast(SCommitter *pCommitter, STbDataIter *pIter) { static int32_t tsdbMergeCommitLast(SCommitter *pCommitter, STbDataIter *pIter) {
int32_t code = 0; int32_t code = 0;
#if 0
STbData *pTbData = pIter->pTbData; STbData *pTbData = pIter->pTbData;
int32_t nRow = tsdbGetNumOfRowsLessThan(pIter, (TSDBKEY){.ts = pCommitter->maxKey + 1, .version = VERSION_MIN}); int32_t nRow = tsdbGetNumOfRowsLessThan(pIter, (TSDBKEY){.ts = pCommitter->maxKey + 1, .version = VERSION_MIN});
@ -938,54 +939,47 @@ _exit:
_err: _err:
tsdbError("vgId:%d tsdb merge commit last failed since %s", TD_VID(pCommitter->pTsdb->pVnode), tstrerror(code)); tsdbError("vgId:%d tsdb merge commit last failed since %s", TD_VID(pCommitter->pTsdb->pVnode), tstrerror(code));
#endif
return code; return code;
} }
static int32_t tsdbMergeCommitData(SCommitter *pCommitter, STbDataIter *pIter) { static int32_t tsdbMergeCommitData(SCommitter *pCommitter, STbDataIter *pIter) {
int32_t code = 0; int32_t code = 0;
STbData *pTbData = pIter->pTbData;
int32_t iBlock = 0; int32_t iBlock = 0;
SBlock block; SBlock block;
SBlock *pBlock = &block; SBlock *pBlock = &block;
TSDBROW *pRow = tsdbTbDataIterGet(pIter);
if (pCommitter->dReader.pBlockIdx && tTABLEIDCmprFn(pTbData, pCommitter->dReader.pBlockIdx) == 0) { if (pCommitter->dReader.pBlockIdx && tTABLEIDCmprFn(pTbData, pCommitter->dReader.pBlockIdx) == 0) {
tMapDataGetItemByIdx(&pCommitter->dReader.mBlock, iBlock, pBlock, tGetBlock); tMapDataGetItemByIdx(&pCommitter->dReader.mBlock, iBlock, pBlock, tGetBlock);
} else { } else {
pBlock = NULL; pBlock = NULL;
} }
code = tsdbCommitterUpdateTableSchema(pCommitter, pTbData->suid, pTbData->uid);
if (code) goto _err;
tMapDataReset(&pCommitter->dWriter.mBlock);
code = tBlockDataInit(&pCommitter->dReader.bData, pTbData->suid, pTbData->uid, pCommitter->skmTable.pTSchema);
if (code) goto _err;
code = tBlockDataInit(&pCommitter->dWriter.bData, pTbData->suid, pTbData->uid, pCommitter->skmTable.pTSchema);
if (code) goto _err;
// .data merge
while (pBlock && pRow) { while (pBlock && pRow) {
int32_t c = tBlockCmprFn(pBlock, &(SBlock){.minKey = TSDBROW_KEY(pRow), .maxKey = TSDBROW_KEY(pRow)}); SBlock tBlock = {.minKey = TSDBROW_KEY(pRow), .maxKey = TSDBROW_KEY(pRow)};
if (c < 0) { // disk int32_t c = tBlockCmprFn(pBlock, &tBlock);
if (c < 0) {
code = tMapDataPutItem(&pCommitter->dWriter.mBlock, pBlock, tPutBlock); code = tMapDataPutItem(&pCommitter->dWriter.mBlock, pBlock, tPutBlock);
if (code) goto _err; if (code) goto _err;
// next
iBlock++; iBlock++;
if (iBlock < pCommitter->dReader.mBlock.nItem) { if (iBlock < pCommitter->dReader.mBlock.nItem) {
tMapDataGetItemByIdx(&pCommitter->dReader.mBlock, iBlock, pBlock, tGetBlock); tMapDataGetItemByIdx(&pCommitter->dReader.mBlock, iBlock, pBlock, tGetBlock);
} else { } else {
pBlock = NULL; pBlock = NULL;
} }
} else if (c > 0) { // memory } else if (c > 0) {
code = tsdbCommitTableMemData(pCommitter, pIter, pBlock->minKey); code = tsdbCommitTableMemData(pCommitter, pIter, pBlock->minKey);
if (code) goto _err; if (code) goto _err;
// next
pRow = tsdbTbDataIterGet(pIter); pRow = tsdbTbDataIterGet(pIter);
if (pRow && TSDBROW_TS(pRow) > pCommitter->maxKey) { if (pRow && TSDBROW_TS(pRow) > pCommitter->maxKey) {
pRow = NULL; pRow = NULL;
} }
} else { // merge } else {
int32_t nOvlp = tsdbGetNumOfRowsLessThan(pIter, pBlock->maxKey); int32_t nOvlp = tsdbGetNumOfRowsLessThan(pIter, pBlock->maxKey);
ASSERT(nOvlp > 0); ASSERT(nOvlp > 0);
@ -1016,7 +1010,6 @@ static int32_t tsdbMergeCommitData(SCommitter *pCommitter, STbDataIter *pIter) {
code = tMapDataPutItem(&pCommitter->dWriter.mBlock, pBlock, tPutBlock); code = tMapDataPutItem(&pCommitter->dWriter.mBlock, pBlock, tPutBlock);
if (code) goto _err; if (code) goto _err;
// next
iBlock++; iBlock++;
if (iBlock < pCommitter->dReader.mBlock.nItem) { if (iBlock < pCommitter->dReader.mBlock.nItem) {
tMapDataGetItemByIdx(&pCommitter->dReader.mBlock, iBlock, pBlock, tGetBlock); tMapDataGetItemByIdx(&pCommitter->dReader.mBlock, iBlock, pBlock, tGetBlock);
@ -1041,21 +1034,28 @@ static int32_t tsdbCommitTableData(SCommitter *pCommitter, STbData *pTbData) {
STbDataIter iter = {0}; STbDataIter iter = {0};
TSDBROW *pRow; TSDBROW *pRow;
tMapDataReset(&pCommitter->dWriter.mBlock);
tsdbTbDataIterOpen(pTbData, &(TSDBKEY){.ts = pCommitter->minKey, .version = VERSION_MIN}, 0, &iter); tsdbTbDataIterOpen(pTbData, &(TSDBKEY){.ts = pCommitter->minKey, .version = VERSION_MIN}, 0, &iter);
pRow = tsdbTbDataIterGet(&iter); pRow = tsdbTbDataIterGet(&iter);
if (pRow && TSDBROW_TS(pRow) > pCommitter->maxKey) { if (pRow && TSDBROW_TS(pRow) > pCommitter->maxKey) {
pRow = NULL; pRow = NULL;
} }
if (pRow == NULL) { if (pRow == NULL) {
if (pCommitter->dReader.pBlockIdx && tTABLEIDCmprFn(pCommitter->dReader.pBlockIdx, pTbData) == 0) { if (pCommitter->dReader.pBlockIdx && tTABLEIDCmprFn(pCommitter->dReader.pBlockIdx, pTbData) == 0) {
code = tMapDataCopy(&pCommitter->dReader.mBlock, &pCommitter->dWriter.mBlock); code = tMapDataCopy(&pCommitter->dReader.mBlock, &pCommitter->dWriter.mBlock);
if (code) goto _err; if (code) goto _err;
} }
goto _exit; goto _exit;
} }
code = tsdbCommitterUpdateTableSchema(pCommitter, pTbData->suid, pTbData->uid);
if (code) goto _err;
code = tBlockDataInit(&pCommitter->dReader.bData, pTbData->suid, pTbData->uid, pCommitter->skmTable.pTSchema);
if (code) goto _err;
code = tBlockDataInit(&pCommitter->dWriter.bData, pTbData->suid, pTbData->uid, pCommitter->skmTable.pTSchema);
if (code) goto _err;
// commit data // commit data
code = tsdbMergeCommitData(pCommitter, &iter); code = tsdbMergeCommitData(pCommitter, &iter);
if (code) goto _err; if (code) goto _err;