more code
This commit is contained in:
parent
1836fb0800
commit
516306b598
|
@ -83,9 +83,9 @@ static int32_t tsdbOpenDataFileDataIter(SDataFReader* pReader, STsdbDataIter2**
|
||||||
code = tBlockDataCreate(&pIter->dIter.bData);
|
code = tBlockDataCreate(&pIter->dIter.bData);
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
|
|
||||||
pIter->dIter.iBlockIdx = -1;
|
pIter->dIter.iBlockIdx = 0;
|
||||||
pIter->dIter.iDataBlk = -1;
|
pIter->dIter.iDataBlk = 0;
|
||||||
pIter->dIter.iRow = -1;
|
pIter->dIter.iRow = 0;
|
||||||
|
|
||||||
// read data
|
// read data
|
||||||
code = tsdbReadBlockIdx(pReader, pIter->dIter.aBlockIdx);
|
code = tsdbReadBlockIdx(pReader, pIter->dIter.aBlockIdx);
|
||||||
|
@ -130,8 +130,8 @@ static int32_t tsdbOpenSttFileDataIter(SDataFReader* pReader, int32_t iStt, STsd
|
||||||
code = tBlockDataCreate(&pIter->sIter.bData);
|
code = tBlockDataCreate(&pIter->sIter.bData);
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
|
|
||||||
pIter->sIter.iSttBlk = -1;
|
pIter->sIter.iSttBlk = 0;
|
||||||
pIter->sIter.iRow = -1;
|
pIter->sIter.iRow = 0;
|
||||||
|
|
||||||
// read data
|
// read data
|
||||||
code = tsdbReadSttBlk(pReader, iStt, pIter->sIter.aSttBlk);
|
code = tsdbReadSttBlk(pReader, iStt, pIter->sIter.aSttBlk);
|
||||||
|
@ -194,34 +194,37 @@ static int32_t tsdbDataFileDataIterNext(STsdbDataIter2* pIter) {
|
||||||
int32_t lino = 0;
|
int32_t lino = 0;
|
||||||
|
|
||||||
for (;;) {
|
for (;;) {
|
||||||
if (++pIter->dIter.iRow < pIter->dIter.bData.nRow) {
|
if (pIter->dIter.iRow < pIter->dIter.bData.nRow) {
|
||||||
pIter->rowInfo.suid = pIter->dIter.bData.suid;
|
pIter->rowInfo.suid = pIter->dIter.bData.suid;
|
||||||
pIter->rowInfo.uid = pIter->dIter.bData.uid;
|
pIter->rowInfo.uid = pIter->dIter.bData.uid;
|
||||||
pIter->rowInfo.row = tsdbRowFromBlockData(&pIter->dIter.bData, pIter->dIter.iRow);
|
pIter->rowInfo.row = tsdbRowFromBlockData(&pIter->dIter.bData, pIter->dIter.iRow);
|
||||||
|
pIter->dIter.iRow++;
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
for (;;) {
|
for (;;) {
|
||||||
if (++pIter->dIter.iDataBlk < pIter->dIter.mDataBlk.nItem) {
|
if (pIter->dIter.iDataBlk < pIter->dIter.mDataBlk.nItem) {
|
||||||
SDataBlk dataBlk;
|
SDataBlk dataBlk;
|
||||||
tMapDataGetItemByIdx(&pIter->dIter.mDataBlk, pIter->dIter.iDataBlk, &dataBlk, tGetDataBlk);
|
tMapDataGetItemByIdx(&pIter->dIter.mDataBlk, pIter->dIter.iDataBlk, &dataBlk, tGetDataBlk);
|
||||||
|
|
||||||
code = tsdbReadDataBlockEx(pIter->dIter.pReader, &dataBlk, &pIter->dIter.bData);
|
code = tsdbReadDataBlockEx(pIter->dIter.pReader, &dataBlk, &pIter->dIter.bData);
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
|
|
||||||
pIter->dIter.iRow = -1;
|
pIter->dIter.iDataBlk++;
|
||||||
|
pIter->dIter.iRow = 0;
|
||||||
|
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
for (;;) {
|
for (;;) {
|
||||||
if (++pIter->dIter.iBlockIdx < taosArrayGetSize(pIter->dIter.aBlockIdx)) {
|
if (pIter->dIter.iBlockIdx < taosArrayGetSize(pIter->dIter.aBlockIdx)) {
|
||||||
SBlockIdx* pBlockIdx = taosArrayGet(pIter->dIter.aBlockIdx, pIter->dIter.iBlockIdx);
|
SBlockIdx* pBlockIdx = taosArrayGet(pIter->dIter.aBlockIdx, pIter->dIter.iBlockIdx);
|
||||||
|
|
||||||
code = tsdbReadDataBlk(pIter->dIter.pReader, pBlockIdx, &pIter->dIter.mDataBlk);
|
code = tsdbReadDataBlk(pIter->dIter.pReader, pBlockIdx, &pIter->dIter.mDataBlk);
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
|
|
||||||
pIter->dIter.iDataBlk = -1;
|
pIter->dIter.iBlockIdx++;
|
||||||
|
pIter->dIter.iDataBlk = 0;
|
||||||
|
|
||||||
break;
|
break;
|
||||||
} else {
|
} else {
|
||||||
|
@ -244,20 +247,23 @@ static int32_t tsdbSttFileDataIterNext(STsdbDataIter2* pIter) {
|
||||||
int32_t lino = 0;
|
int32_t lino = 0;
|
||||||
|
|
||||||
for (;;) {
|
for (;;) {
|
||||||
if (++pIter->sIter.iRow < pIter->sIter.bData.nRow) {
|
if (pIter->sIter.iRow < pIter->sIter.bData.nRow) {
|
||||||
pIter->rowInfo.suid = pIter->sIter.bData.suid;
|
pIter->rowInfo.suid = pIter->sIter.bData.suid;
|
||||||
pIter->rowInfo.uid = pIter->sIter.bData.uid ? pIter->sIter.bData.uid : pIter->sIter.bData.aUid[pIter->sIter.iRow];
|
pIter->rowInfo.uid = pIter->sIter.bData.uid ? pIter->sIter.bData.uid : pIter->sIter.bData.aUid[pIter->sIter.iRow];
|
||||||
pIter->rowInfo.row = tsdbRowFromBlockData(&pIter->sIter.bData, pIter->sIter.iRow);
|
pIter->rowInfo.row = tsdbRowFromBlockData(&pIter->sIter.bData, pIter->sIter.iRow);
|
||||||
|
pIter->sIter.iRow++;
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (++pIter->sIter.iSttBlk < taosArrayGetSize(pIter->sIter.aSttBlk)) {
|
if (pIter->sIter.iSttBlk < taosArrayGetSize(pIter->sIter.aSttBlk)) {
|
||||||
SSttBlk* pSttBlk = taosArrayGet(pIter->sIter.aSttBlk, pIter->sIter.iSttBlk);
|
SSttBlk* pSttBlk = taosArrayGet(pIter->sIter.aSttBlk, pIter->sIter.iSttBlk);
|
||||||
|
|
||||||
code = tsdbReadSttBlockEx(pIter->sIter.pReader, pIter->sIter.iStt, pSttBlk, &pIter->sIter.bData);
|
code = tsdbReadSttBlockEx(pIter->sIter.pReader, pIter->sIter.iStt, pSttBlk, &pIter->sIter.bData);
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
|
|
||||||
pIter->sIter.iRow = -1;
|
pIter->sIter.iSttBlk++;
|
||||||
|
|
||||||
|
pIter->sIter.iRow = 0;
|
||||||
} else {
|
} else {
|
||||||
pIter->rowInfo = (SRowInfo){0};
|
pIter->rowInfo = (SRowInfo){0};
|
||||||
break;
|
break;
|
||||||
|
@ -1019,16 +1025,16 @@ static int32_t tsdbSnapWriteTableDataStart(STsdbSnapWriter* pWriter, TABLEID* pI
|
||||||
if (pWriter->pDIter) {
|
if (pWriter->pDIter) {
|
||||||
STsdbDataIter2* pIter = pWriter->pDIter;
|
STsdbDataIter2* pIter = pWriter->pDIter;
|
||||||
for (;;) {
|
for (;;) {
|
||||||
if (pIter->dIter.iBlockIdx + 1 >= taosArrayGetSize(pIter->dIter.aBlockIdx)) {
|
if (pIter->dIter.iBlockIdx >= taosArrayGetSize(pIter->dIter.aBlockIdx)) {
|
||||||
pWriter->pDIter = NULL;
|
pWriter->pDIter = NULL;
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
SBlockIdx* pBlockIdx = (SBlockIdx*)taosArrayGet(pIter->dIter.aBlockIdx, pIter->dIter.iBlockIdx + 1);
|
SBlockIdx* pBlockIdx = (SBlockIdx*)taosArrayGet(pIter->dIter.aBlockIdx, pIter->dIter.iBlockIdx);
|
||||||
|
|
||||||
int32_t c = tTABLEIDCmprFn(pBlockIdx, pId);
|
int32_t c = tTABLEIDCmprFn(pBlockIdx, pId);
|
||||||
if (c < 0) {
|
if (c < 0) {
|
||||||
++pIter->dIter.iBlockIdx;
|
pIter->dIter.iBlockIdx++;
|
||||||
|
|
||||||
code = tsdbReadDataBlk(pIter->dIter.pReader, pBlockIdx, &pIter->dIter.mDataBlk);
|
code = tsdbReadDataBlk(pIter->dIter.pReader, pBlockIdx, &pIter->dIter.mDataBlk);
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
|
@ -1042,15 +1048,15 @@ static int32_t tsdbSnapWriteTableDataStart(STsdbSnapWriter* pWriter, TABLEID* pI
|
||||||
pNewBlockIdx->suid = pBlockIdx->suid;
|
pNewBlockIdx->suid = pBlockIdx->suid;
|
||||||
pNewBlockIdx->uid = pBlockIdx->uid;
|
pNewBlockIdx->uid = pBlockIdx->uid;
|
||||||
|
|
||||||
code = tsdbWriteDataBlk(pWriter->pDataFWriter, &pIter->dIter.mDataBlk, pBlockIdx);
|
code = tsdbWriteDataBlk(pWriter->pDataFWriter, &pIter->dIter.mDataBlk, pNewBlockIdx);
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
} else if (c == 0) {
|
} else if (c == 0) {
|
||||||
++pIter->dIter.iBlockIdx;
|
pIter->dIter.iBlockIdx++;
|
||||||
|
|
||||||
code = tsdbReadDataBlk(pIter->dIter.pReader, pBlockIdx, &pIter->dIter.mDataBlk);
|
code = tsdbReadDataBlk(pIter->dIter.pReader, pBlockIdx, &pIter->dIter.mDataBlk);
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
|
|
||||||
pIter->dIter.iDataBlk = -1;
|
pIter->dIter.iDataBlk = 0;
|
||||||
|
|
||||||
break;
|
break;
|
||||||
} else {
|
} else {
|
||||||
|
@ -1089,58 +1095,38 @@ static int32_t tsdbSnapWriteTableDataEnd(STsdbSnapWriter* pWriter) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
int32_t lino = 0;
|
int32_t lino = 0;
|
||||||
|
|
||||||
#if 0
|
// TODO: do write the row, refer line 1593
|
||||||
if (pWriter->id.suid == 0 && pWriter->id.uid == 0) return code;
|
|
||||||
|
|
||||||
int32_t c = 1;
|
if (pWriter->bData.nRow > 0) {
|
||||||
if (pWriter->dReader.pBlockIdx) {
|
if (pWriter->bData.nRow < pWriter->minRow) {
|
||||||
c = tTABLEIDCmprFn(pWriter->dReader.pBlockIdx, &pWriter->id);
|
for (int32_t iRow = 0; iRow < pWriter->bData.nRow; iRow++) {
|
||||||
ASSERT(c >= 0);
|
code = tBlockDataAppendRow(&pWriter->sData, &tsdbRowFromBlockData(&pWriter->bData, iRow),
|
||||||
}
|
pWriter->skmTable.pTSchema, pWriter->tbid.uid);
|
||||||
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
|
|
||||||
if (c == 0) {
|
if (pWriter->sData.nRow >= pWriter->maxRow) {
|
||||||
SBlockData* pBData = &pWriter->dWriter.bData;
|
code = tsdbWriteSttBlock(pWriter->pDataFWriter, &pWriter->sData, pWriter->aSttBlk, pWriter->cmprAlg);
|
||||||
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
for (; pWriter->dReader.iRow < pWriter->dReader.bData.nRow; pWriter->dReader.iRow++) {
|
}
|
||||||
TSDBROW row = tsdbRowFromBlockData(&pWriter->dReader.bData, pWriter->dReader.iRow);
|
|
||||||
|
|
||||||
code = tBlockDataAppendRow(pBData, &row, NULL, pWriter->id.uid);
|
|
||||||
if (code) goto _err;
|
|
||||||
|
|
||||||
if (pBData->nRow >= pWriter->maxRow) {
|
|
||||||
code = tsdbWriteDataBlock(pWriter->dWriter.pWriter, pBData, &pWriter->dWriter.mDataBlk, pWriter->cmprAlg);
|
|
||||||
if (code) goto _err;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
tBlockDataReset(&pWriter->bData);
|
||||||
|
} else {
|
||||||
|
code = tsdbWriteDataBlock(pWriter->pDataFWriter, &pWriter->bData, &pWriter->mDataBlk, pWriter->cmprAlg);
|
||||||
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
}
|
}
|
||||||
|
|
||||||
code = tsdbWriteDataBlock(pWriter->dWriter.pWriter, pBData, &pWriter->dWriter.mDataBlk, pWriter->cmprAlg);
|
|
||||||
if (code) goto _err;
|
|
||||||
|
|
||||||
for (; pWriter->dReader.iDataBlk < pWriter->dReader.mDataBlk.nItem; pWriter->dReader.iDataBlk++) {
|
|
||||||
SDataBlk dataBlk;
|
|
||||||
tMapDataGetItemByIdx(&pWriter->dReader.mDataBlk, pWriter->dReader.iDataBlk, &dataBlk, tGetDataBlk);
|
|
||||||
|
|
||||||
code = tMapDataPutItem(&pWriter->dWriter.mDataBlk, &dataBlk, tPutDataBlk);
|
|
||||||
if (code) goto _err;
|
|
||||||
}
|
|
||||||
|
|
||||||
code = tsdbSnapNextTableData(pWriter);
|
|
||||||
if (code) goto _err;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pWriter->dWriter.mDataBlk.nItem) {
|
if (pWriter->mDataBlk.nItem) {
|
||||||
SBlockIdx blockIdx = {.suid = pWriter->id.suid, .uid = pWriter->id.uid};
|
SBlockIdx* pBlockIdx = taosArrayReserve(pWriter->aBlockIdx, 1);
|
||||||
code = tsdbWriteDataBlk(pWriter->dWriter.pWriter, &pWriter->dWriter.mDataBlk, &blockIdx);
|
if (pBlockIdx == NULL) {
|
||||||
|
|
||||||
if (taosArrayPush(pWriter->dWriter.aBlockIdx, &blockIdx) == NULL) {
|
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
goto _err;
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
pWriter->id.suid = 0;
|
code = tsdbWriteDataBlk(pWriter->pDataFWriter, &pWriter->mDataBlk, pBlockIdx);
|
||||||
pWriter->id.uid = 0;
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
#endif
|
}
|
||||||
|
|
||||||
_exit:
|
_exit:
|
||||||
if (code) {
|
if (code) {
|
||||||
|
@ -1149,7 +1135,7 @@ _exit:
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t tsdbSnapWriteOpenDataFile(STsdbSnapWriter* pWriter, int32_t fid) {
|
static int32_t tsdbSnapWriteFileDataStart(STsdbSnapWriter* pWriter, int32_t fid) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
int32_t lino = 0;
|
int32_t lino = 0;
|
||||||
|
|
||||||
|
@ -1244,31 +1230,14 @@ _exit:
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t tsdbSnapWriteCloseDataFile(STsdbSnapWriter* pWriter) {
|
static int32_t tsdbSnapWriteFileDataEnd(STsdbSnapWriter* pWriter) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
int32_t lino = 0;
|
int32_t lino = 0;
|
||||||
|
|
||||||
ASSERT(pWriter->pDataFWriter);
|
ASSERT(pWriter->pDataFWriter);
|
||||||
|
|
||||||
#if 0
|
code = tsdbSnapWriteTableData(pWriter, NULL /* TODO */);
|
||||||
// loop write remain data
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
for (;;) {
|
|
||||||
SRowInfo* pRowInfo;
|
|
||||||
|
|
||||||
code = tsdbSnapWriteGetRow(pWriter, &pRowInfo);
|
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
|
||||||
|
|
||||||
if (pRowInfo == NULL) break;
|
|
||||||
|
|
||||||
code = tsdbSnapWriteTableData(pWriter, pRowInfo);
|
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
|
||||||
|
|
||||||
code = tsdbSnapWriteNextRow(pWriter);
|
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
|
||||||
}
|
|
||||||
|
|
||||||
// TODO: write remain data
|
|
||||||
#endif
|
|
||||||
|
|
||||||
// do file-level updates
|
// do file-level updates
|
||||||
code = tsdbWriteSttBlk(pWriter->pDataFWriter, pWriter->aSttBlk);
|
code = tsdbWriteSttBlk(pWriter->pDataFWriter, pWriter->aSttBlk);
|
||||||
|
@ -1303,139 +1272,6 @@ _exit:
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t tsdbSnapWriteToDataFile(STsdbSnapWriter* pWriter, int32_t iRow, int8_t* done) {
|
|
||||||
int32_t code = 0;
|
|
||||||
int32_t lino = 0;
|
|
||||||
|
|
||||||
#if 0
|
|
||||||
SBlockData* pBData = &pWriter->bData;
|
|
||||||
TABLEID id = {.suid = pBData->suid, .uid = pBData->uid ? pBData->uid : pBData->aUid[iRow]};
|
|
||||||
TSDBROW row = tsdbRowFromBlockData(pBData, iRow);
|
|
||||||
TSDBKEY key = TSDBROW_KEY(&row);
|
|
||||||
|
|
||||||
*done = 0;
|
|
||||||
while (pWriter->dReader.iRow < pWriter->dReader.bData.nRow ||
|
|
||||||
pWriter->dReader.iDataBlk < pWriter->dReader.mDataBlk.nItem) {
|
|
||||||
// Merge row by row
|
|
||||||
for (; pWriter->dReader.iRow < pWriter->dReader.bData.nRow; pWriter->dReader.iRow++) {
|
|
||||||
TSDBROW trow = tsdbRowFromBlockData(&pWriter->dReader.bData, pWriter->dReader.iRow);
|
|
||||||
TSDBKEY tKey = TSDBROW_KEY(&trow);
|
|
||||||
|
|
||||||
ASSERT(pWriter->dReader.bData.suid == id.suid && pWriter->dReader.bData.uid == id.uid);
|
|
||||||
|
|
||||||
int32_t c = tsdbKeyCmprFn(&key, &tKey);
|
|
||||||
if (c < 0) {
|
|
||||||
code = tBlockDataAppendRow(&pWriter->dWriter.bData, &row, NULL, id.uid);
|
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
|
||||||
} else if (c > 0) {
|
|
||||||
code = tBlockDataAppendRow(&pWriter->dWriter.bData, &trow, NULL, id.uid);
|
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
|
||||||
} else {
|
|
||||||
ASSERT(0);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (pWriter->dWriter.bData.nRow >= pWriter->maxRow) {
|
|
||||||
code = tsdbWriteDataBlock(pWriter->dWriter.pWriter, &pWriter->dWriter.bData, &pWriter->dWriter.mDataBlk,
|
|
||||||
pWriter->cmprAlg);
|
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (c < 0) {
|
|
||||||
*done = 1;
|
|
||||||
goto _exit;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Merge row by block
|
|
||||||
SDataBlk tDataBlk = {.minKey = key, .maxKey = key};
|
|
||||||
for (; pWriter->dReader.iDataBlk < pWriter->dReader.mDataBlk.nItem; pWriter->dReader.iDataBlk++) {
|
|
||||||
SDataBlk dataBlk;
|
|
||||||
tMapDataGetItemByIdx(&pWriter->dReader.mDataBlk, pWriter->dReader.iDataBlk, &dataBlk, tGetDataBlk);
|
|
||||||
|
|
||||||
int32_t c = tDataBlkCmprFn(&dataBlk, &tDataBlk);
|
|
||||||
if (c < 0) {
|
|
||||||
code = tsdbWriteDataBlock(pWriter->dWriter.pWriter, &pWriter->dWriter.bData, &pWriter->dWriter.mDataBlk,
|
|
||||||
pWriter->cmprAlg);
|
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
|
||||||
|
|
||||||
code = tMapDataPutItem(&pWriter->dWriter.mDataBlk, &dataBlk, tPutDataBlk);
|
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
|
||||||
} else if (c > 0) {
|
|
||||||
code = tBlockDataAppendRow(&pWriter->dWriter.bData, &row, NULL, id.uid);
|
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
|
||||||
|
|
||||||
if (pWriter->dWriter.bData.nRow >= pWriter->maxRow) {
|
|
||||||
code = tsdbWriteDataBlock(pWriter->dWriter.pWriter, &pWriter->dWriter.bData, &pWriter->dWriter.mDataBlk,
|
|
||||||
pWriter->cmprAlg);
|
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
|
||||||
}
|
|
||||||
|
|
||||||
*done = 1;
|
|
||||||
goto _exit;
|
|
||||||
} else {
|
|
||||||
code = tsdbReadDataBlockEx(pWriter->dReader.pReader, &dataBlk, &pWriter->dReader.bData);
|
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
|
||||||
pWriter->dReader.iRow = 0;
|
|
||||||
|
|
||||||
pWriter->dReader.iDataBlk++;
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
#endif
|
|
||||||
|
|
||||||
_exit:
|
|
||||||
if (code) {
|
|
||||||
tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pWriter->pTsdb->pVnode), __func__, lino, tstrerror(code));
|
|
||||||
}
|
|
||||||
return code;
|
|
||||||
}
|
|
||||||
|
|
||||||
static int32_t tsdbSnapWriteToSttFile(STsdbSnapWriter* pWriter, int32_t iRow) {
|
|
||||||
int32_t code = 0;
|
|
||||||
int32_t lino = 0;
|
|
||||||
|
|
||||||
#if 0
|
|
||||||
TABLEID id = {.suid = pWriter->bData.suid,
|
|
||||||
.uid = pWriter->bData.uid ? pWriter->bData.uid : pWriter->bData.aUid[iRow]};
|
|
||||||
TSDBROW row = tsdbRowFromBlockData(&pWriter->bData, iRow);
|
|
||||||
SBlockData* pBData = &pWriter->dWriter.sData;
|
|
||||||
|
|
||||||
if (pBData->suid || pBData->uid) {
|
|
||||||
if (!TABLE_SAME_SCHEMA(pBData->suid, pBData->uid, id.suid, id.uid)) {
|
|
||||||
code = tsdbWriteSttBlock(pWriter->pDataFWriter, pBData, pWriter->aSttBlk, pWriter->cmprAlg);
|
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
|
||||||
|
|
||||||
pBData->suid = 0;
|
|
||||||
pBData->uid = 0;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if (pBData->suid == 0 && pBData->uid == 0) {
|
|
||||||
code = tsdbUpdateTableSchema(pWriter->pTsdb->pVnode->pMeta, pWriter->id.suid, pWriter->id.uid, &pWriter->skmTable);
|
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
|
||||||
|
|
||||||
TABLEID tid = {.suid = pWriter->id.suid, .uid = pWriter->id.suid ? 0 : pWriter->id.uid};
|
|
||||||
code = tBlockDataInit(pBData, &tid, pWriter->skmTable.pTSchema, NULL, 0);
|
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
|
||||||
}
|
|
||||||
|
|
||||||
code = tBlockDataAppendRow(pBData, &row, NULL, id.uid);
|
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
|
||||||
|
|
||||||
if (pBData->nRow >= pWriter->maxRow) {
|
|
||||||
code = tsdbWriteSttBlock(pWriter->pDataFWriter, pBData, pWriter->aSttBlk, pWriter->cmprAlg);
|
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
|
||||||
}
|
|
||||||
#endif
|
|
||||||
|
|
||||||
_exit:
|
|
||||||
if (code) {
|
|
||||||
tsdbError("vgId:%d %s failed since %s", TD_VID(pWriter->pTsdb->pVnode), __func__, tstrerror(code));
|
|
||||||
}
|
|
||||||
return code;
|
|
||||||
}
|
|
||||||
|
|
||||||
static int32_t tsdbSnapWriteNextRow(STsdbSnapWriter* pWriter, SRowInfo** ppRowInfo) {
|
static int32_t tsdbSnapWriteNextRow(STsdbSnapWriter* pWriter, SRowInfo** ppRowInfo) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
int32_t lino = 0;
|
int32_t lino = 0;
|
||||||
|
@ -1502,20 +1338,6 @@ _exit:
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t tsdbSnapWriteRowImpl(STsdbSnapWriter* pWriter, SRowInfo* pRowInfo) {
|
|
||||||
int32_t code = 0;
|
|
||||||
int32_t lino = 0;
|
|
||||||
|
|
||||||
// TODO
|
|
||||||
ASSERT(0);
|
|
||||||
|
|
||||||
_exit:
|
|
||||||
if (code) {
|
|
||||||
tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pWriter->pTsdb->pVnode), __func__, lino, tstrerror(code));
|
|
||||||
}
|
|
||||||
return code;
|
|
||||||
}
|
|
||||||
|
|
||||||
static int32_t tsdbSnapWriteTableRow(STsdbSnapWriter* pWriter, TSDBROW* pRow) {
|
static int32_t tsdbSnapWriteTableRow(STsdbSnapWriter* pWriter, TSDBROW* pRow) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
int32_t lino = 0;
|
int32_t lino = 0;
|
||||||
|
@ -1551,21 +1373,25 @@ static int32_t tsdbSnapWriteTableData(STsdbSnapWriter* pWriter, SRowInfo* pRowIn
|
||||||
}
|
}
|
||||||
|
|
||||||
// do write the row
|
// do write the row
|
||||||
if (pWriter->pDIter == NULL /* || false */) {
|
if (pWriter->pDIter == NULL || (pWriter->pDIter->dIter.iRow >= pWriter->pDIter->dIter.bData.nRow &&
|
||||||
goto _write_incoming_row;
|
pWriter->pDIter->dIter.iDataBlk >= pWriter->pDIter->dIter.mDataBlk.nItem)) {
|
||||||
|
code = tsdbSnapWriteTableRow(pWriter, &pRowInfo->row);
|
||||||
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
} else {
|
} else {
|
||||||
for (;;) {
|
for (;;) {
|
||||||
while (pWriter->pDIter->dIter.iRow + 1 < pWriter->pDIter->dIter.bData.nRow) {
|
while (pWriter->pDIter->dIter.iRow < pWriter->pDIter->dIter.bData.nRow) {
|
||||||
TSDBROW row = tsdbRowFromBlockData(&pWriter->pDIter->dIter.bData, pWriter->pDIter->dIter.iRow + 1);
|
TSDBROW row = tsdbRowFromBlockData(&pWriter->pDIter->dIter.bData, pWriter->pDIter->dIter.iRow);
|
||||||
|
|
||||||
int32_t c = tsdbRowCmprFn(&pRowInfo->row, &row);
|
int32_t c = tsdbRowCmprFn(&pRowInfo->row, &row);
|
||||||
if (c < 0) {
|
if (c < 0) {
|
||||||
goto _write_incoming_row;
|
code = tsdbSnapWriteTableRow(pWriter, &pRowInfo->row);
|
||||||
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
|
goto _exit;
|
||||||
} else if (c > 0) {
|
} else if (c > 0) {
|
||||||
++pWriter->pDIter->dIter.iRow;
|
|
||||||
|
|
||||||
code = tsdbSnapWriteTableRow(pWriter, &row);
|
code = tsdbSnapWriteTableRow(pWriter, &row);
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
|
|
||||||
|
pWriter->pDIter->dIter.iRow++;
|
||||||
} else {
|
} else {
|
||||||
ASSERT(0);
|
ASSERT(0);
|
||||||
}
|
}
|
||||||
|
@ -1573,34 +1399,32 @@ static int32_t tsdbSnapWriteTableData(STsdbSnapWriter* pWriter, SRowInfo* pRowIn
|
||||||
|
|
||||||
while (pWriter->pDIter->dIter.iDataBlk < pWriter->pDIter->dIter.mDataBlk.nItem) {
|
while (pWriter->pDIter->dIter.iDataBlk < pWriter->pDIter->dIter.mDataBlk.nItem) {
|
||||||
SDataBlk dataBlk;
|
SDataBlk dataBlk;
|
||||||
|
tMapDataGetItemByIdx(&pWriter->pDIter->dIter.mDataBlk, pWriter->pDIter->dIter.iDataBlk, &dataBlk, tGetDataBlk);
|
||||||
tMapDataGetItemByIdx(&pWriter->pDIter->dIter.mDataBlk, pWriter->pDIter->dIter.iDataBlk + 1, &dataBlk,
|
|
||||||
tGetDataBlk);
|
|
||||||
|
|
||||||
int32_t c = tDataBlkCmprFn(
|
int32_t c = tDataBlkCmprFn(
|
||||||
&dataBlk, &(SDataBlk){.minKey = TSDBROW_KEY(&pRowInfo->row), .maxKey = TSDBROW_KEY(&pRowInfo->row)});
|
&dataBlk, &(SDataBlk){.minKey = TSDBROW_KEY(&pRowInfo->row), .maxKey = TSDBROW_KEY(&pRowInfo->row)});
|
||||||
if (c > 0) {
|
if (c > 0) {
|
||||||
goto _write_incoming_row;
|
code = tsdbSnapWriteTableRow(pWriter, &pRowInfo->row);
|
||||||
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
|
goto _exit;
|
||||||
} else if (c < 0) {
|
} else if (c < 0) {
|
||||||
++pWriter->pDIter->dIter.iDataBlk;
|
code = tsdbWriteDataBlock(pWriter->pDataFWriter, &pWriter->bData, &pWriter->mDataBlk, pWriter->cmprAlg);
|
||||||
tMapDataPutItem(&pWriter->pDIter->dIter.mDataBlk, &dataBlk, tPutDataBlk);
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
} else {
|
|
||||||
++pWriter->pDIter->dIter.iDataBlk;
|
|
||||||
|
|
||||||
|
tMapDataPutItem(&pWriter->pDIter->dIter.mDataBlk, &dataBlk, tPutDataBlk);
|
||||||
|
pWriter->pDIter->dIter.iDataBlk++;
|
||||||
|
} else {
|
||||||
code = tsdbReadDataBlockEx(pWriter->pDataFReader, &dataBlk, &pWriter->pDIter->dIter.bData);
|
code = tsdbReadDataBlockEx(pWriter->pDataFReader, &dataBlk, &pWriter->pDIter->dIter.bData);
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
|
|
||||||
pWriter->pDIter->dIter.iRow = -1;
|
pWriter->pDIter->dIter.iDataBlk++;
|
||||||
|
pWriter->pDIter->dIter.iRow = 0;
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
_write_incoming_row:
|
|
||||||
code = tsdbSnapWriteTableRow(pWriter, &pRowInfo->row);
|
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
|
||||||
|
|
||||||
_exit:
|
_exit:
|
||||||
if (code) {
|
if (code) {
|
||||||
tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pWriter->pTsdb->pVnode), __func__, lino, tstrerror(code));
|
tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pWriter->pTsdb->pVnode), __func__, lino, tstrerror(code));
|
||||||
|
@ -1621,11 +1445,11 @@ static int32_t tsdbSnapWriteTimeSeriesData(STsdbSnapWriter* pWriter, SSnapDataHd
|
||||||
int32_t fid = tsdbKeyFid(pWriter->inData.aTSKEY[0], pWriter->minutes, pWriter->precision);
|
int32_t fid = tsdbKeyFid(pWriter->inData.aTSKEY[0], pWriter->minutes, pWriter->precision);
|
||||||
if (pWriter->fid != fid) {
|
if (pWriter->fid != fid) {
|
||||||
if (pWriter->pDataFWriter) {
|
if (pWriter->pDataFWriter) {
|
||||||
code = tsdbSnapWriteCloseDataFile(pWriter);
|
code = tsdbSnapWriteFileDataEnd(pWriter);
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
}
|
}
|
||||||
|
|
||||||
code = tsdbSnapWriteOpenDataFile(pWriter, fid);
|
code = tsdbSnapWriteFileDataStart(pWriter, fid);
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1927,7 +1751,7 @@ _exit:
|
||||||
int32_t tsdbSnapWriterPrepareClose(STsdbSnapWriter* pWriter) {
|
int32_t tsdbSnapWriterPrepareClose(STsdbSnapWriter* pWriter) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
if (pWriter->pDataFWriter) {
|
if (pWriter->pDataFWriter) {
|
||||||
code = tsdbSnapWriteCloseDataFile(pWriter);
|
code = tsdbSnapWriteFileDataEnd(pWriter);
|
||||||
if (code) goto _exit;
|
if (code) goto _exit;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2012,7 +1836,7 @@ int32_t tsdbSnapWrite(STsdbSnapWriter* pWriter, SSnapDataHdr* pHdr) {
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
goto _exit;
|
goto _exit;
|
||||||
} else if (pWriter->pDataFWriter) {
|
} else if (pWriter->pDataFWriter) {
|
||||||
code = tsdbSnapWriteCloseDataFile(pWriter);
|
code = tsdbSnapWriteFileDataEnd(pWriter);
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue