feat: impl tsdbSnapGetInfo

This commit is contained in:
Benguang Zhao 2023-09-12 19:29:22 +08:00
parent 517f1f7e40
commit c1b2eedd3c
4 changed files with 220 additions and 272 deletions

View File

@ -676,6 +676,11 @@ typedef TARRAY2(STFileSet *) TFileSetArray;
typedef struct STSnapRange STSnapRange; typedef struct STSnapRange STSnapRange;
typedef TARRAY2(STSnapRange *) TSnapRangeArray; // disjoint snap ranges typedef TARRAY2(STSnapRange *) TSnapRangeArray; // disjoint snap ranges
// util
int32_t tSerializeSnapRangeArray(void *buf, int32_t bufLen, TSnapRangeArray *pSnapR);
int32_t tDeserializeSnapRangeArray(void *buf, int32_t bufLen, TSnapRangeArray *pSnapR);
void tsdbSnapRangeArrayDestroy(TSnapRangeArray **ppSnap);
struct STsdbReadSnap { struct STsdbReadSnap {
SMemTable *pMem; SMemTable *pMem;
SQueryNode *pNode; SQueryNode *pNode;

View File

@ -983,275 +983,6 @@ int32_t tsdbFSDestroyRefSnapshot(TFileSetArray **fsetArr) {
return 0; return 0;
} }
static int32_t tsdbTSnapRangeCmprFn(STSnapRange *fsr1, STSnapRange *fsr2) {
if (fsr1->fid < fsr2->fid) return -1;
if (fsr1->fid > fsr2->fid) return 1;
if (fsr1->sver < fsr2->sver) return -1;
if (fsr1->sver > fsr2->sver) return 1;
if (fsr1->ever < fsr2->ever) return -1;
if (fsr1->ever < fsr2->ever) return 1;
return 0;
}
static int32_t tsdbTFileInsertSnapRange(STFile *f, TSnapRangeArray *snapR) {
STSnapRange *fsr = taosMemoryCalloc(1, sizeof(*fsr));
if (fsr == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
fsr->fid = f->fid;
fsr->sver = f->minVer;
fsr->ever = f->maxVer;
int32_t code = TARRAY2_SORT_INSERT(snapR, fsr, tsdbTSnapRangeCmprFn);
if (code) {
taosMemoryFree(fsr);
fsr = NULL;
}
return code;
}
static int32_t tsdbTFSetInsertSnapRange(STFileSet *fset, TSnapRangeArray *snapR) {
STFile tf = {.fid = fset->fid, .minVer = VERSION_MAX, .maxVer = VERSION_MIN};
for (int32_t ftype = TSDB_FTYPE_MIN; ftype < TSDB_FTYPE_MAX; ++ftype) {
if (fset->farr[ftype] == NULL) continue;
STFile *f = fset->farr[ftype]->f;
tsdbTFileUpdVerRange(&tf, (SVersionRange){.minVer = f->minVer, .maxVer = f->maxVer});
}
int32_t code = tsdbTFileInsertSnapRange(&tf, snapR);
if (code) return code;
const SSttLvl *lvl;
TARRAY2_FOREACH(fset->lvlArr, lvl) {
STFileObj *fobj;
TARRAY2_FOREACH(lvl->fobjArr, fobj) {
// tsdbTFileUpdVerRange(&tf, (SVersionRange){.minVer = fobj->f->minVer, .maxVer = fobj->f->maxVer});
code = tsdbTFileInsertSnapRange(fobj->f, snapR);
if (code) return code;
}
}
// int32_t code = tsdbTFileInsertSnapRange(&tf, snapR);
// if (code) return code;
return code;
}
TSnapRangeArray *tsdbFSToSnapRangeArray(STFileSystem *fs) {
int32_t code = 0;
TSnapRangeArray *snapR = taosMemoryCalloc(1, sizeof(*snapR));
if (snapR == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
TARRAY2_INIT(snapR);
taosThreadRwlockRdlock(&fs->tsdb->rwLock);
STFileSet *fset;
TARRAY2_FOREACH(fs->fSetArr, fset) {
code = tsdbTFSetInsertSnapRange(fset, snapR);
if (code) break;
}
taosThreadRwlockUnlock(&fs->tsdb->rwLock);
if (code) {
TARRAY2_DESTROY(snapR, tsdbTSnapRangeClear);
taosMemoryFree(snapR);
snapR = NULL;
}
return snapR;
}
static STSnapRange *taosDupSnapRange(STSnapRange *x) {
STSnapRange *y = taosMemoryCalloc(1, sizeof(STSnapRange));
if (y == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
ASSERTS(terrno == 0, "Out of memory");
return NULL;
}
y->fid = x->fid;
y->sver = x->sver;
y->ever = x->ever;
return y;
}
static TSnapRangeArray *taosDupSnapRangeArray(const TSnapRangeArray *X) {
TSnapRangeArray *Y = taosMemoryCalloc(1, sizeof(*Y));
if (Y == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
TARRAY2_INIT(Y);
if (X) {
STSnapRange *x;
TARRAY2_FOREACH(X, x) {
STSnapRange *tp = taosDupSnapRange(x);
TARRAY2_APPEND(Y, tp);
}
}
return Y;
}
static TSnapRangeArray *tsdbSnapDiff(const TSnapRangeArray *snapR, const TSnapRangeArray *pExclude) {
TSnapRangeArray *Z = NULL;
TSnapRangeArray *U = NULL;
TSnapRangeArray *V = NULL;
TSnapRangeArray *X = taosDupSnapRangeArray(snapR);
TSnapRangeArray *Y = taosDupSnapRangeArray(pExclude);
int32_t code = -1;
// separate intersections of snap ranges
U = taosMemoryCalloc(1, sizeof(TSnapRangeArray));
if (U == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
goto _out;
}
TARRAY2_INIT(U);
V = taosMemoryCalloc(1, sizeof(TSnapRangeArray));
if (V == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
goto _out;
}
TARRAY2_INIT(V);
int32_t i = 0;
int32_t j = 0;
while (i < TARRAY2_SIZE(X) && j < TARRAY2_SIZE(Y)) {
STSnapRange *x = TARRAY2_GET(X, i);
STSnapRange *y = TARRAY2_GET(Y, j);
if (x->fid < y->fid) {
STSnapRange *tmp = taosDupSnapRange(x);
TARRAY2_APPEND(U, tmp);
i++;
} else if (x->fid > y->fid) {
STSnapRange *tmp = taosDupSnapRange(y);
TARRAY2_APPEND(V, tmp);
j++;
} else {
if (x->sver < y->sver) {
if (x->ever < y->ever) {
STSnapRange *tmp = taosDupSnapRange(x);
TARRAY2_APPEND(U, tmp);
i++;
} else {
STSnapRange *tmp = taosDupSnapRange(x);
tmp->ever = y->sver - 1;
TARRAY2_APPEND(U, tmp);
x->sver = y->sver;
}
} else if (x->sver > y->sver) {
if (y->ever < x->ever) {
STSnapRange *tmp = taosDupSnapRange(y);
TARRAY2_APPEND(V, tmp);
j++;
} else {
STSnapRange *tmp = taosDupSnapRange(y);
tmp->ever = x->sver - 1;
TARRAY2_APPEND(V, tmp);
y->sver = x->sver;
}
} else {
if (x->ever < y->ever) {
STSnapRange *tmp = taosDupSnapRange(x);
TARRAY2_APPEND(U, tmp);
i++;
tmp = taosDupSnapRange(y);
tmp->ever = x->ever;
TARRAY2_APPEND(V, tmp);
y->sver = x->ever + 1;
} else if (x->ever > y->ever) {
STSnapRange *tmp = taosDupSnapRange(y);
TARRAY2_APPEND(V, tmp);
j++;
tmp = taosDupSnapRange(x);
tmp->ever = y->ever;
TARRAY2_APPEND(U, tmp);
x->sver = y->ever + 1;
} else {
STSnapRange *tmp = taosDupSnapRange(x);
TARRAY2_APPEND(U, tmp);
i++;
tmp = taosDupSnapRange(y);
TARRAY2_APPEND(V, tmp);
j++;
}
}
}
}
while (i < TARRAY2_SIZE(X)) {
STSnapRange *x = TARRAY2_GET(X, i);
STSnapRange *tmp = taosDupSnapRange(x);
TARRAY2_APPEND(U, tmp);
i++;
}
while (j < TARRAY2_SIZE(Y)) {
STSnapRange *y = TARRAY2_GET(Y, j);
STSnapRange *tmp = taosDupSnapRange(y);
TARRAY2_APPEND(V, tmp);
j++;
}
// difference of snap ranges
Z = taosMemoryCalloc(1, sizeof(*Z));
if (Z == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
goto _out;
}
TARRAY2_INIT(Z);
i = 0;
j = 0;
while (i < TARRAY2_SIZE(U) && j < TARRAY2_SIZE(V)) {
STSnapRange *u = TARRAY2_GET(U, i);
STSnapRange *v = TARRAY2_GET(V, j);
if (u->fid < v->fid) {
STSnapRange *tmp = taosDupSnapRange(u);
TARRAY2_APPEND(Z, tmp);
i++;
} else if (u->fid == v->fid) {
if (u->sver < v->sver) {
STSnapRange *tmp = taosDupSnapRange(u);
TARRAY2_APPEND(Z, tmp);
i++;
} else if (u->sver > v->sver) {
ASSERT(u->ever > v->ever);
j++;
} else {
ASSERT(u->ever == v->ever);
i++;
j++;
}
}
}
while (i < TARRAY2_SIZE(U)) {
STSnapRange *u = TARRAY2_GET(U, i);
STSnapRange *tmp = taosDupSnapRange(u);
TARRAY2_APPEND(Z, tmp);
i++;
}
code = 0;
_out:
TSnapRangeArray **ppArrs[4] = {&X, &Y, &U, &V};
int len = sizeof(ppArrs) / sizeof(ppArrs[0]);
for (int k = 0; k < len; k++) {
if (ppArrs[k][0] == NULL) continue;
TARRAY2_DESTROY(ppArrs[k][0], tsdbTSnapRangeClear);
taosMemoryFree(ppArrs[k][0]);
ppArrs[k][0] = NULL;
}
if (code != 0 && Z) {
TARRAY2_DESTROY(Z, tsdbTSnapRangeClear);
taosMemoryFree(Z);
Z = NULL;
}
return Z;
}
int32_t tsdbFSCreateRefRangedSnapshot(STFileSystem *fs, int64_t sver, int64_t ever, TSnapRangeArray *pRanges, int32_t tsdbFSCreateRefRangedSnapshot(STFileSystem *fs, int64_t sver, int64_t ever, TSnapRangeArray *pRanges,
TSnapRangeArray **fsrArr) { TSnapRangeArray **fsrArr) {
int32_t code = -1; int32_t code = -1;

View File

@ -1158,7 +1158,201 @@ _exit:
return code; return code;
} }
int32_t tsdbSnapGetInfo(STsdb* pTsdb, SSnapshot* pSnap) { static int32_t tsdbTSnapRangeCmprFn(STSnapRange* fsr1, STSnapRange* fsr2) {
// TODO: get the full and diff info of tsdb Snap if (fsr1->fid < fsr2->fid) return -1;
if (fsr1->fid > fsr2->fid) return 1;
return 0; return 0;
} }
static int32_t tsdbTFileInsertSnapRange(STFile* f, TSnapRangeArray* snapR) {
STSnapRange* fsr = taosMemoryCalloc(1, sizeof(*fsr));
if (fsr == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
fsr->fid = f->fid;
fsr->sver = f->minVer;
fsr->ever = f->maxVer;
int32_t code = TARRAY2_SORT_INSERT(snapR, fsr, tsdbTSnapRangeCmprFn);
if (code) {
taosMemoryFree(fsr);
fsr = NULL;
}
return code;
}
static int32_t tsdbTFSetInsertSnapRange(STFileSet* fset, TSnapRangeArray* snapR) {
STFile tf = {.fid = fset->fid, .minVer = VERSION_MAX, .maxVer = VERSION_MIN};
for (int32_t ftype = TSDB_FTYPE_MIN; ftype < TSDB_FTYPE_MAX; ++ftype) {
if (fset->farr[ftype] == NULL) continue;
STFile* f = fset->farr[ftype]->f;
tsdbTFileUpdVerRange(&tf, (SVersionRange){.minVer = f->minVer, .maxVer = f->maxVer});
}
const SSttLvl* lvl;
TARRAY2_FOREACH(fset->lvlArr, lvl) {
STFileObj* fobj;
TARRAY2_FOREACH(lvl->fobjArr, fobj) {
tsdbTFileUpdVerRange(&tf, (SVersionRange){.minVer = fobj->f->minVer, .maxVer = fobj->f->maxVer});
}
}
int32_t code = tsdbTFileInsertSnapRange(&tf, snapR);
if (code) return code;
return code;
}
static TSnapRangeArray* tsdbGetSnapRangeArray(STFileSystem* fs) {
int32_t code = 0;
TSnapRangeArray* snapR = taosMemoryCalloc(1, sizeof(*snapR));
if (snapR == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
TARRAY2_INIT(snapR);
taosThreadRwlockRdlock(&fs->tsdb->rwLock);
STFileSet* fset;
TARRAY2_FOREACH(fs->fSetArr, fset) {
code = tsdbTFSetInsertSnapRange(fset, snapR);
if (code) break;
}
taosThreadRwlockUnlock(&fs->tsdb->rwLock);
if (code) {
TARRAY2_DESTROY(snapR, tsdbTSnapRangeClear);
taosMemoryFree(snapR);
snapR = NULL;
}
return snapR;
}
int32_t tSerializeSnapRangeArray(void* buf, int32_t bufLen, TSnapRangeArray* pSnapR) {
SEncoder encoder = {0};
tEncoderInit(&encoder, buf, bufLen);
int8_t msgVer = 1;
int32_t arrLen = TARRAY2_SIZE(pSnapR);
int8_t reserved8 = 0;
if (tStartEncode(&encoder) < 0) goto _err;
if (tEncodeI8(&encoder, msgVer) < 0) goto _err;
if (tEncodeI8(&encoder, reserved8) < 0) goto _err;
if (tEncodeI32(&encoder, arrLen) < 0) goto _err;
int64_t reserved64 = 0;
for (int32_t i = 0; i < arrLen; i++) {
STSnapRange* u = TARRAY2_GET(pSnapR, i);
int64_t fid = u->fid;
if (tEncodeI64(&encoder, fid) < 0) goto _err;
if (tEncodeI64(&encoder, u->sver) < 0) goto _err;
if (tEncodeI64(&encoder, u->ever) < 0) goto _err;
if (tEncodeI64(&encoder, reserved64) < 0) goto _err;
}
tEndEncode(&encoder);
int32_t tlen = encoder.pos;
tEncoderClear(&encoder);
return tlen;
_err:
tEncoderClear(&encoder);
return -1;
}
int32_t tDeserializeSnapRangeArray(void* buf, int32_t bufLen, TSnapRangeArray* pSnapR) {
SDecoder decoder = {0};
tDecoderInit(&decoder, buf, bufLen);
int8_t msgVer = 0;
int32_t arrLen = 0;
int8_t reserved8 = 0;
if (tStartDecode(&decoder) < 0) goto _err;
if (tDecodeI8(&decoder, &msgVer) < 0) goto _err;
if (tDecodeI8(&decoder, &reserved8) < 0) goto _err;
if (tDecodeI32(&decoder, &arrLen) < 0) goto _err;
int64_t fid = 0;
int64_t reserved64 = 0;
STSnapRange* pRange = NULL;
for (int32_t i = 0; i < arrLen; i++) {
pRange = taosMemoryCalloc(1, sizeof(STSnapRange));
if (tDecodeI64(&decoder, &fid) < 0) goto _err;
pRange->fid = fid;
if (tDecodeI64(&decoder, &pRange->sver) < 0) goto _err;
if (tDecodeI64(&decoder, &pRange->ever) < 0) goto _err;
if (tDecodeI64(&decoder, &reserved64) < 0) goto _err;
TARRAY2_APPEND(pSnapR, pRange);
pRange = NULL;
}
tEndDecode(&decoder);
tDecoderClear(&decoder);
return 0;
_err:
if (pRange) {
taosMemoryFree(pRange);
pRange = NULL;
}
tDecoderClear(&decoder);
return -1;
}
void tsdbSnapRangeArrayDestroy(TSnapRangeArray** ppSnap) {
TARRAY2_DESTROY(ppSnap[0], tsdbTSnapRangeClear);
taosMemoryFree(ppSnap[0]);
ppSnap[0] = NULL;
}
static int32_t tsdbSnapInfoDataLenCalc(TSnapRangeArray* pSnap) {
int32_t headerLen = 8;
int32_t itemLen = sizeof(STSnapRange) + 8;
int32_t size = TARRAY2_SIZE(pSnap);
return headerLen + itemLen * size;
}
int32_t tsdbSnapGetInfo(STsdb* pTsdb, SSnapshot* pSnap) {
int32_t code = 0;
if (pSnap->typ == TAOS_SYNC_SNAP_INFO_BRIEF) {
return 0;
}
code = -1;
TSnapRangeArray* snapR = tsdbGetSnapRangeArray(pTsdb->pFS);
if (snapR == NULL) {
goto _out;
}
if (pSnap->typ == TAOS_SYNC_SNAP_INFO_DIFF) {
for (int32_t i = 0; i < TARRAY2_SIZE(snapR); i++) {
STSnapRange* u = TARRAY2_GET(snapR, i);
u->sver = u->ever + 1;
u->ever = VERSION_MAX;
}
}
int32_t bufLen = sizeof(SMsgHead) + tsdbSnapInfoDataLenCalc(snapR);
void* data = taosMemoryRealloc(pSnap->data, bufLen);
if (data == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
goto _out;
}
pSnap->data = data;
void* buf = ((char*)data) + sizeof(SMsgHead);
int32_t tlen = 0;
if ((tlen = tSerializeSnapRangeArray(buf, bufLen, snapR)) < 0) {
tsdbError("vgId:%d, failed to serialize snap range since %s", TD_VID(pTsdb->pVnode), terrstr());
goto _out;
}
SMsgHead* msgHead = pSnap->data;
msgHead->contLen = tlen;
msgHead->vgId = TD_VID(pTsdb->pVnode);
code = 0;
_out:
if (snapR) {
tsdbSnapRangeArrayDestroy(&snapR);
}
return code;
}

View File

@ -63,7 +63,22 @@ int32_t vnodeSnapReaderOpen(SVnode *pVnode, SSnapshotParam *pParam, SVSnapReader
pReader->sver = sver; pReader->sver = sver;
pReader->ever = ever; pReader->ever = ever;
// TODO: decode pParam->data and store the result in pReader->pRanges if (pParam->data) {
pReader->pRanges = taosMemoryCalloc(1, sizeof(*pReader->pRanges));
if (pReader->pRanges == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}
TARRAY2_INIT(pReader->pRanges);
SMsgHead *msgHead = pParam->data;
ASSERT(msgHead->vgId == TD_VID(pVnode));
void *buf = (char *)pParam->data + sizeof(SMsgHead);
if (tDeserializeSnapRangeArray(buf, msgHead->contLen, pReader->pRanges) < 0) {
vError("vgId:%d, failed to deserialize snap range.", TD_VID(pVnode));
goto _err;
}
}
vInfo("vgId:%d, vnode snapshot reader opened, sver:%" PRId64 " ever:%" PRId64, TD_VID(pVnode), sver, ever); vInfo("vgId:%d, vnode snapshot reader opened, sver:%" PRId64 " ever:%" PRId64, TD_VID(pVnode), sver, ever);
*ppReader = pReader; *ppReader = pReader;
@ -101,6 +116,9 @@ void vnodeSnapReaderClose(SVSnapReader *pReader) {
tqCheckInfoReaderClose(&pReader->pTqCheckInfoReader); tqCheckInfoReaderClose(&pReader->pTqCheckInfoReader);
} }
if (pReader->pRanges) {
tsdbSnapRangeArrayDestroy(&pReader->pRanges);
}
taosMemoryFree(pReader); taosMemoryFree(pReader);
} }