more vnode snapshot
This commit is contained in:
parent
338a763853
commit
b691e76782
|
@ -126,6 +126,8 @@ void tColDataClear(void *ph);
|
|||
int32_t tColDataAppendValue(SColData *pColData, SColVal *pColVal);
|
||||
int32_t tColDataGetValue(SColData *pColData, int32_t iRow, SColVal *pColVal);
|
||||
int32_t tColDataCopy(SColData *pColDataSrc, SColData *pColDataDest);
|
||||
int32_t tPutColData(uint8_t *p, SColData *pColData);
|
||||
int32_t tGetColData(uint8_t *p, SColData *pColData);
|
||||
// SBlockData
|
||||
#define tBlockDataFirstRow(PBLOCKDATA) tsdbRowFromBlockData(PBLOCKDATA, 0)
|
||||
#define tBlockDataLastRow(PBLOCKDATA) tsdbRowFromBlockData(PBLOCKDATA, (PBLOCKDATA)->nRow - 1)
|
||||
|
@ -142,6 +144,8 @@ int32_t tBlockDataMerge(SBlockData *pBlockData1, SBlockData *pBlockData2, SBlo
|
|||
int32_t tBlockDataCopy(SBlockData *pBlockDataSrc, SBlockData *pBlockDataDest);
|
||||
SColData *tBlockDataGetColDataByIdx(SBlockData *pBlockData, int32_t idx);
|
||||
void tBlockDataGetColData(SBlockData *pBlockData, int16_t cid, SColData **ppColData);
|
||||
int32_t tPutBlockData(uint8_t *p, SBlockData *pBlockData);
|
||||
int32_t tGetBlockData(uint8_t *p, SBlockData *pBlockData);
|
||||
// SDelIdx
|
||||
int32_t tPutDelIdx(uint8_t *p, void *ph);
|
||||
int32_t tGetDelIdx(uint8_t *p, void *ph);
|
||||
|
|
|
@ -29,7 +29,8 @@ struct STsdbSnapReader {
|
|||
SBlockIdx* pBlockIdx;
|
||||
SMapData mBlock; // SMapData<SBlock>
|
||||
int32_t iBlock;
|
||||
SBlockData blkData;
|
||||
SBlockData oBlockData;
|
||||
SBlockData nBlockData;
|
||||
// for del file
|
||||
int8_t delDone;
|
||||
SDelFReader* pDelFReader;
|
||||
|
@ -44,16 +45,11 @@ static int32_t tsdbSnapReadData(STsdbSnapReader* pReader, uint8_t** ppData) {
|
|||
|
||||
while (true) {
|
||||
if (pReader->pDataFReader == NULL) {
|
||||
SDFileSet* pSet = NULL;
|
||||
// taosArraySearch(pTsdb->fs->cState->aDFileSet, &(SDFileSet){.fid = pReader->fid}, tDFileSe)
|
||||
SDFileSet* pSet = tsdbFSStateGetDFileSet(pTsdb->fs->cState, pReader->fid, TD_GT);
|
||||
|
||||
// search the next data file set to read (todo)
|
||||
if (0 /* TODO */) {
|
||||
code = TSDB_CODE_VND_READ_END;
|
||||
goto _exit;
|
||||
}
|
||||
if (pSet == NULL) goto _exit;
|
||||
|
||||
// open
|
||||
pReader->fid = pSet->fid;
|
||||
code = tsdbDataFReaderOpen(&pReader->pDataFReader, pReader->pTsdb, pSet);
|
||||
if (code) goto _err;
|
||||
|
||||
|
@ -63,6 +59,8 @@ static int32_t tsdbSnapReadData(STsdbSnapReader* pReader, uint8_t** ppData) {
|
|||
|
||||
pReader->iBlockIdx = 0;
|
||||
pReader->pBlockIdx = NULL;
|
||||
|
||||
tsdbInfo("vgId:%d vnode snapshot tsdb open data file to read, fid:%d", TD_VID(pTsdb->pVnode), pReader->fid);
|
||||
}
|
||||
|
||||
while (true) {
|
||||
|
@ -75,17 +73,15 @@ static int32_t tsdbSnapReadData(STsdbSnapReader* pReader, uint8_t** ppData) {
|
|||
pReader->pBlockIdx = (SBlockIdx*)taosArrayGet(pReader->aBlockIdx, pReader->iBlockIdx);
|
||||
pReader->iBlockIdx++;
|
||||
|
||||
// SBlock
|
||||
code = tsdbReadBlock(pReader->pDataFReader, pReader->pBlockIdx, &pReader->mBlock, NULL);
|
||||
if (code) goto _err;
|
||||
|
||||
pReader->iBlock = 0;
|
||||
}
|
||||
|
||||
SBlock block;
|
||||
SBlock* pBlock = █
|
||||
while (true) {
|
||||
SBlock block;
|
||||
SBlock* pBlock = █
|
||||
|
||||
if (pReader->iBlock >= pReader->mBlock.nItem) {
|
||||
pReader->pBlockIdx = NULL;
|
||||
break;
|
||||
|
@ -94,23 +90,63 @@ static int32_t tsdbSnapReadData(STsdbSnapReader* pReader, uint8_t** ppData) {
|
|||
tMapDataGetItemByIdx(&pReader->mBlock, pReader->iBlock, pBlock, tGetBlock);
|
||||
pReader->iBlock++;
|
||||
|
||||
if ((pBlock->minVersion >= pReader->sver && pBlock->minVersion <= pReader->ever) ||
|
||||
(pBlock->maxVersion >= pReader->sver && pBlock->maxVersion <= pReader->ever)) {
|
||||
// overlap (todo)
|
||||
if (pBlock->minVersion > pReader->ever || pBlock->maxVersion < pReader->sver) continue;
|
||||
|
||||
code = tsdbReadBlockData(pReader->pDataFReader, pReader->pBlockIdx, pBlock, &pReader->blkData, NULL, NULL);
|
||||
code = tsdbReadBlockData(pReader->pDataFReader, pReader->pBlockIdx, pBlock, &pReader->oBlockData, NULL, NULL);
|
||||
if (code) goto _err;
|
||||
|
||||
// filter
|
||||
tBlockDataReset(&pReader->nBlockData);
|
||||
for (int32_t iColData = 0; iColData < taosArrayGetSize(pReader->oBlockData.aIdx); iColData++) {
|
||||
SColData* pColDataO = tBlockDataGetColDataByIdx(&pReader->oBlockData, iColData);
|
||||
SColData* pColDataN = NULL;
|
||||
|
||||
code = tBlockDataAddColData(&pReader->nBlockData, taosArrayGetSize(pReader->nBlockData.aIdx), &pColDataN);
|
||||
if (code) goto _err;
|
||||
|
||||
goto _exit;
|
||||
tColDataInit(pColDataN, pColDataO->cid, pColDataO->type, pColDataO->smaOn);
|
||||
}
|
||||
|
||||
for (int32_t iRow = 0; iRow < pReader->oBlockData.nRow; iRow++) {
|
||||
TSDBROW row = tsdbRowFromBlockData(&pReader->oBlockData, iRow);
|
||||
int64_t version = TSDBROW_VERSION(&row);
|
||||
|
||||
if (version < pReader->sver || version > pReader->ever) continue;
|
||||
|
||||
code = tBlockDataAppendRow(&pReader->nBlockData, &row, NULL);
|
||||
if (code) goto _err;
|
||||
}
|
||||
|
||||
// org data (todo)
|
||||
int32_t size = sizeof(TABLEID) + tPutBlockData(NULL, &pReader->nBlockData);
|
||||
|
||||
*ppData = taosMemoryMalloc(sizeof(SSnapDataHdr) + size);
|
||||
if (*ppData == NULL) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _err;
|
||||
}
|
||||
|
||||
SSnapDataHdr* pHdr = (SSnapDataHdr*)(*ppData);
|
||||
pHdr->type = 1;
|
||||
pHdr->size = size;
|
||||
|
||||
TABLEID* pId = (TABLEID*)(&pHdr[1]);
|
||||
pId->suid = pReader->pBlockIdx->suid;
|
||||
pId->uid = pReader->pBlockIdx->uid;
|
||||
|
||||
tPutBlockData((uint8_t*)(&pId[1]), &pReader->nBlockData);
|
||||
|
||||
tsdbInfo("vgId:%d vnode snapshot read data, fid:%d suid:%" PRId64 " uid:%" PRId64
|
||||
" iBlock:%d minVersion:%d maxVersion:%d nRow:%d out of %d",
|
||||
TD_VID(pTsdb->pVnode), pReader->fid, pReader->pBlockIdx->suid, pReader->pBlockIdx->uid,
|
||||
pReader->iBlock - 1, pBlock->minVersion, pBlock->maxVersion, pReader->nBlockData.nRow, pBlock->nRow);
|
||||
|
||||
goto _exit;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
_exit:
|
||||
// if (*ppData) {
|
||||
// tsdbInfo("vgId:%d ");
|
||||
// }
|
||||
return code;
|
||||
|
||||
_err:
|
||||
|
@ -221,7 +257,9 @@ int32_t tsdbSnapReaderOpen(STsdb* pTsdb, int64_t sver, int64_t ever, STsdbSnapRe
|
|||
goto _err;
|
||||
}
|
||||
pReader->mBlock = tMapDataInit();
|
||||
code = tBlockDataInit(&pReader->blkData);
|
||||
code = tBlockDataInit(&pReader->oBlockData);
|
||||
if (code) goto _err;
|
||||
code = tBlockDataInit(&pReader->nBlockData);
|
||||
if (code) goto _err;
|
||||
|
||||
pReader->aDelIdx = taosArrayInit(0, sizeof(SDelIdx));
|
||||
|
@ -254,7 +292,8 @@ int32_t tsdbSnapReaderClose(STsdbSnapReader** ppReader) {
|
|||
}
|
||||
taosArrayDestroy(pReader->aBlockIdx);
|
||||
tMapDataClear(&pReader->mBlock);
|
||||
tBlockDataClear(&pReader->blkData);
|
||||
tBlockDataClear(&pReader->oBlockData);
|
||||
tBlockDataClear(&pReader->nBlockData);
|
||||
|
||||
if (pReader->pDelFReader) {
|
||||
tsdbDelFReaderClose(&pReader->pDelFReader);
|
||||
|
|
|
@ -921,6 +921,76 @@ _exit:
|
|||
return code;
|
||||
}
|
||||
|
||||
int32_t tPutColData(uint8_t *p, SColData *pColData) {
|
||||
int32_t n = 0;
|
||||
|
||||
n += tPutI16v(p ? p + n : p, pColData->cid);
|
||||
n += tPutI8(p ? p + n : p, pColData->type);
|
||||
n += tPutI8(p ? p + n : p, pColData->smaOn);
|
||||
n += tPutI32v(p ? p + n : p, pColData->nVal);
|
||||
n += tPutU8(p ? p + n : p, pColData->flag);
|
||||
|
||||
if (pColData->flag == HAS_NONE || pColData->flag == HAS_NULL) goto _exit;
|
||||
if (pColData->flag != HAS_VALUE) {
|
||||
// bitmap
|
||||
|
||||
int32_t size = BIT2_SIZE(pColData->nVal);
|
||||
if (p) {
|
||||
memcpy(p + n, pColData->pBitMap, size);
|
||||
}
|
||||
n += size;
|
||||
}
|
||||
if (IS_VAR_DATA_TYPE(pColData->type)) {
|
||||
// offset
|
||||
|
||||
int32_t size = sizeof(int32_t) * pColData->nVal;
|
||||
if (p) {
|
||||
memcpy(p + n, pColData->aOffset, size);
|
||||
}
|
||||
n += size;
|
||||
}
|
||||
n += tPutI32v(p ? p + n : p, pColData->nData);
|
||||
if (p) {
|
||||
memcpy(p + n, pColData->pData, pColData->nData);
|
||||
}
|
||||
n += pColData->nData;
|
||||
|
||||
_exit:
|
||||
return n;
|
||||
}
|
||||
|
||||
int32_t tGetColData(uint8_t *p, SColData *pColData) {
|
||||
int32_t n = 0;
|
||||
|
||||
n += tGetI16v(p + n, &pColData->cid);
|
||||
n += tGetI8(p + n, &pColData->type);
|
||||
n += tGetI8(p + n, &pColData->smaOn);
|
||||
n += tGetI32v(p + n, &pColData->nVal);
|
||||
n += tGetU8(p + n, &pColData->flag);
|
||||
|
||||
if (pColData->flag == HAS_NONE || pColData->flag == HAS_NULL) goto _exit;
|
||||
if (pColData->flag != HAS_VALUE) {
|
||||
// bitmap
|
||||
|
||||
int32_t size = BIT2_SIZE(pColData->nVal);
|
||||
pColData->pBitMap = p + n;
|
||||
n += size;
|
||||
}
|
||||
if (IS_VAR_DATA_TYPE(pColData->type)) {
|
||||
// offset
|
||||
|
||||
int32_t size = sizeof(int32_t) * pColData->nVal;
|
||||
pColData->aOffset = (int32_t *)(p + n);
|
||||
n += size;
|
||||
}
|
||||
n += tGetI32v(p + n, &pColData->nData);
|
||||
pColData->pData = p + n;
|
||||
n += pColData->nData;
|
||||
|
||||
_exit:
|
||||
return n;
|
||||
}
|
||||
|
||||
static FORCE_INLINE int32_t tColDataCmprFn(const void *p1, const void *p2) {
|
||||
SColData *pColData1 = (SColData *)p1;
|
||||
SColData *pColData2 = (SColData *)p2;
|
||||
|
@ -1239,6 +1309,52 @@ void tBlockDataGetColData(SBlockData *pBlockData, int16_t cid, SColData **ppColD
|
|||
*ppColData = NULL;
|
||||
}
|
||||
|
||||
int32_t tPutBlockData(uint8_t *p, SBlockData *pBlockData) {
|
||||
int32_t n = 0;
|
||||
|
||||
n += tPutI32v(p ? p + n : p, pBlockData->nRow);
|
||||
if (p) {
|
||||
memcpy(p + n, pBlockData->aVersion, sizeof(int64_t) * pBlockData->nRow);
|
||||
}
|
||||
n = n + sizeof(int64_t) * pBlockData->nRow;
|
||||
if (p) {
|
||||
memcpy(p + n, pBlockData->aTSKEY, sizeof(TSKEY) * pBlockData->nRow);
|
||||
}
|
||||
n = n + sizeof(TSKEY) * pBlockData->nRow;
|
||||
|
||||
int32_t nCol = taosArrayGetSize(pBlockData->aIdx);
|
||||
n += tPutI32v(p ? p + n : p, nCol);
|
||||
for (int32_t iCol = 0; iCol < nCol; iCol++) {
|
||||
SColData *pColData = tBlockDataGetColDataByIdx(pBlockData, iCol);
|
||||
n += tPutColData(p ? p + n : p, pColData);
|
||||
}
|
||||
|
||||
return n;
|
||||
}
|
||||
|
||||
int32_t tGetBlockData(uint8_t *p, SBlockData *pBlockData) {
|
||||
int32_t n = 0;
|
||||
|
||||
tBlockDataReset(pBlockData);
|
||||
|
||||
n += tGetI32v(p + n, &pBlockData->nRow);
|
||||
pBlockData->aVersion = (int64_t *)(p + n);
|
||||
n = n + sizeof(int64_t) * pBlockData->nRow;
|
||||
pBlockData->aTSKEY = (TSKEY *)(p + n);
|
||||
n = n + sizeof(TSKEY) * pBlockData->nRow;
|
||||
|
||||
int32_t nCol;
|
||||
n += tGetI32v(p + n, &nCol);
|
||||
for (int32_t iCol = 0; iCol < nCol; iCol++) {
|
||||
SColData *pColData;
|
||||
|
||||
if (tBlockDataAddColData(pBlockData, iCol, &pColData)) return -1;
|
||||
n += tGetColData(p + n, pColData);
|
||||
}
|
||||
|
||||
return n;
|
||||
}
|
||||
|
||||
// ALGORITHM ==============================
|
||||
void tsdbCalcColDataSMA(SColData *pColData, SColumnDataAgg *pColAgg) {
|
||||
SColVal colVal;
|
||||
|
|
Loading…
Reference in New Issue