Merge pull request #27488 from taosdata/fix/3_liaohj

fix(query): fix reader not release when error occuring.
This commit is contained in:
Haojun Liao 2024-08-27 18:41:14 +08:00 committed by GitHub
commit c950b684a5
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 185 additions and 146 deletions

View File

@ -4586,6 +4586,8 @@ int32_t tsdbSetTableList2(STsdbReader* pReader, const void* pTableList, int32_t
STableBlockScanInfo** p = NULL;
int32_t iter = 0;
(void)tsdbAcquireReader(pReader);
while ((p = tSimpleHashIterate(pReader->status.pTableMap, p, &iter)) != NULL) {
clearBlockScanInfo(*p);
}
@ -4593,12 +4595,14 @@ int32_t tsdbSetTableList2(STsdbReader* pReader, const void* pTableList, int32_t
if (size < num) {
code = ensureBlockScanInfoBuf(&pReader->blockInfoBuf, num);
if (code) {
(void) tsdbReleaseReader(pReader);
return code;
}
char* p1 = taosMemoryRealloc(pReader->status.uidList.tableUidList, sizeof(uint64_t) * num);
if (p1 == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
(void) tsdbReleaseReader(pReader);
return terrno;
}
pReader->status.uidList.tableUidList = (uint64_t*)p1;
@ -4615,16 +4619,19 @@ int32_t tsdbSetTableList2(STsdbReader* pReader, const void* pTableList, int32_t
STableBlockScanInfo* pInfo = NULL;
code = getPosInBlockInfoBuf(&pReader->blockInfoBuf, i, &pInfo);
if (code != TSDB_CODE_SUCCESS) {
(void) tsdbReleaseReader(pReader);
return code;
}
code = initTableBlockScanInfo(pInfo, pList[i].uid, pReader->status.pTableMap, pReader);
if (code != TSDB_CODE_SUCCESS) {
(void) tsdbReleaseReader(pReader);
return code;
}
}
return TDB_CODE_SUCCESS;
(void) tsdbReleaseReader(pReader);
return code;
}
uint64_t tsdbGetReaderMaxVersion2(STsdbReader* pReader) { return pReader->info.verRange.maxVer; }

View File

@ -419,7 +419,11 @@ static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanBase* pTableSca
size_t size = taosArrayGetSize(pBlock->pDataBlock);
bool keep = false;
code = doFilterByBlockSMA(pOperator->exprSupp.pFilterInfo, pBlock->pBlockAgg, size, pBlockInfo->rows, &keep);
QUERY_CHECK_CODE(code, lino, _end);
if (code) {
pAPI->tsdReader.tsdReaderReleaseDataBlock(pTableScanInfo->dataReader);
qError("%s failed to do filter by block sma, code:%s", GET_TASKID(pTaskInfo), tstrerror(code));
QUERY_CHECK_CODE(code, lino, _end);
}
if (!keep) {
qDebug("%s data block filter out by block SMA, brange:%" PRId64 "-%" PRId64 ", rows:%" PRId64,
@ -6502,8 +6506,8 @@ static void buildVnodeFilteredTbCount(SOperatorInfo* pOperator, STableCountScanO
_end:
if (code != TSDB_CODE_SUCCESS) {
pTaskInfo->code = code;
T_LONG_JMP(pTaskInfo->env, code);
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
T_LONG_JMP(pTaskInfo->env, code);
}
setOperatorCompleted(pOperator);
}

View File

@ -1354,6 +1354,171 @@ _end:
return code;
}
static int32_t doSetUserTableMetaInfo(SStoreMetaReader* pMetaReaderFn, SStoreMeta* pMetaFn, void* pVnode,
SMetaReader* pMReader, int64_t uid, const char* dbname, int32_t vgId,
SSDataBlock* p, int32_t rowIndex, const char* idStr) {
char n[TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
int32_t lino = 0;
int32_t code = pMetaReaderFn->getTableEntryByUid(pMReader, uid);
if (code < 0) {
qError("failed to get table meta, uid:%" PRId64 ", code:%s, %s", uid, tstrerror(terrno), idStr);
return code;
}
STR_TO_VARSTR(n, pMReader->me.name);
// table name
SColumnInfoData* pColInfoData = taosArrayGet(p->pDataBlock, 0);
QUERY_CHECK_NULL(pColInfoData, code, lino, _end, terrno);
code = colDataSetVal(pColInfoData, rowIndex, n, false);
QUERY_CHECK_CODE(code, lino, _end);
// database name
pColInfoData = taosArrayGet(p->pDataBlock, 1);
QUERY_CHECK_NULL(pColInfoData, code, lino, _end, terrno);
code = colDataSetVal(pColInfoData, rowIndex, dbname, false);
QUERY_CHECK_CODE(code, lino, _end);
// vgId
pColInfoData = taosArrayGet(p->pDataBlock, 6);
QUERY_CHECK_NULL(pColInfoData, code, lino, _end, terrno);
code = colDataSetVal(pColInfoData, rowIndex, (char*)&vgId, false);
QUERY_CHECK_CODE(code, lino, _end);
int32_t tableType = pMReader->me.type;
if (tableType == TSDB_CHILD_TABLE) {
// create time
int64_t ts = pMReader->me.ctbEntry.btime;
pColInfoData = taosArrayGet(p->pDataBlock, 2);
QUERY_CHECK_NULL(pColInfoData, code, lino, _end, terrno);
code = colDataSetVal(pColInfoData, rowIndex, (char*)&ts, false);
QUERY_CHECK_CODE(code, lino, _end);
SMetaReader mr1 = {0};
pMetaReaderFn->initReader(&mr1, pVnode, META_READER_NOLOCK, pMetaFn);
int64_t suid = pMReader->me.ctbEntry.suid;
code = pMetaReaderFn->getTableEntryByUid(&mr1, suid);
if (code != TSDB_CODE_SUCCESS) {
qError("failed to get super table meta, cname:%s, suid:0x%" PRIx64 ", code:%s, %s", pMReader->me.name, suid,
tstrerror(code), idStr);
pMetaReaderFn->clearReader(&mr1);
QUERY_CHECK_CODE(code, lino, _end);
}
pColInfoData = taosArrayGet(p->pDataBlock, 3);
if (pColInfoData == NULL) {
pMetaReaderFn->clearReader(&mr1);
QUERY_CHECK_NULL(pColInfoData, code, lino, _end, terrno);
}
code = colDataSetVal(pColInfoData, rowIndex, (char*)&mr1.me.stbEntry.schemaRow.nCols, false);
if (code != 0) {
pMetaReaderFn->clearReader(&mr1);
QUERY_CHECK_CODE(code, lino, _end);
}
// super table name
STR_TO_VARSTR(n, mr1.me.name);
pColInfoData = taosArrayGet(p->pDataBlock, 4);
if (pColInfoData == NULL) {
pMetaReaderFn->clearReader(&mr1);
QUERY_CHECK_NULL(pColInfoData, code, lino, _end, terrno);
}
code = colDataSetVal(pColInfoData, rowIndex, n, false);
pMetaReaderFn->clearReader(&mr1);
QUERY_CHECK_CODE(code, lino, _end);
// table comment
pColInfoData = taosArrayGet(p->pDataBlock, 8);
QUERY_CHECK_NULL(pColInfoData, code, lino, _end, terrno);
if (pMReader->me.ctbEntry.commentLen > 0) {
char comment[TSDB_TB_COMMENT_LEN + VARSTR_HEADER_SIZE] = {0};
STR_TO_VARSTR(comment, pMReader->me.ctbEntry.comment);
code = colDataSetVal(pColInfoData, rowIndex, comment, false);
QUERY_CHECK_CODE(code, lino, _end);
} else if (pMReader->me.ctbEntry.commentLen == 0) {
char comment[VARSTR_HEADER_SIZE + VARSTR_HEADER_SIZE] = {0};
STR_TO_VARSTR(comment, "");
code = colDataSetVal(pColInfoData, rowIndex, comment, false);
QUERY_CHECK_CODE(code, lino, _end);
} else {
colDataSetNULL(pColInfoData, rowIndex);
}
// uid
pColInfoData = taosArrayGet(p->pDataBlock, 5);
QUERY_CHECK_NULL(pColInfoData, code, lino, _end, terrno);
code = colDataSetVal(pColInfoData, rowIndex, (char*)&pMReader->me.uid, false);
QUERY_CHECK_CODE(code, lino, _end);
// ttl
pColInfoData = taosArrayGet(p->pDataBlock, 7);
QUERY_CHECK_NULL(pColInfoData, code, lino, _end, terrno);
code = colDataSetVal(pColInfoData, rowIndex, (char*)&pMReader->me.ctbEntry.ttlDays, false);
QUERY_CHECK_CODE(code, lino, _end);
STR_TO_VARSTR(n, "CHILD_TABLE");
} else if (tableType == TSDB_NORMAL_TABLE) {
// create time
pColInfoData = taosArrayGet(p->pDataBlock, 2);
QUERY_CHECK_NULL(pColInfoData, code, lino, _end, terrno);
code = colDataSetVal(pColInfoData, rowIndex, (char*)&pMReader->me.ntbEntry.btime, false);
QUERY_CHECK_CODE(code, lino, _end);
// number of columns
pColInfoData = taosArrayGet(p->pDataBlock, 3);
QUERY_CHECK_NULL(pColInfoData, code, lino, _end, terrno);
code = colDataSetVal(pColInfoData, rowIndex, (char*)&pMReader->me.ntbEntry.schemaRow.nCols, false);
QUERY_CHECK_CODE(code, lino, _end);
// super table name
pColInfoData = taosArrayGet(p->pDataBlock, 4);
QUERY_CHECK_NULL(pColInfoData, code, lino, _end, terrno);
colDataSetNULL(pColInfoData, rowIndex);
// table comment
pColInfoData = taosArrayGet(p->pDataBlock, 8);
QUERY_CHECK_NULL(pColInfoData, code, lino, _end, terrno);
if (pMReader->me.ntbEntry.commentLen > 0) {
char comment[TSDB_TB_COMMENT_LEN + VARSTR_HEADER_SIZE] = {0};
STR_TO_VARSTR(comment, pMReader->me.ntbEntry.comment);
code = colDataSetVal(pColInfoData, rowIndex, comment, false);
QUERY_CHECK_CODE(code, lino, _end);
} else if (pMReader->me.ntbEntry.commentLen == 0) {
char comment[VARSTR_HEADER_SIZE + VARSTR_HEADER_SIZE] = {0};
STR_TO_VARSTR(comment, "");
code = colDataSetVal(pColInfoData, rowIndex, comment, false);
QUERY_CHECK_CODE(code, lino, _end);
} else {
colDataSetNULL(pColInfoData, rowIndex);
}
// uid
pColInfoData = taosArrayGet(p->pDataBlock, 5);
QUERY_CHECK_NULL(pColInfoData, code, lino, _end, terrno);
code = colDataSetVal(pColInfoData, rowIndex, (char*)&pMReader->me.uid, false);
QUERY_CHECK_CODE(code, lino, _end);
// ttl
pColInfoData = taosArrayGet(p->pDataBlock, 7);
QUERY_CHECK_NULL(pColInfoData, code, lino, _end, terrno);
code = colDataSetVal(pColInfoData, rowIndex, (char*)&pMReader->me.ntbEntry.ttlDays, false);
QUERY_CHECK_CODE(code, lino, _end);
STR_TO_VARSTR(n, "NORMAL_TABLE");
// impl later
}
_end:
qError("%s failed at line %d since %s, %s", __func__, lino, tstrerror(code), idStr);
return code;
}
static SSDataBlock* sysTableBuildUserTablesByUids(SOperatorInfo* pOperator) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
@ -1395,152 +1560,15 @@ static SSDataBlock* sysTableBuildUserTablesByUids(SOperatorInfo* pOperator) {
SMetaReader mr = {0};
pAPI->metaReaderFn.initReader(&mr, pInfo->readHandle.vnode, META_READER_LOCK, &pAPI->metaFn);
ret = pAPI->metaReaderFn.getTableEntryByUid(&mr, *uid);
if (ret < 0) {
pAPI->metaReaderFn.clearReader(&mr);
continue;
}
STR_TO_VARSTR(n, mr.me.name);
// table name
SColumnInfoData* pColInfoData = taosArrayGet(p->pDataBlock, 0);
QUERY_CHECK_NULL(pColInfoData, code, lino, _end, terrno);
code = colDataSetVal(pColInfoData, numOfRows, n, false);
QUERY_CHECK_CODE(code, lino, _end);
// database name
pColInfoData = taosArrayGet(p->pDataBlock, 1);
QUERY_CHECK_NULL(pColInfoData, code, lino, _end, terrno);
code = colDataSetVal(pColInfoData, numOfRows, dbname, false);
QUERY_CHECK_CODE(code, lino, _end);
// vgId
pColInfoData = taosArrayGet(p->pDataBlock, 6);
QUERY_CHECK_NULL(pColInfoData, code, lino, _end, terrno);
code = colDataSetVal(pColInfoData, numOfRows, (char*)&vgId, false);
QUERY_CHECK_CODE(code, lino, _end);
int32_t tableType = mr.me.type;
if (tableType == TSDB_CHILD_TABLE) {
// create time
int64_t ts = mr.me.ctbEntry.btime;
pColInfoData = taosArrayGet(p->pDataBlock, 2);
QUERY_CHECK_NULL(pColInfoData, code, lino, _end, terrno);
code = colDataSetVal(pColInfoData, numOfRows, (char*)&ts, false);
QUERY_CHECK_CODE(code, lino, _end);
SMetaReader mr1 = {0};
pAPI->metaReaderFn.initReader(&mr1, pInfo->readHandle.vnode, META_READER_NOLOCK, &pAPI->metaFn);
int64_t suid = mr.me.ctbEntry.suid;
code = pAPI->metaReaderFn.getTableEntryByUid(&mr1, suid);
if (code != TSDB_CODE_SUCCESS) {
qError("failed to get super table meta, cname:%s, suid:0x%" PRIx64 ", code:%s, %s", mr.me.name,
suid, tstrerror(terrno), GET_TASKID(pTaskInfo));
pAPI->metaReaderFn.clearReader(&mr1);
pAPI->metaReaderFn.clearReader(&mr);
T_LONG_JMP(pTaskInfo->env, terrno);
}
pColInfoData = taosArrayGet(p->pDataBlock, 3);
QUERY_CHECK_NULL(pColInfoData, code, lino, _end, terrno);
code = colDataSetVal(pColInfoData, numOfRows, (char*)&mr1.me.stbEntry.schemaRow.nCols, false);
QUERY_CHECK_CODE(code, lino, _end);
// super table name
STR_TO_VARSTR(n, mr1.me.name);
pColInfoData = taosArrayGet(p->pDataBlock, 4);
QUERY_CHECK_NULL(pColInfoData, code, lino, _end, terrno);
code = colDataSetVal(pColInfoData, numOfRows, n, false);
QUERY_CHECK_CODE(code, lino, _end);
pAPI->metaReaderFn.clearReader(&mr1);
// table comment
pColInfoData = taosArrayGet(p->pDataBlock, 8);
QUERY_CHECK_NULL(pColInfoData, code, lino, _end, terrno);
if (mr.me.ctbEntry.commentLen > 0) {
char comment[TSDB_TB_COMMENT_LEN + VARSTR_HEADER_SIZE] = {0};
STR_TO_VARSTR(comment, mr.me.ctbEntry.comment);
code = colDataSetVal(pColInfoData, numOfRows, comment, false);
QUERY_CHECK_CODE(code, lino, _end);
} else if (mr.me.ctbEntry.commentLen == 0) {
char comment[VARSTR_HEADER_SIZE + VARSTR_HEADER_SIZE] = {0};
STR_TO_VARSTR(comment, "");
code = colDataSetVal(pColInfoData, numOfRows, comment, false);
QUERY_CHECK_CODE(code, lino, _end);
} else {
colDataSetNULL(pColInfoData, numOfRows);
}
// uid
pColInfoData = taosArrayGet(p->pDataBlock, 5);
QUERY_CHECK_NULL(pColInfoData, code, lino, _end, terrno);
code = colDataSetVal(pColInfoData, numOfRows, (char*)&mr.me.uid, false);
QUERY_CHECK_CODE(code, lino, _end);
// ttl
pColInfoData = taosArrayGet(p->pDataBlock, 7);
QUERY_CHECK_NULL(pColInfoData, code, lino, _end, terrno);
code = colDataSetVal(pColInfoData, numOfRows, (char*)&mr.me.ctbEntry.ttlDays, false);
QUERY_CHECK_CODE(code, lino, _end);
STR_TO_VARSTR(n, "CHILD_TABLE");
} else if (tableType == TSDB_NORMAL_TABLE) {
// create time
pColInfoData = taosArrayGet(p->pDataBlock, 2);
QUERY_CHECK_NULL(pColInfoData, code, lino, _end, terrno);
code = colDataSetVal(pColInfoData, numOfRows, (char*)&mr.me.ntbEntry.btime, false);
QUERY_CHECK_CODE(code, lino, _end);
// number of columns
pColInfoData = taosArrayGet(p->pDataBlock, 3);
QUERY_CHECK_NULL(pColInfoData, code, lino, _end, terrno);
code = colDataSetVal(pColInfoData, numOfRows, (char*)&mr.me.ntbEntry.schemaRow.nCols, false);
QUERY_CHECK_CODE(code, lino, _end);
// super table name
pColInfoData = taosArrayGet(p->pDataBlock, 4);
QUERY_CHECK_NULL(pColInfoData, code, lino, _end, terrno);
colDataSetNULL(pColInfoData, numOfRows);
// table comment
pColInfoData = taosArrayGet(p->pDataBlock, 8);
QUERY_CHECK_NULL(pColInfoData, code, lino, _end, terrno);
if (mr.me.ntbEntry.commentLen > 0) {
char comment[TSDB_TB_COMMENT_LEN + VARSTR_HEADER_SIZE] = {0};
STR_TO_VARSTR(comment, mr.me.ntbEntry.comment);
code = colDataSetVal(pColInfoData, numOfRows, comment, false);
QUERY_CHECK_CODE(code, lino, _end);
} else if (mr.me.ntbEntry.commentLen == 0) {
char comment[VARSTR_HEADER_SIZE + VARSTR_HEADER_SIZE] = {0};
STR_TO_VARSTR(comment, "");
code = colDataSetVal(pColInfoData, numOfRows, comment, false);
QUERY_CHECK_CODE(code, lino, _end);
} else {
colDataSetNULL(pColInfoData, numOfRows);
}
// uid
pColInfoData = taosArrayGet(p->pDataBlock, 5);
QUERY_CHECK_NULL(pColInfoData, code, lino, _end, terrno);
code = colDataSetVal(pColInfoData, numOfRows, (char*)&mr.me.uid, false);
QUERY_CHECK_CODE(code, lino, _end);
// ttl
pColInfoData = taosArrayGet(p->pDataBlock, 7);
QUERY_CHECK_NULL(pColInfoData, code, lino, _end, terrno);
code = colDataSetVal(pColInfoData, numOfRows, (char*)&mr.me.ntbEntry.ttlDays, false);
QUERY_CHECK_CODE(code, lino, _end);
STR_TO_VARSTR(n, "NORMAL_TABLE");
// impl later
}
code = doSetUserTableMetaInfo(&pAPI->metaReaderFn, &pAPI->metaFn, pInfo->readHandle.vnode, &mr, *uid, dbname, vgId, p,
numOfRows, GET_TASKID(pTaskInfo));
pAPI->metaReaderFn.clearReader(&mr);
QUERY_CHECK_CODE(code, lino, _end);
pColInfoData = taosArrayGet(p->pDataBlock, 9);
SColumnInfoData* pColInfoData = taosArrayGet(p->pDataBlock, 9);
QUERY_CHECK_NULL(pColInfoData, code, lino, _end, terrno);
code = colDataSetVal(pColInfoData, numOfRows, n, false);
QUERY_CHECK_CODE(code, lino, _end);