tsdb/fsreader: clear last fileset's resources
This commit is contained in:
parent
a127e91534
commit
185081a948
|
@ -1932,6 +1932,10 @@ static void clearLastFileSet(SFSNextRowIter *state) {
|
||||||
tBlockDataDestroy(state->pBlockData);
|
tBlockDataDestroy(state->pBlockData);
|
||||||
state->pBlockData = NULL;
|
state->pBlockData = NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (state->pFileReader) {
|
||||||
|
tsdbDataFileReaderClose(&state->pFileReader);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow, bool *pIgnoreEarlierTs, bool isLast, int16_t *aCols,
|
static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow, bool *pIgnoreEarlierTs, bool isLast, int16_t *aCols,
|
||||||
|
@ -2008,7 +2012,7 @@ static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow, bool *pIgnoreEarlie
|
||||||
goto _err;
|
goto _err;
|
||||||
}
|
}
|
||||||
|
|
||||||
for (int i = TARRAY2_SIZE(pBlkArray); i >= 0; --i) {
|
for (int i = TARRAY2_SIZE(pBlkArray) - 1; i >= 0; --i) {
|
||||||
SBrinBlk *pBrinBlk = &pBlkArray->data[i];
|
SBrinBlk *pBrinBlk = &pBlkArray->data[i];
|
||||||
if (state->suid >= pBrinBlk->minTbid.suid && state->suid <= pBrinBlk->maxTbid.suid) {
|
if (state->suid >= pBrinBlk->minTbid.suid && state->suid <= pBrinBlk->maxTbid.suid) {
|
||||||
if (state->uid >= pBrinBlk->minTbid.uid && state->uid <= pBrinBlk->maxTbid.uid) {
|
if (state->uid >= pBrinBlk->minTbid.uid && state->uid <= pBrinBlk->maxTbid.uid) {
|
||||||
|
@ -2023,11 +2027,15 @@ static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow, bool *pIgnoreEarlie
|
||||||
int indexSize = TARRAY_SIZE(state->pIndexList);
|
int indexSize = TARRAY_SIZE(state->pIndexList);
|
||||||
if (indexSize <= 0) {
|
if (indexSize <= 0) {
|
||||||
// goto next fileset
|
// goto next fileset
|
||||||
|
clearLastFileSet(state);
|
||||||
|
goto _next_fileset;
|
||||||
}
|
}
|
||||||
|
|
||||||
state->state = SFSNEXTROW_INDEXLIST;
|
state->state = SFSNEXTROW_INDEXLIST;
|
||||||
state->iBrinIndex = indexSize;
|
state->iBrinIndex = indexSize;
|
||||||
} else { // empty fileset, goto next fileset
|
} else { // empty fileset, goto next fileset
|
||||||
|
// clearLastFileSet(state);
|
||||||
|
goto _next_fileset;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2035,6 +2043,8 @@ static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow, bool *pIgnoreEarlie
|
||||||
SBrinBlk *pBrinBlk = NULL;
|
SBrinBlk *pBrinBlk = NULL;
|
||||||
_next_brinindex:
|
_next_brinindex:
|
||||||
if (--state->iBrinIndex < 0) { // no index left, goto next fileset
|
if (--state->iBrinIndex < 0) { // no index left, goto next fileset
|
||||||
|
clearLastFileSet(state);
|
||||||
|
goto _next_fileset;
|
||||||
} else {
|
} else {
|
||||||
pBrinBlk = taosArrayGet(state->pIndexList, state->iBrinIndex);
|
pBrinBlk = taosArrayGet(state->pIndexList, state->iBrinIndex);
|
||||||
}
|
}
|
||||||
|
@ -2163,28 +2173,9 @@ int32_t clearNextRowFromFS(void *iter) {
|
||||||
if (!state) {
|
if (!state) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
/*
|
|
||||||
if (state->pDataFReader) {
|
|
||||||
tsdbDataFReaderClose(&state->pDataFReader);
|
|
||||||
state->pDataFReader = NULL;
|
|
||||||
}
|
|
||||||
if (state->aBlockIdx) {
|
|
||||||
// taosArrayDestroy(state->aBlockIdx);
|
|
||||||
tsdbBICacheRelease(state->pTsdb->biCache, state->aBlockIdxHandle);
|
|
||||||
|
|
||||||
state->aBlockIdxHandle = NULL;
|
clearLastFileSet(state);
|
||||||
state->aBlockIdx = NULL;
|
|
||||||
}
|
|
||||||
if (state->pBlockData) {
|
|
||||||
// tBlockDataDestroy(&state->blockData, 1);
|
|
||||||
tBlockDataDestroy(state->pBlockData);
|
|
||||||
state->pBlockData = NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (state->blockMap.pData != NULL) {
|
|
||||||
tMapDataClear(&state->blockMap);
|
|
||||||
}
|
|
||||||
*/
|
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2638,44 +2629,9 @@ static int32_t nextRowIterOpen(CacheNextRowIter *pIter, tb_uid_t uid, STsdb *pTs
|
||||||
pIter->pTsdb = pTsdb;
|
pIter->pTsdb = pTsdb;
|
||||||
|
|
||||||
pIter->pMemDelData = NULL;
|
pIter->pMemDelData = NULL;
|
||||||
|
|
||||||
loadMemTombData(&pIter->pMemDelData, pMem, pIMem, pr->info.verRange.maxVer);
|
loadMemTombData(&pIter->pMemDelData, pMem, pIMem, pr->info.verRange.maxVer);
|
||||||
#if 0
|
|
||||||
pIter->pSkyline = taosArrayInit(32, sizeof(TSDBKEY));
|
|
||||||
|
|
||||||
SDelFile *pDelFile = pReadSnap->fs.pDelFile;
|
|
||||||
if (pDelFile) {
|
|
||||||
SDelFReader *pDelFReader;
|
|
||||||
|
|
||||||
code = tsdbDelFReaderOpen(&pDelFReader, pDelFile, pTsdb);
|
|
||||||
if (code) goto _err;
|
|
||||||
|
|
||||||
SArray *pDelIdxArray = taosArrayInit(32, sizeof(SDelIdx));
|
|
||||||
|
|
||||||
code = tsdbReadDelIdx(pDelFReader, pDelIdxArray);
|
|
||||||
if (code) {
|
|
||||||
taosArrayDestroy(pDelIdxArray);
|
|
||||||
tsdbDelFReaderClose(&pDelFReader);
|
|
||||||
goto _err;
|
|
||||||
}
|
|
||||||
|
|
||||||
SDelIdx *delIdx = taosArraySearch(pDelIdxArray, &(SDelIdx){.suid = suid, .uid = uid}, tCmprDelIdx, TD_EQ);
|
|
||||||
|
|
||||||
code = getTableDelSkyline(pMem, pIMem, pDelFReader, delIdx, pIter->pSkyline);
|
|
||||||
if (code) {
|
|
||||||
taosArrayDestroy(pDelIdxArray);
|
|
||||||
tsdbDelFReaderClose(&pDelFReader);
|
|
||||||
goto _err;
|
|
||||||
}
|
|
||||||
|
|
||||||
taosArrayDestroy(pDelIdxArray);
|
|
||||||
tsdbDelFReaderClose(&pDelFReader);
|
|
||||||
} else {
|
|
||||||
code = getTableDelSkyline(pMem, pIMem, NULL, NULL, pIter->pSkyline);
|
|
||||||
if (code) goto _err;
|
|
||||||
}
|
|
||||||
|
|
||||||
pIter->iSkyline = taosArrayGetSize(pIter->pSkyline) - 1;
|
|
||||||
#endif
|
|
||||||
pIter->idx = (SBlockIdx){.suid = suid, .uid = uid};
|
pIter->idx = (SBlockIdx){.suid = suid, .uid = uid};
|
||||||
|
|
||||||
pIter->fsState.pRowIter = pIter;
|
pIter->fsState.pRowIter = pIter;
|
||||||
|
@ -2687,13 +2643,10 @@ static int32_t nextRowIterOpen(CacheNextRowIter *pIter, tb_uid_t uid, STsdb *pTs
|
||||||
pIter->fsState.suid = suid;
|
pIter->fsState.suid = suid;
|
||||||
pIter->fsState.uid = uid;
|
pIter->fsState.uid = uid;
|
||||||
pIter->fsState.lastTs = lastTs;
|
pIter->fsState.lastTs = lastTs;
|
||||||
|
pIter->fsState.pr = pr;
|
||||||
|
|
||||||
pIter->input[0] = (TsdbNextRowState){&pIter->memRow, true, false, false, &pIter->memState, getNextRowFromMem, NULL};
|
pIter->input[0] = (TsdbNextRowState){&pIter->memRow, true, false, false, &pIter->memState, getNextRowFromMem, NULL};
|
||||||
pIter->input[1] = (TsdbNextRowState){&pIter->imemRow, true, false, false, &pIter->imemState, getNextRowFromMem, NULL};
|
pIter->input[1] = (TsdbNextRowState){&pIter->imemRow, true, false, false, &pIter->imemState, getNextRowFromMem, NULL};
|
||||||
/*
|
|
||||||
pIter->input[2] = (TsdbNextRowState){
|
|
||||||
&pIter->fsLastRow, false, true, false, &pIter->fsLastState, getNextRowFromFSLast, clearNextRowFromFSLast};
|
|
||||||
*/
|
|
||||||
pIter->input[2] =
|
pIter->input[2] =
|
||||||
(TsdbNextRowState){&pIter->fsRow, false, true, false, &pIter->fsState, getNextRowFromFS, clearNextRowFromFS};
|
(TsdbNextRowState){&pIter->fsRow, false, true, false, &pIter->fsState, getNextRowFromFS, clearNextRowFromFS};
|
||||||
|
|
||||||
|
@ -2721,7 +2674,7 @@ _err:
|
||||||
static int32_t nextRowIterClose(CacheNextRowIter *pIter) {
|
static int32_t nextRowIterClose(CacheNextRowIter *pIter) {
|
||||||
int code = 0;
|
int code = 0;
|
||||||
|
|
||||||
for (int i = 0; i < 4; ++i) {
|
for (int i = 0; i < 3; ++i) {
|
||||||
if (pIter->input[i].nextRowClearFn) {
|
if (pIter->input[i].nextRowClearFn) {
|
||||||
pIter->input[i].nextRowClearFn(pIter->input[i].iter);
|
pIter->input[i].nextRowClearFn(pIter->input[i].iter);
|
||||||
}
|
}
|
||||||
|
@ -2770,7 +2723,7 @@ static int32_t nextRowIterGet(CacheNextRowIter *pIter, TSDBROW **ppRow, bool *pI
|
||||||
int nMax = 0;
|
int nMax = 0;
|
||||||
TSKEY maxKey = TSKEY_MIN;
|
TSKEY maxKey = TSKEY_MIN;
|
||||||
|
|
||||||
for (int i = 0; i < 4; ++i) {
|
for (int i = 0; i < 3; ++i) {
|
||||||
if (!pIter->input[i].stop && pIter->input[i].pRow != NULL) {
|
if (!pIter->input[i].stop && pIter->input[i].pRow != NULL) {
|
||||||
TSDBKEY key = TSDBROW_KEY(pIter->input[i].pRow);
|
TSDBKEY key = TSDBROW_KEY(pIter->input[i].pRow);
|
||||||
|
|
||||||
|
@ -2802,9 +2755,11 @@ static int32_t nextRowIterGet(CacheNextRowIter *pIter, TSDBROW **ppRow, bool *pI
|
||||||
uint64_t uid = pIter->idx.uid;
|
uint64_t uid = pIter->idx.uid;
|
||||||
STableLoadInfo *pInfo = *(STableLoadInfo **)tSimpleHashGet(pIter->pr->pTableMap, &uid, sizeof(uid));
|
STableLoadInfo *pInfo = *(STableLoadInfo **)tSimpleHashGet(pIter->pr->pTableMap, &uid, sizeof(uid));
|
||||||
SArray *pTombData = pInfo->pTombData;
|
SArray *pTombData = pInfo->pTombData;
|
||||||
taosArrayAddAll(pTombData, pIter->pMemDelData);
|
if (pTombData) {
|
||||||
|
taosArrayAddAll(pTombData, pIter->pMemDelData);
|
||||||
|
|
||||||
code = tsdbBuildDeleteSkyline(pTombData, 0, (int32_t)(TARRAY_SIZE(pTombData) - 1), pIter->pSkyline);
|
code = tsdbBuildDeleteSkyline(pTombData, 0, (int32_t)(TARRAY_SIZE(pTombData) - 1), pIter->pSkyline);
|
||||||
|
}
|
||||||
|
|
||||||
pIter->iSkyline = taosArrayGetSize(pIter->pSkyline) - 1;
|
pIter->iSkyline = taosArrayGetSize(pIter->pSkyline) - 1;
|
||||||
}
|
}
|
||||||
|
|
|
@ -185,7 +185,7 @@ int32_t tsdbCacherowsReaderOpen(void* pVnode, int32_t type, void* pTableIdList,
|
||||||
for (int32_t i = 0; i < numOfTables; ++i) {
|
for (int32_t i = 0; i < numOfTables; ++i) {
|
||||||
uint64_t uid = p->pTableList[i].uid;
|
uint64_t uid = p->pTableList[i].uid;
|
||||||
p->uidList[i] = uid;
|
p->uidList[i] = uid;
|
||||||
STableLoadInfo* pInfo = taosMemoryMalloc(sizeof(STableLoadInfo));
|
STableLoadInfo* pInfo = taosMemoryCalloc(1, sizeof(STableLoadInfo));
|
||||||
tSimpleHashPut(p->pTableMap, &uid, sizeof(uint64_t), &pInfo, POINTER_BYTES);
|
tSimpleHashPut(p->pTableMap, &uid, sizeof(uint64_t), &pInfo, POINTER_BYTES);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue