Merge pull request #19919 from taosdata/fix/liaohj
fix(query): add check for table/super table dropped
This commit is contained in:
commit
1b53f4b0bd
|
@ -99,6 +99,38 @@ static int32_t saveOneRow(SArray* pRow, SSDataBlock* pBlock, SCacheRowsReader* p
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t setTableSchema(SCacheRowsReader* p, uint64_t suid, const char* idstr) {
|
||||
int32_t numOfTables = p->numOfTables;
|
||||
|
||||
if (suid != 0) {
|
||||
p->pSchema = metaGetTbTSchema(p->pVnode->pMeta, suid, -1, 1);
|
||||
if (p->pSchema == NULL) {
|
||||
taosMemoryFree(p);
|
||||
tsdbWarn("stable:%" PRIu64 " has been dropped, failed to retrieve cached rows, %s", suid, idstr);
|
||||
return TSDB_CODE_PAR_TABLE_NOT_EXIST;
|
||||
}
|
||||
} else {
|
||||
for (int32_t i = 0; i < numOfTables; ++i) {
|
||||
uint64_t uid = p->pTableList[i].uid;
|
||||
p->pSchema = metaGetTbTSchema(p->pVnode->pMeta, uid, -1, 1);
|
||||
if (p->pSchema != NULL) {
|
||||
break;
|
||||
}
|
||||
|
||||
tsdbWarn("table:%" PRIu64 " has been dropped, failed to retrieve cached rows, %s", uid, idstr);
|
||||
}
|
||||
|
||||
// all queried tables have been dropped already, return immediately.
|
||||
if (p->pSchema == NULL) {
|
||||
taosMemoryFree(p);
|
||||
tsdbWarn("all queried tables has been dropped, try next group, %s", idstr);
|
||||
return TSDB_CODE_PAR_TABLE_NOT_EXIST;
|
||||
}
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t tsdbCacherowsReaderOpen(void* pVnode, int32_t type, void* pTableIdList, int32_t numOfTables, int32_t numOfCols,
|
||||
uint64_t suid, void** pReader, const char* idstr) {
|
||||
*pReader = NULL;
|
||||
|
@ -117,11 +149,15 @@ int32_t tsdbCacherowsReaderOpen(void* pVnode, int32_t type, void* pTableIdList,
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
STableKeyInfo* pKeyInfo = &((STableKeyInfo*)pTableIdList)[0];
|
||||
p->pSchema = metaGetTbTSchema(p->pVnode->pMeta, pKeyInfo->uid, -1, 1);
|
||||
p->pTableList = pTableIdList;
|
||||
p->numOfTables = numOfTables;
|
||||
|
||||
int32_t code = setTableSchema(p, suid, idstr);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
tsdbCacherowsReaderClose(p);
|
||||
return code;
|
||||
}
|
||||
|
||||
p->transferBuf = taosMemoryCalloc(p->pSchema->numOfCols, POINTER_BYTES);
|
||||
if (p->transferBuf == NULL) {
|
||||
tsdbCacherowsReaderClose(p);
|
||||
|
|
|
@ -215,8 +215,15 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) {
|
|||
T_LONG_JMP(pTaskInfo->env, code);
|
||||
}
|
||||
|
||||
tsdbCacherowsReaderOpen(pInfo->readHandle.vnode, pInfo->retrieveType, pList, num,
|
||||
taosArrayGetSize(pInfo->matchInfo.pList), suid, &pInfo->pLastrowReader, pTaskInfo->id.str);
|
||||
code = tsdbCacherowsReaderOpen(pInfo->readHandle.vnode, pInfo->retrieveType, pList, num,
|
||||
taosArrayGetSize(pInfo->matchInfo.pList), suid, &pInfo->pLastrowReader,
|
||||
pTaskInfo->id.str);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
pInfo->currentGroupIndex += 1;
|
||||
taosArrayClear(pInfo->pUidList);
|
||||
continue;
|
||||
}
|
||||
|
||||
taosArrayClear(pInfo->pUidList);
|
||||
|
||||
code = tsdbRetrieveCacheRows(pInfo->pLastrowReader, pInfo->pRes, pInfo->pSlotIds, pInfo->pUidList);
|
||||
|
|
Loading…
Reference in New Issue