more code
This commit is contained in:
parent
4d75c554f2
commit
a09b5b8b5e
|
@ -409,37 +409,75 @@ static int32_t tsdbCompactNextRow(STsdbCompactor *pCompactor) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
int32_t lino = 0;
|
int32_t lino = 0;
|
||||||
|
|
||||||
|
TABLEID excludeTableId;
|
||||||
|
TABLEID *pExcludeTableId = NULL;
|
||||||
|
|
||||||
for (;;) {
|
for (;;) {
|
||||||
code = tsdbCompactNextRowImpl(pCompactor, NULL);
|
code = tsdbCompactNextRowImpl(pCompactor, pExcludeTableId);
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
|
|
||||||
// check if the table of the row exists
|
// check if the table of the row exists
|
||||||
if (pCompactor->pIter) {
|
if (pCompactor->pIter) {
|
||||||
if (pCompactor->pIter->rowInfo.suid == pCompactor->tbSkm.suid &&
|
SRowInfo *pRowInfo = &pCompactor->pIter->rowInfo;
|
||||||
pCompactor->pIter->rowInfo.uid == pCompactor->tbSkm.uid) {
|
|
||||||
break;
|
// Table exists, just break out
|
||||||
} else {
|
if (pRowInfo->uid == pCompactor->tbSkm.uid) break;
|
||||||
SMetaInfo info;
|
|
||||||
if (metaGetInfo(pCompactor->pTsdb->pVnode->pMeta, pCompactor->pIter->rowInfo.uid, &info, NULL) !=
|
SMetaInfo info;
|
||||||
TSDB_CODE_SUCCESS) {
|
if (pRowInfo->suid) { // child table
|
||||||
// table not exist
|
|
||||||
} else {
|
// check if super table exists
|
||||||
// update table schema
|
if (pRowInfo->suid != pCompactor->tbSkm.suid) {
|
||||||
STSchema *pTSchema =
|
if (metaGetInfo(pCompactor->pTsdb->pVnode->pMeta, pRowInfo->uid, &info, NULL) != TSDB_CODE_SUCCESS) {
|
||||||
metaGetTbTSchema(pCompactor->pTsdb->pVnode->pMeta, pCompactor->pIter->rowInfo.uid, info.version, 1);
|
excludeTableId.suid = pRowInfo->suid;
|
||||||
if (pTSchema == NULL) {
|
excludeTableId.uid = 0;
|
||||||
|
pExcludeTableId = &excludeTableId;
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
// super table exists
|
||||||
|
pCompactor->tbSkm.suid = pRowInfo->suid;
|
||||||
|
pCompactor->tbSkm.uid = 0;
|
||||||
|
tDestroyTSchema(pCompactor->tbSkm.pTSchema);
|
||||||
|
pCompactor->tbSkm.pTSchema = metaGetTbTSchema(pCompactor->pTsdb->pVnode->pMeta, pRowInfo->suid, -1, 1);
|
||||||
|
if (pCompactor->tbSkm.pTSchema == NULL) {
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
pCompactor->tbSkm.suid = pCompactor->pIter->rowInfo.suid;
|
// check if table exists
|
||||||
pCompactor->tbSkm.uid = pCompactor->pIter->rowInfo.uid;
|
if (metaGetInfo(pCompactor->pTsdb->pVnode->pMeta, pRowInfo->uid, &info, NULL) != TSDB_CODE_SUCCESS) {
|
||||||
tDestroyTSchema(pCompactor->tbSkm.pTSchema);
|
excludeTableId.suid = pRowInfo->suid;
|
||||||
pCompactor->tbSkm.pTSchema = pTSchema;
|
excludeTableId.uid = pRowInfo->uid;
|
||||||
|
pExcludeTableId = &excludeTableId;
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
break;
|
// table exists
|
||||||
|
pCompactor->tbSkm.uid = pRowInfo->uid;
|
||||||
|
} else { // normal table
|
||||||
|
// check if table exists
|
||||||
|
if (metaGetInfo(pCompactor->pTsdb->pVnode->pMeta, pRowInfo->uid, &info, NULL) != TSDB_CODE_SUCCESS) {
|
||||||
|
excludeTableId.suid = pRowInfo->suid;
|
||||||
|
excludeTableId.uid = pRowInfo->uid;
|
||||||
|
pExcludeTableId = &excludeTableId;
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
// table exists
|
||||||
|
pCompactor->tbSkm.suid = pRowInfo->suid;
|
||||||
|
pCompactor->tbSkm.uid = pRowInfo->uid;
|
||||||
|
tDestroyTSchema(pCompactor->tbSkm.pTSchema);
|
||||||
|
|
||||||
|
pCompactor->tbSkm.pTSchema = metaGetTbTSchema(pCompactor->pTsdb->pVnode->pMeta, pRowInfo->suid, -1, 1);
|
||||||
|
if (pCompactor->tbSkm.pTSchema == NULL) {
|
||||||
|
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
break;
|
||||||
} else {
|
} else {
|
||||||
// iter end, just break out
|
// iter end, just break out
|
||||||
break;
|
break;
|
||||||
|
@ -447,6 +485,10 @@ static int32_t tsdbCompactNextRow(STsdbCompactor *pCompactor) {
|
||||||
}
|
}
|
||||||
|
|
||||||
_exit:
|
_exit:
|
||||||
|
if (code) {
|
||||||
|
tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pCompactor->pTsdb->pVnode), __func__, lino,
|
||||||
|
tstrerror(code));
|
||||||
|
}
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -460,8 +502,10 @@ static int32_t tsdbCompactGetRow(STsdbCompactor *pCompactor, SRowInfo **ppRowInf
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pCompactor->pIter) {
|
if (pCompactor->pIter) {
|
||||||
|
ASSERT(pCompactor->pIter->rowInfo.suid == pCompactor->tbSkm.suid);
|
||||||
|
ASSERT(pCompactor->pIter->rowInfo.uid == pCompactor->tbSkm.uid);
|
||||||
*ppRowInfo = &pCompactor->pIter->rowInfo;
|
*ppRowInfo = &pCompactor->pIter->rowInfo;
|
||||||
*ppTSchema = NULL; // TODO
|
*ppTSchema = pCompactor->tbSkm.pTSchema;
|
||||||
} else {
|
} else {
|
||||||
*ppRowInfo = NULL;
|
*ppRowInfo = NULL;
|
||||||
*ppTSchema = NULL;
|
*ppTSchema = NULL;
|
||||||
|
@ -571,31 +615,31 @@ int32_t tsdbCompact(STsdb *pTsdb, int32_t flag) {
|
||||||
nRow++;
|
nRow++;
|
||||||
|
|
||||||
// write block data if schema changed
|
// write block data if schema changed
|
||||||
if ((pCompactor->bData.suid || pCompactor->bData.uid) &&
|
// if ((pCompactor->bData.suid || pCompactor->bData.uid) &&
|
||||||
!TABLE_SAME_SCHEMA(pCompactor->bData.suid, pCompactor->bData.uid, pRowInfo->suid, pRowInfo->uid)) {
|
// !TABLE_SAME_SCHEMA(pCompactor->bData.suid, pCompactor->bData.uid, pRowInfo->suid, pRowInfo->uid)) {
|
||||||
// TODO: write block data
|
// // TODO: write block data
|
||||||
ASSERT(0);
|
// ASSERT(0);
|
||||||
|
|
||||||
// set block data not initialized
|
// // set block data not initialized
|
||||||
tBlockDataReset(&pCompactor->bData);
|
// tBlockDataReset(&pCompactor->bData);
|
||||||
}
|
// }
|
||||||
|
|
||||||
// init the block data if not initialized yet
|
// // init the block data if not initialized yet
|
||||||
if (pCompactor->bData.suid == 0 && pCompactor->bData.uid == 0) {
|
// if (pCompactor->bData.suid == 0 && pCompactor->bData.uid == 0) {
|
||||||
code = tBlockDataInit(&pCompactor->bData, &(TABLEID){.suid = pRowInfo->suid, .uid = pRowInfo->uid}, pTSchema,
|
// code = tBlockDataInit(&pCompactor->bData, &(TABLEID){.suid = pRowInfo->suid, .uid = pRowInfo->uid}, pTSchema,
|
||||||
NULL, 0);
|
// NULL, 0);
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
// TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
}
|
// }
|
||||||
|
|
||||||
// append row to block data
|
// // append row to block data
|
||||||
code = tBlockDataAppendRow(&pCompactor->bData, &pRowInfo->row, pTSchema, pRowInfo->uid);
|
// code = tBlockDataAppendRow(&pCompactor->bData, &pRowInfo->row, pTSchema, pRowInfo->uid);
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
// TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
|
|
||||||
// check if block data is full
|
// // check if block data is full
|
||||||
if (pCompactor->bData.nRow >= pCompactor->maxRows) {
|
// if (pCompactor->bData.nRow >= pCompactor->maxRows) {
|
||||||
// TODO: write block data
|
// // TODO: write block data
|
||||||
ASSERT(0);
|
// ASSERT(0);
|
||||||
}
|
// }
|
||||||
|
|
||||||
// iterate to next row
|
// iterate to next row
|
||||||
code = tsdbCompactNextRow(pCompactor);
|
code = tsdbCompactNextRow(pCompactor);
|
||||||
|
|
Loading…
Reference in New Issue