cache/row iter close: move close to cleanup section
This commit is contained in:
parent
270e9f0bcf
commit
65698470b7
|
@ -616,6 +616,7 @@ static int32_t tsdbLoadFromImem(SMemTable *imem, TABLEID tid) {
|
||||||
int32_t sver;
|
int32_t sver;
|
||||||
int32_t nCol;
|
int32_t nCol;
|
||||||
SArray *ctxArray = pTsdb->rCache.ctxArray;
|
SArray *ctxArray = pTsdb->rCache.ctxArray;
|
||||||
|
STsdbRowKey tsdbRowKey = {0};
|
||||||
|
|
||||||
STbData *pIMem = tsdbGetTbDataFromMemTable(imem, tid.suid, tid.uid);
|
STbData *pIMem = tsdbGetTbDataFromMemTable(imem, tid.suid, tid.uid);
|
||||||
|
|
||||||
|
@ -640,7 +641,6 @@ static int32_t tsdbLoadFromImem(SMemTable *imem, TABLEID tid) {
|
||||||
pTSchema = pTsdb->rCache.pTSchema;
|
pTSchema = pTsdb->rCache.pTSchema;
|
||||||
nCol = pTSchema->numOfCols;
|
nCol = pTSchema->numOfCols;
|
||||||
|
|
||||||
STsdbRowKey tsdbRowKey = {0};
|
|
||||||
tsdbRowGetKey(pMemRow, &tsdbRowKey);
|
tsdbRowGetKey(pMemRow, &tsdbRowKey);
|
||||||
|
|
||||||
STSDBRowIter iter = {0};
|
STSDBRowIter iter = {0};
|
||||||
|
@ -650,32 +650,27 @@ static int32_t tsdbLoadFromImem(SMemTable *imem, TABLEID tid) {
|
||||||
for (SColVal *pColVal = tsdbRowIterNext(&iter); pColVal && iCol < nCol; pColVal = tsdbRowIterNext(&iter), iCol++) {
|
for (SColVal *pColVal = tsdbRowIterNext(&iter); pColVal && iCol < nCol; pColVal = tsdbRowIterNext(&iter), iCol++) {
|
||||||
SLastUpdateCtx updateCtx = {.lflag = LFLAG_LAST_ROW, .tsdbRowKey = tsdbRowKey, .colVal = *pColVal};
|
SLastUpdateCtx updateCtx = {.lflag = LFLAG_LAST_ROW, .tsdbRowKey = tsdbRowKey, .colVal = *pColVal};
|
||||||
if (!taosArrayPush(ctxArray, &updateCtx)) {
|
if (!taosArrayPush(ctxArray, &updateCtx)) {
|
||||||
tsdbRowClose(&iter);
|
|
||||||
TAOS_CHECK_GOTO(terrno, &lino, _exit);
|
TAOS_CHECK_GOTO(terrno, &lino, _exit);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (COL_VAL_IS_VALUE(pColVal)) {
|
if (COL_VAL_IS_VALUE(pColVal)) {
|
||||||
updateCtx.lflag = LFLAG_LAST;
|
updateCtx.lflag = LFLAG_LAST;
|
||||||
if (!taosArrayPush(ctxArray, &updateCtx)) {
|
if (!taosArrayPush(ctxArray, &updateCtx)) {
|
||||||
tsdbRowClose(&iter);
|
|
||||||
TAOS_CHECK_GOTO(terrno, &lino, _exit);
|
TAOS_CHECK_GOTO(terrno, &lino, _exit);
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
if (!iColHash) {
|
if (!iColHash) {
|
||||||
iColHash = tSimpleHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT));
|
iColHash = tSimpleHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT));
|
||||||
if (iColHash == NULL) {
|
if (iColHash == NULL) {
|
||||||
tsdbRowClose(&iter);
|
|
||||||
TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, &lino, _exit);
|
TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, &lino, _exit);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (tSimpleHashPut(iColHash, &iCol, sizeof(iCol), NULL, 0)) {
|
if (tSimpleHashPut(iColHash, &iCol, sizeof(iCol), NULL, 0)) {
|
||||||
tsdbRowClose(&iter);
|
|
||||||
TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, &lino, _exit);
|
TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, &lino, _exit);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
tsdbRowClose(&iter);
|
|
||||||
|
|
||||||
// continue to get next row to fill null last col values
|
// continue to get next row to fill null last col values
|
||||||
pMemRow = tsdbImemGetNextRow(&tbIter, pSkyline, &iSkyline);
|
pMemRow = tsdbImemGetNextRow(&tbIter, pSkyline, &iSkyline);
|
||||||
|
@ -717,6 +712,7 @@ _exit:
|
||||||
tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(pTsdb->pVnode), __func__, __FILE__, lino, tstrerror(code));
|
tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(pTsdb->pVnode), __func__, __FILE__, lino, tstrerror(code));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
tsdbRowClose(&iter);
|
||||||
taosArrayClear(ctxArray);
|
taosArrayClear(ctxArray);
|
||||||
// destroy any allocated resource
|
// destroy any allocated resource
|
||||||
tSimpleHashCleanup(iColHash);
|
tSimpleHashCleanup(iColHash);
|
||||||
|
@ -1598,10 +1594,11 @@ int32_t tsdbCacheRowFormatUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, int6
|
||||||
int32_t code = 0, lino = 0;
|
int32_t code = 0, lino = 0;
|
||||||
|
|
||||||
// 1. prepare last
|
// 1. prepare last
|
||||||
TSDBROW lRow = {.type = TSDBROW_ROW_FMT, .pTSRow = aRow[nRow - 1], .version = version};
|
TSDBROW lRow = {.type = TSDBROW_ROW_FMT, .pTSRow = aRow[nRow - 1], .version = version};
|
||||||
STSchema *pTSchema = NULL;
|
STSchema *pTSchema = NULL;
|
||||||
int32_t sver = TSDBROW_SVERSION(&lRow);
|
int32_t sver = TSDBROW_SVERSION(&lRow);
|
||||||
SSHashObj *iColHash = NULL;
|
SSHashObj *iColHash = NULL;
|
||||||
|
STSDBRowIter iter = {0};
|
||||||
|
|
||||||
TAOS_CHECK_GOTO(tsdbUpdateSkm(pTsdb, suid, uid, sver), &lino, _exit);
|
TAOS_CHECK_GOTO(tsdbUpdateSkm(pTsdb, suid, uid, sver), &lino, _exit);
|
||||||
pTSchema = pTsdb->rCache.pTSchema;
|
pTSchema = pTsdb->rCache.pTSchema;
|
||||||
|
@ -1614,39 +1611,33 @@ int32_t tsdbCacheRowFormatUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, int6
|
||||||
STsdbRowKey tsdbRowKey = {0};
|
STsdbRowKey tsdbRowKey = {0};
|
||||||
tsdbRowGetKey(&lRow, &tsdbRowKey);
|
tsdbRowGetKey(&lRow, &tsdbRowKey);
|
||||||
|
|
||||||
STSDBRowIter iter = {0};
|
|
||||||
TAOS_CHECK_GOTO(tsdbRowIterOpen(&iter, &lRow, pTSchema), &lino, _exit);
|
TAOS_CHECK_GOTO(tsdbRowIterOpen(&iter, &lRow, pTSchema), &lino, _exit);
|
||||||
|
|
||||||
int32_t iCol = 0;
|
int32_t iCol = 0;
|
||||||
for (SColVal *pColVal = tsdbRowIterNext(&iter); pColVal && iCol < nCol; pColVal = tsdbRowIterNext(&iter), iCol++) {
|
for (SColVal *pColVal = tsdbRowIterNext(&iter); pColVal && iCol < nCol; pColVal = tsdbRowIterNext(&iter), iCol++) {
|
||||||
SLastUpdateCtx updateCtx = {.lflag = LFLAG_LAST_ROW, .tsdbRowKey = tsdbRowKey, .colVal = *pColVal};
|
SLastUpdateCtx updateCtx = {.lflag = LFLAG_LAST_ROW, .tsdbRowKey = tsdbRowKey, .colVal = *pColVal};
|
||||||
if (!taosArrayPush(ctxArray, &updateCtx)) {
|
if (!taosArrayPush(ctxArray, &updateCtx)) {
|
||||||
tsdbRowClose(&iter);
|
|
||||||
TAOS_CHECK_GOTO(terrno, &lino, _exit);
|
TAOS_CHECK_GOTO(terrno, &lino, _exit);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (COL_VAL_IS_VALUE(pColVal)) {
|
if (COL_VAL_IS_VALUE(pColVal)) {
|
||||||
updateCtx.lflag = LFLAG_LAST;
|
updateCtx.lflag = LFLAG_LAST;
|
||||||
if (!taosArrayPush(ctxArray, &updateCtx)) {
|
if (!taosArrayPush(ctxArray, &updateCtx)) {
|
||||||
tsdbRowClose(&iter);
|
|
||||||
TAOS_CHECK_GOTO(terrno, &lino, _exit);
|
TAOS_CHECK_GOTO(terrno, &lino, _exit);
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
if (!iColHash) {
|
if (!iColHash) {
|
||||||
iColHash = tSimpleHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT));
|
iColHash = tSimpleHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT));
|
||||||
if (iColHash == NULL) {
|
if (iColHash == NULL) {
|
||||||
tsdbRowClose(&iter);
|
|
||||||
TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, &lino, _exit);
|
TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, &lino, _exit);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (tSimpleHashPut(iColHash, &iCol, sizeof(iCol), NULL, 0)) {
|
if (tSimpleHashPut(iColHash, &iCol, sizeof(iCol), NULL, 0)) {
|
||||||
tsdbRowClose(&iter);
|
|
||||||
TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, &lino, _exit);
|
TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, &lino, _exit);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
tsdbRowClose(&iter);
|
|
||||||
|
|
||||||
// 2. prepare by the other rows
|
// 2. prepare by the other rows
|
||||||
for (int32_t iRow = nRow - 2; iRow >= 0; --iRow) {
|
for (int32_t iRow = nRow - 2; iRow >= 0; --iRow) {
|
||||||
|
@ -1692,6 +1683,7 @@ _exit:
|
||||||
tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, tstrerror(code));
|
tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, tstrerror(code));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
tsdbRowClose(&iter);
|
||||||
tSimpleHashCleanup(iColHash);
|
tSimpleHashCleanup(iColHash);
|
||||||
taosArrayClear(ctxArray);
|
taosArrayClear(ctxArray);
|
||||||
|
|
||||||
|
@ -1699,7 +1691,8 @@ _exit:
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tsdbCacheColFormatUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, SBlockData *pBlockData) {
|
int32_t tsdbCacheColFormatUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, SBlockData *pBlockData) {
|
||||||
int32_t code = 0, lino = 0;
|
int32_t code = 0, lino = 0;
|
||||||
|
STSDBRowIter iter = {0};
|
||||||
|
|
||||||
TSDBROW lRow = tsdbRowFromBlockData(pBlockData, pBlockData->nRow - 1);
|
TSDBROW lRow = tsdbRowFromBlockData(pBlockData, pBlockData->nRow - 1);
|
||||||
|
|
||||||
|
@ -1756,7 +1749,6 @@ int32_t tsdbCacheColFormatUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, SBlo
|
||||||
}
|
}
|
||||||
|
|
||||||
// 2. prepare last row
|
// 2. prepare last row
|
||||||
STSDBRowIter iter = {0};
|
|
||||||
code = tsdbRowIterOpen(&iter, &lRow, pTSchema);
|
code = tsdbRowIterOpen(&iter, &lRow, pTSchema);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
tsdbError("vgId:%d, %s tsdbRowIterOpen failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
|
tsdbError("vgId:%d, %s tsdbRowIterOpen failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
|
||||||
|
@ -1769,7 +1761,6 @@ int32_t tsdbCacheColFormatUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, SBlo
|
||||||
TAOS_CHECK_GOTO(terrno, &lino, _exit);
|
TAOS_CHECK_GOTO(terrno, &lino, _exit);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
tsdbRowClose(&iter);
|
|
||||||
|
|
||||||
// 3. do update
|
// 3. do update
|
||||||
code = tsdbCacheUpdate(pTsdb, suid, uid, ctxArray);
|
code = tsdbCacheUpdate(pTsdb, suid, uid, ctxArray);
|
||||||
|
@ -1779,6 +1770,7 @@ int32_t tsdbCacheColFormatUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, SBlo
|
||||||
}
|
}
|
||||||
|
|
||||||
_exit:
|
_exit:
|
||||||
|
tsdbRowClose(&iter);
|
||||||
taosMemoryFreeClear(pTSchema);
|
taosMemoryFreeClear(pTSchema);
|
||||||
taosArrayDestroy(ctxArray);
|
taosArrayDestroy(ctxArray);
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue