fix(query):check return code
This commit is contained in:
parent
1ea00597f3
commit
59d190508d
|
@ -355,7 +355,7 @@ void tsdbCacherowsReaderClose(void* pReader) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (p->pSchema != NULL) {
|
if (p->pSchema != NULL && p->transferBuf != NULL) {
|
||||||
for (int32_t i = 0; i < p->pSchema->numOfCols; ++i) {
|
for (int32_t i = 0; i < p->pSchema->numOfCols; ++i) {
|
||||||
taosMemoryFreeClear(p->transferBuf[i]);
|
taosMemoryFreeClear(p->transferBuf[i]);
|
||||||
}
|
}
|
||||||
|
@ -450,10 +450,11 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32
|
||||||
return TSDB_CODE_INVALID_PARA;
|
return TSDB_CODE_INVALID_PARA;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
|
bool hasRes = false;
|
||||||
SCacheRowsReader* pr = pReader;
|
SCacheRowsReader* pr = pReader;
|
||||||
|
pr->pReadSnap = NULL;
|
||||||
|
|
||||||
int32_t code = TSDB_CODE_SUCCESS;
|
|
||||||
bool hasRes = false;
|
|
||||||
SArray* pRow = taosArrayInit(TARRAY_SIZE(pr->pCidList), sizeof(SLastCol));
|
SArray* pRow = taosArrayInit(TARRAY_SIZE(pr->pCidList), sizeof(SLastCol));
|
||||||
if (pRow == NULL) {
|
if (pRow == NULL) {
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
@ -690,6 +691,8 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32
|
||||||
|
|
||||||
_end:
|
_end:
|
||||||
tsdbUntakeReadSnap2((STsdbReader*)pr, pr->pReadSnap, true);
|
tsdbUntakeReadSnap2((STsdbReader*)pr, pr->pReadSnap, true);
|
||||||
|
pr->pReadSnap = NULL;
|
||||||
|
|
||||||
if (pr->pCurFileSet) {
|
if (pr->pCurFileSet) {
|
||||||
pr->pCurFileSet = NULL;
|
pr->pCurFileSet = NULL;
|
||||||
}
|
}
|
||||||
|
|
|
@ -4795,6 +4795,7 @@ void tsdbReaderClose2(STsdbReader* pReader) {
|
||||||
void* p = pReader->pReadSnap;
|
void* p = pReader->pReadSnap;
|
||||||
if ((p == atomic_val_compare_exchange_ptr((void**)&pReader->pReadSnap, p, NULL)) && (p != NULL)) {
|
if ((p == atomic_val_compare_exchange_ptr((void**)&pReader->pReadSnap, p, NULL)) && (p != NULL)) {
|
||||||
tsdbUntakeReadSnap2(pReader, p, true);
|
tsdbUntakeReadSnap2(pReader, p, true);
|
||||||
|
pReader->pReadSnap = NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
(void) tsem_destroy(&pReader->resumeAfterSuspend);
|
(void) tsem_destroy(&pReader->resumeAfterSuspend);
|
||||||
|
@ -4877,6 +4878,7 @@ int32_t tsdbReaderSuspend2(STsdbReader* pReader) {
|
||||||
void* p = pReader->pReadSnap;
|
void* p = pReader->pReadSnap;
|
||||||
if ((p == atomic_val_compare_exchange_ptr((void**)&pReader->pReadSnap, p, NULL)) && (p != NULL)) {
|
if ((p == atomic_val_compare_exchange_ptr((void**)&pReader->pReadSnap, p, NULL)) && (p != NULL)) {
|
||||||
tsdbUntakeReadSnap2(pReader, p, false);
|
tsdbUntakeReadSnap2(pReader, p, false);
|
||||||
|
pReader->pReadSnap = NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pReader->bFilesetDelimited) {
|
if (pReader->bFilesetDelimited) {
|
||||||
|
|
|
@ -26,7 +26,7 @@
|
||||||
|
|
||||||
#define T_LONG_JMP(_obj, _c) \
|
#define T_LONG_JMP(_obj, _c) \
|
||||||
do { \
|
do { \
|
||||||
ASSERT((_c) != -1); \
|
ASSERT((_obj) != 0); \
|
||||||
longjmp((_obj), (_c)); \
|
longjmp((_obj), (_c)); \
|
||||||
} while (0)
|
} while (0)
|
||||||
|
|
||||||
|
|
|
@ -615,8 +615,12 @@ _error:
|
||||||
if (pInfo != NULL) {
|
if (pInfo != NULL) {
|
||||||
destroyGroupOperatorInfo(pInfo);
|
destroyGroupOperatorInfo(pInfo);
|
||||||
}
|
}
|
||||||
destroyOperator(pOperator);
|
|
||||||
taosMemoryFreeClear(pOperator);
|
if (pOperator) {
|
||||||
|
pOperator->info = NULL;
|
||||||
|
destroyOperator(pOperator);
|
||||||
|
}
|
||||||
|
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -155,6 +155,11 @@ int32_t initQueriedTableSchemaInfo(SReadHandle* pHandle, SScanPhysiNode* pScanNo
|
||||||
|
|
||||||
schemaInfo.tablename = taosStrdup(mr.me.name);
|
schemaInfo.tablename = taosStrdup(mr.me.name);
|
||||||
schemaInfo.dbname = taosStrdup(dbName);
|
schemaInfo.dbname = taosStrdup(dbName);
|
||||||
|
if (schemaInfo.tablename == NULL || schemaInfo.dbname == NULL) {
|
||||||
|
pAPI->metaReaderFn.clearReader(&mr);
|
||||||
|
cleanupQueriedTableScanInfo(&schemaInfo);
|
||||||
|
return terrno;
|
||||||
|
}
|
||||||
|
|
||||||
if (mr.me.type == TSDB_SUPER_TABLE) {
|
if (mr.me.type == TSDB_SUPER_TABLE) {
|
||||||
schemaInfo.sw = tCloneSSchemaWrapper(&mr.me.stbEntry.schemaRow);
|
schemaInfo.sw = tCloneSSchemaWrapper(&mr.me.stbEntry.schemaRow);
|
||||||
|
@ -166,8 +171,7 @@ int32_t initQueriedTableSchemaInfo(SReadHandle* pHandle, SScanPhysiNode* pScanNo
|
||||||
code = pAPI->metaReaderFn.getEntryGetUidCache(&mr, suid);
|
code = pAPI->metaReaderFn.getEntryGetUidCache(&mr, suid);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
pAPI->metaReaderFn.clearReader(&mr);
|
pAPI->metaReaderFn.clearReader(&mr);
|
||||||
taosMemoryFree(schemaInfo.tablename);
|
cleanupQueriedTableScanInfo(&schemaInfo);
|
||||||
taosMemoryFree(schemaInfo.dbname);
|
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -177,18 +181,26 @@ int32_t initQueriedTableSchemaInfo(SReadHandle* pHandle, SScanPhysiNode* pScanNo
|
||||||
schemaInfo.sw = tCloneSSchemaWrapper(&mr.me.ntbEntry.schemaRow);
|
schemaInfo.sw = tCloneSSchemaWrapper(&mr.me.ntbEntry.schemaRow);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pAPI->metaReaderFn.clearReader(&mr);
|
||||||
|
|
||||||
if (schemaInfo.sw == NULL) {
|
if (schemaInfo.sw == NULL) {
|
||||||
|
cleanupQueriedTableScanInfo(&schemaInfo);
|
||||||
return terrno;
|
return terrno;
|
||||||
}
|
}
|
||||||
|
|
||||||
pAPI->metaReaderFn.clearReader(&mr);
|
|
||||||
schemaInfo.qsw = extractQueriedColumnSchema(pScanNode);
|
schemaInfo.qsw = extractQueriedColumnSchema(pScanNode);
|
||||||
if (schemaInfo.qsw == NULL) {
|
if (schemaInfo.qsw == NULL) {
|
||||||
|
cleanupQueriedTableScanInfo(&schemaInfo);
|
||||||
return terrno;
|
return terrno;
|
||||||
}
|
}
|
||||||
|
|
||||||
void* p = taosArrayPush(pTaskInfo->schemaInfos, &schemaInfo);
|
void* p = taosArrayPush(pTaskInfo->schemaInfos, &schemaInfo);
|
||||||
return (p != NULL)? TSDB_CODE_SUCCESS:TSDB_CODE_OUT_OF_MEMORY;
|
if (p == NULL) {
|
||||||
|
cleanupQueriedTableScanInfo(&schemaInfo);
|
||||||
|
return terrno;
|
||||||
|
}
|
||||||
|
|
||||||
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
SSchemaWrapper* extractQueriedColumnSchema(SScanPhysiNode* pScanNode) {
|
SSchemaWrapper* extractQueriedColumnSchema(SScanPhysiNode* pScanNode) {
|
||||||
|
|
|
@ -235,7 +235,7 @@ EExtractDataCode streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueIte
|
||||||
void* newRet = NULL;
|
void* newRet = NULL;
|
||||||
int32_t code = streamQueueMergeQueueItem(*pInput, qItem, (SStreamQueueItem**)&newRet);
|
int32_t code = streamQueueMergeQueueItem(*pInput, qItem, (SStreamQueueItem**)&newRet);
|
||||||
if (newRet == NULL) {
|
if (newRet == NULL) {
|
||||||
if (code) {
|
if (code != -1) {
|
||||||
stError("s-task:%s failed to merge blocks from inputQ, numOfBlocks:%d, code:%s", id, *numOfBlocks,
|
stError("s-task:%s failed to merge blocks from inputQ, numOfBlocks:%d, code:%s", id, *numOfBlocks,
|
||||||
tstrerror(code));
|
tstrerror(code));
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue