more work

This commit is contained in:
Hongze Cheng 2022-06-27 15:42:55 +00:00
parent 83fa9e63b0
commit 44c473303f
3 changed files with 221 additions and 131 deletions

View File

@ -104,6 +104,8 @@ int32_t tRowMergerGetRow(SRowMerger *pMerger, STSRow **ppRow);
int32_t tTABLEIDCmprFn(const void *p1, const void *p2);
// TSDBKEY
int32_t tsdbKeyCmprFn(const void *p1, const void *p2);
// Evaluate to the smaller / larger of two TSDBKEY values as ordered by tsdbKeyCmprFn.
// When the keys compare equal, both macros yield KEY2.
// NOTE: each argument appears twice in the expansion and has its address taken, so
// arguments must be addressable expressions without side effects.
#define MIN_TSDBKEY(KEY1, KEY2) ((tsdbKeyCmprFn(&(KEY1), &(KEY2)) < 0) ? (KEY1) : (KEY2))
#define MAX_TSDBKEY(KEY1, KEY2) ((tsdbKeyCmprFn(&(KEY1), &(KEY2)) > 0) ? (KEY1) : (KEY2))
// KEYINFO
#define tKEYINFOInit() \
((KEYINFO){.maxKey = {.ts = TSKEY_MIN, .version = -1}, \
@ -139,6 +141,8 @@ int32_t tColDataPCmprFn(const void *p1, const void *p2);
// SBlockData
#define tBlockDataFirstRow(PBLOCKDATA) tsdbRowFromBlockData(PBLOCKDATA, 0)
#define tBlockDataLastRow(PBLOCKDATA) tsdbRowFromBlockData(PBLOCKDATA, (PBLOCKDATA)->nRow - 1)
// Key of the first / last row of PBLOCKDATA: TSDBROW_KEY applied to the address of the
// row produced by tBlockDataFirstRow / tBlockDataLastRow.
// tBlockDataLastRow expands PBLOCKDATA twice, so pass an expression without side effects.
// NOTE(review): with nRow == 0 the last-row index becomes -1; callers presumably
// guarantee a non-empty block - confirm.
#define tBlockDataFirstKey(PBLOCKDATA) TSDBROW_KEY(&tBlockDataFirstRow(PBLOCKDATA))
#define tBlockDataLastKey(PBLOCKDATA) TSDBROW_KEY(&tBlockDataLastRow(PBLOCKDATA))
int32_t tBlockDataInit(SBlockData *pBlockData);
void tBlockDataReset(SBlockData *pBlockData);
void tBlockDataClear(SBlockData *pBlockData);

View File

@ -614,31 +614,39 @@ _err:
return code;
}
static int32_t tsdbMergeTableData(SCommitter *pCommitter, STbDataIter *pIter, SBlockIdx *pBlockIdx, SBlock *pBlock,
TSDBKEY toKey /*not included*/, int8_t toDataOnly) {
static int32_t tsdbMergeTableData(SCommitter *pCommitter, STbDataIter *pIter, SBlock *pBlockMerge, TSDBKEY toKey,
int8_t toDataOnly) {
int32_t code = 0;
SBlockData *pBlockDataFrom = &pCommitter->oBlockData;
SBlockData *pBlockDataTo = &pCommitter->nBlockData;
SBlockIdx *pBlockIdx = &(SBlockIdx){.suid = pIter->pTbData->suid, .uid = pIter->pTbData->uid};
SBlockData *pBlockDataMerge = &pCommitter->oBlockData;
SBlockData *pBlockData = &pCommitter->nBlockData;
SBlock *pBlock = &pCommitter->nBlock;
TSDBROW *pRow1;
TSDBROW *pRow2;
TSDBROW row2;
TSDBROW *pRow2 = &row2;
TSDBROW row;
TSDBROW *pRow = &row;
int32_t c = 0;
TSKEY lastKey;
// read SBlockData
code = tsdbReadBlockData(pCommitter->pReader, pBlockIdx, pBlock, pBlockDataFrom, NULL, NULL);
code = tsdbReadBlockData(pCommitter->pReader, pBlockIdx, pBlockMerge, pBlockDataMerge, NULL, NULL);
if (code) goto _err;
// loop to merge
tBlockDataReset(pBlockDataTo);
pRow1 = tsdbTbDataIterGet(pIter);
pRow2 = &tsdbRowFromBlockData(pBlockDataFrom, 0);
*pRow2 = tsdbRowFromBlockData(pBlockDataMerge, 0);
ASSERT(tsdbKeyCmprFn(&TSDBROW_KEY(pRow1), &toKey) < 0);
ASSERT(tsdbKeyCmprFn(&TSDBROW_KEY(pRow2), &toKey) < 0);
code = tsdbCommitterUpdateSchema(pCommitter, pBlockIdx->suid, pBlockIdx->uid, TSDBROW_SVERSION(pRow1));
if (code) goto _err;
lastKey = TSKEY_MIN;
tBlockReset(pBlock);
tBlockDataReset(pBlockData);
while (true) {
if (pRow1 == NULL && pRow2 == NULL) {
if (pBlockDataTo->nRow == 0) {
if (pBlockData->nRow == 0) {
break;
} else {
goto _write_block;
@ -646,43 +654,85 @@ static int32_t tsdbMergeTableData(SCommitter *pCommitter, STbDataIter *pIter, SB
}
if (pRow1 && pRow2) {
c = tsdbRowCmprFn(pRow1, pRow2);
if (c < 0) {
code = tBlockDataAppendRow(pBlockDataTo, pRow1, pCommitter->pTSchema);
if (code) goto _err;
if (tsdbRowCmprFn(pRow1, pRow2) < 0) {
*pRow = *pRow1;
tsdbTbDataIterNext(pIter);
pRow1 = tsdbTbDataIterGet(pIter);
// TODO
} else if (c > 0) {
code = tBlockDataAppendRow(pBlockDataTo, pRow2, NULL);
if (code) goto _err;
pRow2 = pRow2->iRow + 1 < pBlockDataFrom->nRow ? &tsdbRowFromBlockData(pBlockDataFrom, pRow2->iRow + 1) : NULL;
if (pRow1) {
if (tsdbKeyCmprFn(&TSDBROW_KEY(pRow1), &toKey) < 0) {
// The schema is selected by the row's schema version (TSDBROW_SVERSION), as in the
// update done before the loop, not by the row's data version.
code = tsdbCommitterUpdateSchema(pCommitter, pBlockIdx->suid, pBlockIdx->uid, TSDBROW_SVERSION(pRow1));
if (code) goto _err;
} else {
// Memory rows at or beyond toKey are not part of this merge.
pRow1 = NULL;
}
}
} else if (tsdbRowCmprFn(pRow1, pRow2) > 0) {
// The on-disk block row sorts before the memory row: emit it and advance within the block.
// (The previous condition repeated "< 0", which made this branch unreachable.)
*pRow = *pRow2;
if (pRow2->iRow + 1 < pBlockDataMerge->nRow) {
*pRow2 = tsdbRowFromBlockData(pBlockDataMerge, pRow2->iRow + 1);
} else {
// Block exhausted.
pRow2 = NULL;
}
} else {
// A memory row and a block row comparing equal is not expected here.
ASSERT(0);
}
} else if (pRow1) {
code = tBlockDataAppendRow(pBlockDataTo, pRow1, pCommitter->pTSchema);
*pRow = *pRow1;
tsdbTbDataIterNext(pIter);
pRow1 = tsdbTbDataIterGet(pIter);
// TODO
if (pRow1) {
if (tsdbKeyCmprFn(&TSDBROW_KEY(pRow1), &toKey) < 0) {
// The schema is selected by the row's schema version (TSDBROW_SVERSION), as in the
// update done before the loop, not by the row's data version.
code = tsdbCommitterUpdateSchema(pCommitter, pBlockIdx->suid, pBlockIdx->uid, TSDBROW_SVERSION(pRow1));
if (code) goto _err;
} else {
// Memory rows at or beyond toKey are not part of this merge.
pRow1 = NULL;
}
}
} else {
code = tBlockDataAppendRow(pBlockDataTo, pRow2, NULL);
if (code) goto _err;
*pRow = *pRow2;
pRow2 = pRow2->iRow + 1 < pBlockDataFrom->nRow ? &tsdbRowFromBlockData(pBlockDataFrom, pRow2->iRow + 1) : NULL;
if (pRow2->iRow + 1 < pBlockDataMerge->nRow) {
*pRow2 = tsdbRowFromBlockData(pBlockDataMerge, pRow2->iRow + 1);
} else {
pRow2 = NULL;
}
}
if (pBlockDataTo->nRow >= pCommitter->maxRow * 4 / 5) {
goto _write_block;
code = tBlockDataAppendRow(pBlockData, &row, pCommitter->pTSchema);
if (code) goto _err;
pBlock->minVersion = TMIN(pBlock->minVersion, TSDBROW_VERSION(pRow));
pBlock->maxVersion = TMAX(pBlock->maxVersion, TSDBROW_VERSION(pRow));
pBlock->nRow++;
if (lastKey == TSDBROW_TS(pRow)) {
pBlock->hasDup = 1;
} else {
continue;
lastKey = TSDBROW_TS(pRow);
}
if (pBlockData->nRow >= pCommitter->maxRow * 4 / 5) goto _write_block;
continue;
_write_block:
tBlockDataReset(pBlockDataTo);
// TODO
if (!toDataOnly && pBlockData->nRow < pCommitter->minRow) {
pBlock->last = 1;
} else {
pBlock->last = 0;
}
code = tsdbWriteBlockData(pCommitter->pWriter, pBlockData, NULL, NULL, pBlockIdx, pBlock, pCommitter->cmprAlg);
if (code) goto _err;
code = tMapDataPutItem(&pCommitter->nBlockMap, pBlock, tPutBlock);
if (code) goto _err;
lastKey = TSKEY_MIN;
tBlockReset(pBlock);
tBlockDataReset(pBlockData);
}
return code;
@ -695,10 +745,15 @@ _err:
static int32_t tsdbCommitTableMemData(SCommitter *pCommitter, STbDataIter *pIter, TSDBKEY toKey, int8_t toDataOnly) {
int32_t code = 0;
TSDBROW *pRow;
SBlock *pBlock = &pCommitter->nBlock;
SBlockData *pBlockData = &pCommitter->nBlockData;
TSKEY lastKey = TSKEY_MIN;
int64_t suid = pIter->pTbData->suid;
int64_t uid = pIter->pTbData->uid;
pRow = tsdbTbDataIterGet(pIter);
tBlockReset(pBlock);
tBlockDataReset(pBlockData);
pRow = tsdbTbDataIterGet(pIter);
while (true) {
if (pRow == NULL || tsdbKeyCmprFn(&TSDBROW_KEY(pRow), &toKey) >= 0) {
if (pBlockData->nRow > 0) {
@ -708,24 +763,53 @@ static int32_t tsdbCommitTableMemData(SCommitter *pCommitter, STbDataIter *pIter
}
}
// update schema
code = tsdbCommitterUpdateSchema(pCommitter, pIter->pTbData->suid, pIter->pTbData->uid, TSDBROW_SVERSION(pRow));
if (code) goto _err;
// append
code = tBlockDataAppendRow(pBlockData, pRow, pCommitter->pTSchema);
if (code) goto _err;
// update block statistics: version range, row count and duplicate-timestamp flag
pBlock->minVersion = TMIN(pBlock->minVersion, TSDBROW_VERSION(pRow));
// maxVersion must grow with TMAX; folding it with TMIN would track the minimum instead.
pBlock->maxVersion = TMAX(pBlock->maxVersion, TSDBROW_VERSION(pRow));
pBlock->nRow++;
if (TSDBROW_TS(pRow) == lastKey) {
// Same timestamp as the previous row appended to this block.
pBlock->hasDup = 1;
} else {
lastKey = TSDBROW_TS(pRow);
}
tsdbTbDataIterNext(pIter);
pRow = tsdbTbDataIterGet(pIter);
if (pBlockData->nRow >= pCommitter->maxRow * 4 / 5) goto _write_block;
continue;
_write_block:
// Mark the block as "last" only when that is allowed (!toDataOnly) and the block holds
// fewer rows than the committer's minimum row count. The threshold is minRow (a row
// count), not minKey (a timestamp bound).
if (!toDataOnly && pBlockData->nRow < pCommitter->minRow) {
pBlock->last = 1;
} else {
pBlock->last = 0;
}
code = tsdbWriteBlockData(pCommitter->pWriter, pBlockData, NULL, NULL, &(SBlockIdx){.suid = suid, .uid = uid},
pBlock, pCommitter->cmprAlg);
if (code) goto _err;
code = tMapDataPutItem(&pCommitter->nBlockMap, pBlock, tPutBlock);
if (code) goto _err;
tBlockReset(pBlock);
tBlockDataReset(pBlockData);
lastKey = TSKEY_MIN;
}
return code;
_err:
tsdbError("vgId:%d tsdb commit table mem data failed since %s", TD_VID(pCommitter->pTsdb->pVnode), tstrerror(code));
return code;
}
@ -762,119 +846,120 @@ _err:
}
static int32_t tsdbMergeMemDisk(SCommitter *pCommitter, STbData *pTbData, SBlockIdx *oBlockIdx) {
int32_t code = 0;
STbDataIter *pIter = &(STbDataIter){0};
TSDBROW *pRow;
int32_t code = 0;
// STbDataIter *pIter = &(STbDataIter){0};
// TSDBROW *pRow;
// create iter
tsdbTbDataIterOpen(pTbData, &(TSDBKEY){.ts = pCommitter->minKey, .version = VERSION_MIN}, 0, pIter);
pRow == tsdbTbDataIterGet(pIter);
if (pRow == NULL || TSDBROW_TS(pRow) > pCommitter->maxKey) {
code = tsdbCommitDiskData(pCommitter, oBlockIdx);
if (code) {
goto _err;
} else {
goto _exit;
}
}
// // create iter
// tsdbTbDataIterOpen(pTbData, &(TSDBKEY){.ts = pCommitter->minKey, .version = VERSION_MIN}, 0, pIter);
// pRow == tsdbTbDataIterGet(pIter);
// if (pRow == NULL || TSDBROW_TS(pRow) > pCommitter->maxKey) {
// code = tsdbCommitDiskData(pCommitter, oBlockIdx);
// if (code) {
// goto _err;
// } else {
// goto _exit;
// }
// }
// start ==================
// read
code = tsdbReadBlock(pCommitter->pReader, oBlockIdx, &pCommitter->oBlockMap, NULL);
if (code) goto _err;
// // start ==================
// // read
// code = tsdbReadBlock(pCommitter->pReader, oBlockIdx, &pCommitter->oBlockMap, NULL);
// if (code) goto _err;
// loop to merge
// SBlockData *pBlockData = &pCommitter->nBlockData;
int32_t iBlock = 0;
int32_t nBlock = pCommitter->oBlockMap.nItem;
// SBlock *pBlockO = &pCommitter->oBlock;
SBlock *pBlock;
int32_t c;
// // loop to merge
// // SBlockData *pBlockData = &pCommitter->nBlockData;
// int32_t iBlock = 0;
// int32_t nBlock = pCommitter->oBlockMap.nItem;
// // SBlock *pBlockO = &pCommitter->oBlock;
// SBlock *pBlock;
// int32_t c;
// merge ===================
while (true) {
if ((pRow == NULL || TSDBROW_TS(pRow) > pCommitter->maxKey) && pBlock == NULL) break;
// // merge ===================
// while (true) {
// if ((pRow == NULL || TSDBROW_TS(pRow) > pCommitter->maxKey) && pBlock == NULL) break;
if ((pRow && TSDBROW_TS(pRow) <= pCommitter->maxKey) && pBlock) {
if (pBlock->last) {
// merge memory data and disk data to write to .data/.last (todo)
code = tsdbMergeTableData(pCommitter, pIter, oBlockIdx, pBlock,
(TSDBKEY){.ts = pCommitter->maxKey + 1, .version = VERSION_MIN}, 0);
if (code) goto _err;
// if ((pRow && TSDBROW_TS(pRow) <= pCommitter->maxKey) && pBlock) {
// if (pBlock->last) {
// // merge memory data and disk data to write to .data/.last (todo)
// code = tsdbMergeTableData(pCommitter, pIter, oBlockIdx, pBlock,
// (TSDBKEY){.ts = pCommitter->maxKey + 1, .version = VERSION_MIN}, 0);
// if (code) goto _err;
pRow = tsdbTbDataIterGet(pIter);
iBlock++;
} else {
c = tBlockCmprFn(&(SBlock){}, pBlock);
// pRow = tsdbTbDataIterGet(pIter);
// iBlock++;
// } else {
// c = tBlockCmprFn(&(SBlock){}, pBlock);
if (c < 0) {
// commit memory data until pBlock->minKey (not included) only to .data file (todo)
code = tsdbCommitTableMemData(pCommitter, pIter, pBlock->minKey, 1);
if (code) goto _err;
// if (c < 0) {
// // commit memory data until pBlock->minKey (not included) only to .data file (todo)
// code = tsdbCommitTableMemData(pCommitter, pIter, pBlock->minKey, 1);
// if (code) goto _err;
pRow = tsdbTbDataIterGet(pIter);
} else if (c > 0) {
// just move the block (todo)
// code = tsdbCommitTableDiskData(pCommitter, pBlock);
if (code) goto _err;
// pRow = tsdbTbDataIterGet(pIter);
// } else if (c > 0) {
// // just move the block (todo)
// // code = tsdbCommitTableDiskData(pCommitter, pBlock);
// if (code) goto _err;
iBlock++;
// TODO
} else {
int64_t nOvlp = 0; // = tsdbOvlpRows();
if (nOvlp + pBlock->nRow <= pCommitter->maxRow) {
// add as a subblock
} else {
if (iBlock == nBlock - 1) {
// merge memory data and disk data to .data/.last file
code = tsdbMergeTableData(pCommitter, pIter, oBlockIdx, pBlock,
(TSDBKEY){.ts = pCommitter->maxKey + 1, .version = VERSION_MIN}, 0);
if (code) goto _err;
} else {
// merge memory data and disk data to .data file only until pBlock[1].
code = tsdbMergeTableData(pCommitter, pIter, oBlockIdx, pBlock, (TSDBKEY){0} /*TODO*/, 1);
}
}
// iBlock++;
// // TODO
// } else {
// int64_t nOvlp = 0; // = tsdbOvlpRows();
// if (nOvlp + pBlock->nRow <= pCommitter->maxRow) {
// // add as a subblock
// } else {
// if (iBlock == nBlock - 1) {
// // merge memory data and disk data to .data/.last file
// code = tsdbMergeTableData(pCommitter, pIter, oBlockIdx, pBlock,
// (TSDBKEY){.ts = pCommitter->maxKey + 1, .version = VERSION_MIN}, 0);
// if (code) goto _err;
// } else {
// // merge memory data and disk data to .data file only until pBlock[1].
// code = tsdbMergeTableData(pCommitter, pIter, oBlockIdx, pBlock, (TSDBKEY){0} /*TODO*/, 1);
// }
// }
pRow = tsdbTbDataIterGet(pIter);
iBlock++;
}
}
} else if (pBlock) {
// code = tsdbCommitTableDiskData(pCommitter, pBlock);
if (code) goto _err;
// pRow = tsdbTbDataIterGet(pIter);
// iBlock++;
// }
// }
// } else if (pBlock) {
// // code = tsdbCommitTableDiskData(pCommitter, pBlock);
// if (code) goto _err;
iBlock++;
// next block
} else {
// commit only memory data until (pCommitter->maxKey, VERSION_MAX)
code =
tsdbCommitTableMemData(pCommitter, pIter, (TSDBKEY){.ts = pCommitter->maxKey + 1, .version = VERSION_MIN}, 0);
if (code) goto _err;
// iBlock++;
// // next block
// } else {
// // commit only memory data until (pCommitter->maxKey, VERSION_MAX)
// code =
// tsdbCommitTableMemData(pCommitter, pIter, (TSDBKEY){.ts = pCommitter->maxKey + 1, .version =
// VERSION_MIN}, 0);
// if (code) goto _err;
pRow = tsdbTbDataIterGet(pIter);
}
}
// pRow = tsdbTbDataIterGet(pIter);
// }
// }
// end =====================
// SBlock
// code = tsdbWriteBlock(pCommitter->pWriter, &pCommitter->nBlockMap, NULL, pBlockIdx);
// if (code) goto _err;
// // end =====================
// // SBlock
// // code = tsdbWriteBlock(pCommitter->pWriter, &pCommitter->nBlockMap, NULL, pBlockIdx);
// // if (code) goto _err;
// // SBlockIdx
// code = tMapDataPutItem(&pCommitter->nBlockIdxMap, pBlockIdx, tPutBlockIdx);
// if (code) goto _err;
// // // SBlockIdx
// // code = tMapDataPutItem(&pCommitter->nBlockIdxMap, pBlockIdx, tPutBlockIdx);
// // if (code) goto _err;
_exit:
pRow = tsdbTbDataIterGet(pIter);
if (pRow) {
pCommitter->nextKey = TMIN(pCommitter->nextKey, TSDBROW_TS(pRow));
}
// _exit:
// pRow = tsdbTbDataIterGet(pIter);
// if (pRow) {
// pCommitter->nextKey = TMIN(pCommitter->nextKey, TSDBROW_TS(pRow));
// }
return code;
_err:
tsdbError("vgId:%d tsdb merge mem disk data failed since %s", TD_VID(pCommitter->pTsdb->pVnode), tstrerror(code));
return code;
// _err:
// tsdbError("vgId:%d tsdb merge mem disk data failed since %s", TD_VID(pCommitter->pTsdb->pVnode),
// tstrerror(code)); return code;
}
static int32_t tsdbCommitTableDataEnd(SCommitter *pCommitter, int64_t suid, int64_t uid) {
@ -930,7 +1015,6 @@ static int32_t tsdbCommitTableData(SCommitter *pCommitter, STbData *pTbData, SBl
// start ===========
tMapDataReset(&pCommitter->nBlockMap);
SBlock *pBlock = &pCommitter->oBlock;
int32_t c;
if (iBlock < nBlock) {
tMapDataGetItemByIdx(&pCommitter->oBlockMap, iBlock, pBlock, tGetBlock);
@ -944,19 +1028,18 @@ static int32_t tsdbCommitTableData(SCommitter *pCommitter, STbData *pTbData, SBl
if (pRow && TSDBROW_TS(pRow) <= pCommitter->maxKey && pBlock) {
if (pBlock->last) {
code = tsdbMergeTableData(pCommitter, pIter, pBlockIdx, pBlock,
code = tsdbMergeTableData(pCommitter, pIter, pBlock,
(TSDBKEY){.ts = pCommitter->maxKey + 1, .version = VERSION_MIN}, 0);
if (code) goto _err;
pRow = tsdbTbDataIterGet(pIter);
iBlock++;
} else {
c = tBlockCmprFn(&(SBlock){}, pBlock);
int32_t c = tBlockCmprFn(&(SBlock){.maxKey = TSDBROW_KEY(pRow), .minKey = TSDBROW_KEY(pRow)}, pBlock);
if (c > 0) {
code = tsdbCommitTableDiskData(pCommitter, pBlock, pBlockIdx);
if (code) goto _err;
iBlock++;
iBlock++;
if (iBlock < nBlock) {
tMapDataGetItemByIdx(&pCommitter->oBlockMap, iBlock, pBlock, tGetBlock);
@ -974,12 +1057,12 @@ static int32_t tsdbCommitTableData(SCommitter *pCommitter, STbData *pTbData, SBl
// add as a subblock
} else {
if (iBlock == nBlock - 1) {
code = tsdbMergeTableData(pCommitter, pIter, pBlockIdx, pBlock,
code = tsdbMergeTableData(pCommitter, pIter, pBlock,
(TSDBKEY){.ts = pCommitter->maxKey + 1, .version = VERSION_MIN}, 0);
if (code) goto _err;
} else {
// code = tsdbMergeTableData(pCommitter, pIter, pBlockIdx, pBlock, pBlock[1].minKey, 1);
// code = tsdbMergeTableData(pCommitter, pIter, pBlock, pBlock[1].minKey, 1);
if (code) goto _err;
}
}

View File

@ -1193,6 +1193,9 @@ int32_t tsdbWriteBlockData(SDataFWriter *pWriter, SBlockData *pBlockData, uint8_
if (!ppBuf1) ppBuf1 = &pBuf1;
if (!ppBuf2) ppBuf2 = &pBuf2;
pBlock->minKey = MIN_TSDBKEY(pBlock->minKey, tBlockDataFirstKey(pBlockData));
pBlock->maxKey = MAX_TSDBKEY(pBlock->maxKey, tBlockDataLastKey(pBlockData));
pSubBlock->nRow = pBlockData->nRow;
pSubBlock->cmprAlg = cmprAlg;
if (pBlock->last) {