Merge pull request #27012 from taosdata/fix/syntax
fix(query):check return code
This commit is contained in:
commit
a8eedcfdc6
|
@ -355,7 +355,7 @@ void tsdbCacherowsReaderClose(void* pReader) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (p->pSchema != NULL) {
|
if (p->pSchema != NULL && p->transferBuf != NULL) {
|
||||||
for (int32_t i = 0; i < p->pSchema->numOfCols; ++i) {
|
for (int32_t i = 0; i < p->pSchema->numOfCols; ++i) {
|
||||||
taosMemoryFreeClear(p->transferBuf[i]);
|
taosMemoryFreeClear(p->transferBuf[i]);
|
||||||
}
|
}
|
||||||
|
@ -450,23 +450,27 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32
|
||||||
return TSDB_CODE_INVALID_PARA;
|
return TSDB_CODE_INVALID_PARA;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
|
bool hasRes = false;
|
||||||
|
SArray* pRow = NULL;
|
||||||
|
void** pRes = NULL;
|
||||||
SCacheRowsReader* pr = pReader;
|
SCacheRowsReader* pr = pReader;
|
||||||
|
int32_t pkBufLen = 0;
|
||||||
|
|
||||||
int32_t code = TSDB_CODE_SUCCESS;
|
pr->pReadSnap = NULL;
|
||||||
bool hasRes = false;
|
pRow = taosArrayInit(TARRAY_SIZE(pr->pCidList), sizeof(SLastCol));
|
||||||
SArray* pRow = taosArrayInit(TARRAY_SIZE(pr->pCidList), sizeof(SLastCol));
|
|
||||||
if (pRow == NULL) {
|
if (pRow == NULL) {
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
goto _end;
|
goto _end;
|
||||||
}
|
}
|
||||||
|
|
||||||
void** pRes = taosMemoryCalloc(pr->numOfCols, POINTER_BYTES);
|
pRes = taosMemoryCalloc(pr->numOfCols, POINTER_BYTES);
|
||||||
if (pRes == NULL) {
|
if (pRes == NULL) {
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
goto _end;
|
goto _end;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t pkBufLen = (pr->rowKey.numOfPKs > 0) ? pr->pkColumn.bytes : 0;
|
pkBufLen = (pr->rowKey.numOfPKs > 0) ? pr->pkColumn.bytes : 0;
|
||||||
for (int32_t j = 0; j < pr->numOfCols; ++j) {
|
for (int32_t j = 0; j < pr->numOfCols; ++j) {
|
||||||
int32_t bytes = (slotIds[j] == -1) ? 1 : pr->pSchema->columns[slotIds[j]].bytes;
|
int32_t bytes = (slotIds[j] == -1) ? 1 : pr->pSchema->columns[slotIds[j]].bytes;
|
||||||
|
|
||||||
|
@ -690,6 +694,8 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32
|
||||||
|
|
||||||
_end:
|
_end:
|
||||||
tsdbUntakeReadSnap2((STsdbReader*)pr, pr->pReadSnap, true);
|
tsdbUntakeReadSnap2((STsdbReader*)pr, pr->pReadSnap, true);
|
||||||
|
pr->pReadSnap = NULL;
|
||||||
|
|
||||||
if (pr->pCurFileSet) {
|
if (pr->pCurFileSet) {
|
||||||
pr->pCurFileSet = NULL;
|
pr->pCurFileSet = NULL;
|
||||||
}
|
}
|
||||||
|
|
|
@ -2434,21 +2434,25 @@ static bool initSttBlockReader(SSttBlockReader* pSttBlockReader, STableBlockScan
|
||||||
|
|
||||||
SSttDataInfoForTable info = {.pKeyRangeList = taosArrayInit(4, sizeof(SSttKeyRange))};
|
SSttDataInfoForTable info = {.pKeyRangeList = taosArrayInit(4, sizeof(SSttKeyRange))};
|
||||||
if (info.pKeyRangeList == NULL) {
|
if (info.pKeyRangeList == NULL) {
|
||||||
|
pReader->code = terrno;
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t code = tMergeTreeOpen2(&pSttBlockReader->mergeTree, &conf, &info);
|
int32_t code = tMergeTreeOpen2(&pSttBlockReader->mergeTree, &conf, &info);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
pReader->code = code;
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
code = initMemDataIterator(pScanInfo, pReader);
|
code = initMemDataIterator(pScanInfo, pReader);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
pReader->code = code;
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
code = initDelSkylineIterator(pScanInfo, pReader->info.order, &pReader->cost);
|
code = initDelSkylineIterator(pScanInfo, pReader->info.order, &pReader->cost);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
pReader->code = code;
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2461,7 +2465,7 @@ static bool initSttBlockReader(SSttBlockReader* pSttBlockReader, STableBlockScan
|
||||||
for (int32_t i = 0; i < taosArrayGetSize(info.pKeyRangeList); ++i) {
|
for (int32_t i = 0; i < taosArrayGetSize(info.pKeyRangeList); ++i) {
|
||||||
SSttKeyRange* pKeyRange = taosArrayGet(info.pKeyRangeList, i);
|
SSttKeyRange* pKeyRange = taosArrayGet(info.pKeyRangeList, i);
|
||||||
if (pKeyRange == NULL) {
|
if (pKeyRange == NULL) {
|
||||||
return TSDB_CODE_INVALID_PARA;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pkCompEx(&pScanInfo->sttRange.skey, &pKeyRange->skey) > 0) {
|
if (pkCompEx(&pScanInfo->sttRange.skey, &pKeyRange->skey) > 0) {
|
||||||
|
@ -2766,6 +2770,10 @@ static int32_t buildComposedDataBlock(STsdbReader* pReader) {
|
||||||
|
|
||||||
SBlockData* pBlockData = &pReader->status.fileBlockData;
|
SBlockData* pBlockData = &pReader->status.fileBlockData;
|
||||||
(void) initSttBlockReader(pSttBlockReader, pBlockScanInfo, pReader);
|
(void) initSttBlockReader(pSttBlockReader, pBlockScanInfo, pReader);
|
||||||
|
if (pReader->code != 0) {
|
||||||
|
code = pReader->code;
|
||||||
|
goto _end;
|
||||||
|
}
|
||||||
|
|
||||||
while (1) {
|
while (1) {
|
||||||
bool hasBlockData = false;
|
bool hasBlockData = false;
|
||||||
|
@ -3180,6 +3188,10 @@ static int32_t doLoadSttBlockSequentially(STsdbReader* pReader) {
|
||||||
}
|
}
|
||||||
|
|
||||||
bool hasDataInSttFile = initSttBlockReader(pSttBlockReader, pScanInfo, pReader);
|
bool hasDataInSttFile = initSttBlockReader(pSttBlockReader, pScanInfo, pReader);
|
||||||
|
if (pReader->code != TSDB_CODE_SUCCESS) {
|
||||||
|
return pReader->code;
|
||||||
|
}
|
||||||
|
|
||||||
if (!hasDataInSttFile) {
|
if (!hasDataInSttFile) {
|
||||||
bool hasNexTable = moveToNextTable(pUidList, pStatus);
|
bool hasNexTable = moveToNextTable(pUidList, pStatus);
|
||||||
if (!hasNexTable) {
|
if (!hasNexTable) {
|
||||||
|
@ -3273,6 +3285,9 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) {
|
||||||
|
|
||||||
if (pScanInfo->sttKeyInfo.status == STT_FILE_READER_UNINIT) {
|
if (pScanInfo->sttKeyInfo.status == STT_FILE_READER_UNINIT) {
|
||||||
(void) initSttBlockReader(pSttBlockReader, pScanInfo, pReader);
|
(void) initSttBlockReader(pSttBlockReader, pScanInfo, pReader);
|
||||||
|
if (pReader->code != 0) {
|
||||||
|
return pReader->code;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
TSDBKEY keyInBuf = getCurrentKeyInBuf(pScanInfo, pReader);
|
TSDBKEY keyInBuf = getCurrentKeyInBuf(pScanInfo, pReader);
|
||||||
|
@ -3314,6 +3329,9 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) {
|
||||||
|
|
||||||
// let's load data from stt files, make sure clear the cleanStt block flag before load the data from stt files
|
// let's load data from stt files, make sure clear the cleanStt block flag before load the data from stt files
|
||||||
(void) initSttBlockReader(pSttBlockReader, pScanInfo, pReader);
|
(void) initSttBlockReader(pSttBlockReader, pScanInfo, pReader);
|
||||||
|
if (pReader->code != 0) {
|
||||||
|
return pReader->code;
|
||||||
|
}
|
||||||
|
|
||||||
// no data in stt block, no need to proceed.
|
// no data in stt block, no need to proceed.
|
||||||
while (hasDataInSttBlock(pScanInfo)) {
|
while (hasDataInSttBlock(pScanInfo)) {
|
||||||
|
@ -4795,6 +4813,7 @@ void tsdbReaderClose2(STsdbReader* pReader) {
|
||||||
void* p = pReader->pReadSnap;
|
void* p = pReader->pReadSnap;
|
||||||
if ((p == atomic_val_compare_exchange_ptr((void**)&pReader->pReadSnap, p, NULL)) && (p != NULL)) {
|
if ((p == atomic_val_compare_exchange_ptr((void**)&pReader->pReadSnap, p, NULL)) && (p != NULL)) {
|
||||||
tsdbUntakeReadSnap2(pReader, p, true);
|
tsdbUntakeReadSnap2(pReader, p, true);
|
||||||
|
pReader->pReadSnap = NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
(void) tsem_destroy(&pReader->resumeAfterSuspend);
|
(void) tsem_destroy(&pReader->resumeAfterSuspend);
|
||||||
|
@ -4877,6 +4896,7 @@ int32_t tsdbReaderSuspend2(STsdbReader* pReader) {
|
||||||
void* p = pReader->pReadSnap;
|
void* p = pReader->pReadSnap;
|
||||||
if ((p == atomic_val_compare_exchange_ptr((void**)&pReader->pReadSnap, p, NULL)) && (p != NULL)) {
|
if ((p == atomic_val_compare_exchange_ptr((void**)&pReader->pReadSnap, p, NULL)) && (p != NULL)) {
|
||||||
tsdbUntakeReadSnap2(pReader, p, false);
|
tsdbUntakeReadSnap2(pReader, p, false);
|
||||||
|
pReader->pReadSnap = NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pReader->bFilesetDelimited) {
|
if (pReader->bFilesetDelimited) {
|
||||||
|
|
|
@ -26,7 +26,7 @@
|
||||||
|
|
||||||
#define T_LONG_JMP(_obj, _c) \
|
#define T_LONG_JMP(_obj, _c) \
|
||||||
do { \
|
do { \
|
||||||
ASSERT((_c) != -1); \
|
ASSERT(1); \
|
||||||
longjmp((_obj), (_c)); \
|
longjmp((_obj), (_c)); \
|
||||||
} while (0)
|
} while (0)
|
||||||
|
|
||||||
|
|
|
@ -180,11 +180,11 @@ static bool nextGroupedResult(SOperatorInfo* pOperator) {
|
||||||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||||
SAggOperatorInfo* pAggInfo = pOperator->info;
|
SAggOperatorInfo* pAggInfo = pOperator->info;
|
||||||
|
|
||||||
if (pOperator->blocking && pAggInfo->hasValidBlock) return false;
|
if (pOperator->blocking && pAggInfo->hasValidBlock) {
|
||||||
|
return false;
|
||||||
SExprSupp* pSup = &pOperator->exprSupp;
|
}
|
||||||
SOperatorInfo* downstream = pOperator->pDownstream[0];
|
|
||||||
|
|
||||||
|
SExprSupp* pSup = &pOperator->exprSupp;
|
||||||
int64_t st = taosGetTimestampUs();
|
int64_t st = taosGetTimestampUs();
|
||||||
int32_t order = pAggInfo->binfo.inputTsOrder;
|
int32_t order = pAggInfo->binfo.inputTsOrder;
|
||||||
SSDataBlock* pBlock = pAggInfo->pNewGroupBlock;
|
SSDataBlock* pBlock = pAggInfo->pNewGroupBlock;
|
||||||
|
@ -458,7 +458,7 @@ void doSetTableGroupOutputBuf(SOperatorInfo* pOperator, int32_t numOfOutput, uin
|
||||||
* not assign result buffer yet, add new result buffer
|
* not assign result buffer yet, add new result buffer
|
||||||
* all group belong to one result set, and each group result has different group id so set the id to be one
|
* all group belong to one result set, and each group result has different group id so set the id to be one
|
||||||
*/
|
*/
|
||||||
if (pResultRow->pageId == -1) {
|
if (pResultRow == NULL || pResultRow->pageId == -1) {
|
||||||
int32_t ret = addNewResultRowBuf(pResultRow, pAggInfo->aggSup.pResultBuf, pAggInfo->binfo.pRes->info.rowSize);
|
int32_t ret = addNewResultRowBuf(pResultRow, pAggInfo->aggSup.pResultBuf, pAggInfo->binfo.pRes->info.rowSize);
|
||||||
if (ret != TSDB_CODE_SUCCESS) {
|
if (ret != TSDB_CODE_SUCCESS) {
|
||||||
T_LONG_JMP(pTaskInfo->env, terrno);
|
T_LONG_JMP(pTaskInfo->env, terrno);
|
||||||
|
|
|
@ -158,8 +158,9 @@ SResultRow* doSetResultOutBufByKey(SDiskbasedBuf* pResultBuf, SResultRowInfo* pR
|
||||||
if (isIntervalQuery) {
|
if (isIntervalQuery) {
|
||||||
if (p1 != NULL) { // the *p1 may be NULL in case of sliding+offset exists.
|
if (p1 != NULL) { // the *p1 may be NULL in case of sliding+offset exists.
|
||||||
pResult = getResultRowByPos(pResultBuf, p1, true);
|
pResult = getResultRowByPos(pResultBuf, p1, true);
|
||||||
if (NULL == pResult) {
|
if (pResult == NULL) {
|
||||||
T_LONG_JMP(pTaskInfo->env, terrno);
|
pTaskInfo->code = terrno;
|
||||||
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
ASSERT(pResult->pageId == p1->pageId && pResult->offset == p1->offset);
|
ASSERT(pResult->pageId == p1->pageId && pResult->offset == p1->offset);
|
||||||
|
@ -171,7 +172,8 @@ SResultRow* doSetResultOutBufByKey(SDiskbasedBuf* pResultBuf, SResultRowInfo* pR
|
||||||
// todo
|
// todo
|
||||||
pResult = getResultRowByPos(pResultBuf, p1, true);
|
pResult = getResultRowByPos(pResultBuf, p1, true);
|
||||||
if (NULL == pResult) {
|
if (NULL == pResult) {
|
||||||
T_LONG_JMP(pTaskInfo->env, terrno);
|
pTaskInfo->code = terrno;
|
||||||
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
ASSERT(pResult->pageId == p1->pageId && pResult->offset == p1->offset);
|
ASSERT(pResult->pageId == p1->pageId && pResult->offset == p1->offset);
|
||||||
|
@ -184,7 +186,8 @@ SResultRow* doSetResultOutBufByKey(SDiskbasedBuf* pResultBuf, SResultRowInfo* pR
|
||||||
SFilePage* pPage = getBufPage(pResultBuf, pos.pageId);
|
SFilePage* pPage = getBufPage(pResultBuf, pos.pageId);
|
||||||
if (pPage == NULL) {
|
if (pPage == NULL) {
|
||||||
qError("failed to get buffer, code:%s, %s", tstrerror(terrno), GET_TASKID(pTaskInfo));
|
qError("failed to get buffer, code:%s, %s", tstrerror(terrno), GET_TASKID(pTaskInfo));
|
||||||
T_LONG_JMP(pTaskInfo->env, terrno);
|
pTaskInfo->code = terrno;
|
||||||
|
return NULL;
|
||||||
}
|
}
|
||||||
releaseBufPage(pResultBuf, pPage);
|
releaseBufPage(pResultBuf, pPage);
|
||||||
}
|
}
|
||||||
|
@ -193,7 +196,8 @@ SResultRow* doSetResultOutBufByKey(SDiskbasedBuf* pResultBuf, SResultRowInfo* pR
|
||||||
if (pResult == NULL) {
|
if (pResult == NULL) {
|
||||||
pResult = getNewResultRow(pResultBuf, &pSup->currentPageId, pSup->resultRowSize);
|
pResult = getNewResultRow(pResultBuf, &pSup->currentPageId, pSup->resultRowSize);
|
||||||
if (pResult == NULL) {
|
if (pResult == NULL) {
|
||||||
T_LONG_JMP(pTaskInfo->env, terrno);
|
pTaskInfo->code = terrno;
|
||||||
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
// add a new result set for a new group
|
// add a new result set for a new group
|
||||||
|
@ -202,7 +206,8 @@ SResultRow* doSetResultOutBufByKey(SDiskbasedBuf* pResultBuf, SResultRowInfo* pR
|
||||||
sizeof(SResultRowPosition));
|
sizeof(SResultRowPosition));
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
|
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
|
||||||
T_LONG_JMP(pTaskInfo->env, code);
|
pTaskInfo->code = code;
|
||||||
|
return NULL;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -212,7 +217,8 @@ SResultRow* doSetResultOutBufByKey(SDiskbasedBuf* pResultBuf, SResultRowInfo* pR
|
||||||
// too many time window in query
|
// too many time window in query
|
||||||
if (pTaskInfo->execModel == OPTR_EXEC_MODEL_BATCH &&
|
if (pTaskInfo->execModel == OPTR_EXEC_MODEL_BATCH &&
|
||||||
tSimpleHashGetSize(pSup->pResultRowHashTable) > MAX_INTERVAL_TIME_WINDOW) {
|
tSimpleHashGetSize(pSup->pResultRowHashTable) > MAX_INTERVAL_TIME_WINDOW) {
|
||||||
T_LONG_JMP(pTaskInfo->env, TSDB_CODE_QRY_TOO_MANY_TIMEWINDOW);
|
pTaskInfo->code = TSDB_CODE_QRY_TOO_MANY_TIMEWINDOW;
|
||||||
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
return pResult;
|
return pResult;
|
||||||
|
|
|
@ -615,8 +615,12 @@ _error:
|
||||||
if (pInfo != NULL) {
|
if (pInfo != NULL) {
|
||||||
destroyGroupOperatorInfo(pInfo);
|
destroyGroupOperatorInfo(pInfo);
|
||||||
}
|
}
|
||||||
destroyOperator(pOperator);
|
|
||||||
taosMemoryFreeClear(pOperator);
|
if (pOperator) {
|
||||||
|
pOperator->info = NULL;
|
||||||
|
destroyOperator(pOperator);
|
||||||
|
}
|
||||||
|
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1254,6 +1258,9 @@ int32_t setGroupResultOutputBuf(SOperatorInfo* pOperator, SOptrBasicInfo* binfo,
|
||||||
|
|
||||||
SResultRow* pResultRow = doSetResultOutBufByKey(pBuf, pResultRowInfo, (char*)pData, bytes, true, groupId, pTaskInfo,
|
SResultRow* pResultRow = doSetResultOutBufByKey(pBuf, pResultRowInfo, (char*)pData, bytes, true, groupId, pTaskInfo,
|
||||||
false, pAggSup, false);
|
false, pAggSup, false);
|
||||||
|
if (pResultRow == NULL || pTaskInfo->code != 0) {
|
||||||
|
return pTaskInfo->code;
|
||||||
|
}
|
||||||
|
|
||||||
return setResultRowInitCtx(pResultRow, pCtx, numOfCols, pOperator->exprSupp.rowEntryInfoOffset);
|
return setResultRowInitCtx(pResultRow, pCtx, numOfCols, pOperator->exprSupp.rowEntryInfoOffset);
|
||||||
}
|
}
|
||||||
|
|
|
@ -703,6 +703,9 @@ int32_t setFunctionResultOutput(SOperatorInfo* pOperator, SOptrBasicInfo* pInfo,
|
||||||
int64_t groupId = 0;
|
int64_t groupId = 0;
|
||||||
SResultRow* pRow = doSetResultOutBufByKey(pSup->pResultBuf, pResultRowInfo, (char*)&tid, sizeof(tid), true, groupId,
|
SResultRow* pRow = doSetResultOutBufByKey(pSup->pResultBuf, pResultRowInfo, (char*)&tid, sizeof(tid), true, groupId,
|
||||||
pTaskInfo, false, pSup, true);
|
pTaskInfo, false, pSup, true);
|
||||||
|
if (pRow == NULL || pTaskInfo->code != 0) {
|
||||||
|
return pTaskInfo->code;
|
||||||
|
}
|
||||||
|
|
||||||
for (int32_t i = 0; i < numOfExprs; ++i) {
|
for (int32_t i = 0; i < numOfExprs; ++i) {
|
||||||
struct SResultRowEntryInfo* pEntry = getResultEntryInfo(pRow, i, rowEntryInfoOffset);
|
struct SResultRowEntryInfo* pEntry = getResultEntryInfo(pRow, i, rowEntryInfoOffset);
|
||||||
|
|
|
@ -155,6 +155,11 @@ int32_t initQueriedTableSchemaInfo(SReadHandle* pHandle, SScanPhysiNode* pScanNo
|
||||||
|
|
||||||
schemaInfo.tablename = taosStrdup(mr.me.name);
|
schemaInfo.tablename = taosStrdup(mr.me.name);
|
||||||
schemaInfo.dbname = taosStrdup(dbName);
|
schemaInfo.dbname = taosStrdup(dbName);
|
||||||
|
if (schemaInfo.tablename == NULL || schemaInfo.dbname == NULL) {
|
||||||
|
pAPI->metaReaderFn.clearReader(&mr);
|
||||||
|
cleanupQueriedTableScanInfo(&schemaInfo);
|
||||||
|
return terrno;
|
||||||
|
}
|
||||||
|
|
||||||
if (mr.me.type == TSDB_SUPER_TABLE) {
|
if (mr.me.type == TSDB_SUPER_TABLE) {
|
||||||
schemaInfo.sw = tCloneSSchemaWrapper(&mr.me.stbEntry.schemaRow);
|
schemaInfo.sw = tCloneSSchemaWrapper(&mr.me.stbEntry.schemaRow);
|
||||||
|
@ -166,8 +171,7 @@ int32_t initQueriedTableSchemaInfo(SReadHandle* pHandle, SScanPhysiNode* pScanNo
|
||||||
code = pAPI->metaReaderFn.getEntryGetUidCache(&mr, suid);
|
code = pAPI->metaReaderFn.getEntryGetUidCache(&mr, suid);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
pAPI->metaReaderFn.clearReader(&mr);
|
pAPI->metaReaderFn.clearReader(&mr);
|
||||||
taosMemoryFree(schemaInfo.tablename);
|
cleanupQueriedTableScanInfo(&schemaInfo);
|
||||||
taosMemoryFree(schemaInfo.dbname);
|
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -177,18 +181,26 @@ int32_t initQueriedTableSchemaInfo(SReadHandle* pHandle, SScanPhysiNode* pScanNo
|
||||||
schemaInfo.sw = tCloneSSchemaWrapper(&mr.me.ntbEntry.schemaRow);
|
schemaInfo.sw = tCloneSSchemaWrapper(&mr.me.ntbEntry.schemaRow);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pAPI->metaReaderFn.clearReader(&mr);
|
||||||
|
|
||||||
if (schemaInfo.sw == NULL) {
|
if (schemaInfo.sw == NULL) {
|
||||||
|
cleanupQueriedTableScanInfo(&schemaInfo);
|
||||||
return terrno;
|
return terrno;
|
||||||
}
|
}
|
||||||
|
|
||||||
pAPI->metaReaderFn.clearReader(&mr);
|
|
||||||
schemaInfo.qsw = extractQueriedColumnSchema(pScanNode);
|
schemaInfo.qsw = extractQueriedColumnSchema(pScanNode);
|
||||||
if (schemaInfo.qsw == NULL) {
|
if (schemaInfo.qsw == NULL) {
|
||||||
|
cleanupQueriedTableScanInfo(&schemaInfo);
|
||||||
return terrno;
|
return terrno;
|
||||||
}
|
}
|
||||||
|
|
||||||
void* p = taosArrayPush(pTaskInfo->schemaInfos, &schemaInfo);
|
void* p = taosArrayPush(pTaskInfo->schemaInfos, &schemaInfo);
|
||||||
return (p != NULL)? TSDB_CODE_SUCCESS:TSDB_CODE_OUT_OF_MEMORY;
|
if (p == NULL) {
|
||||||
|
cleanupQueriedTableScanInfo(&schemaInfo);
|
||||||
|
return terrno;
|
||||||
|
}
|
||||||
|
|
||||||
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
SSchemaWrapper* extractQueriedColumnSchema(SScanPhysiNode* pScanNode) {
|
SSchemaWrapper* extractQueriedColumnSchema(SScanPhysiNode* pScanNode) {
|
||||||
|
|
|
@ -130,7 +130,7 @@ int32_t createSortOperatorInfo(SOperatorInfo* downstream, SSortPhysiNode* pSortN
|
||||||
pGroupIdCalc->lastKeysLen = 0;
|
pGroupIdCalc->lastKeysLen = 0;
|
||||||
pGroupIdCalc->keyBuf = taosMemoryCalloc(1, keyLen);
|
pGroupIdCalc->keyBuf = taosMemoryCalloc(1, keyLen);
|
||||||
if (!pGroupIdCalc->keyBuf) {
|
if (!pGroupIdCalc->keyBuf) {
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
code = terrno;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -370,8 +370,13 @@ int32_t doOpenSortOperator(SOperatorInfo* pOperator) {
|
||||||
tsortSetFetchRawDataFp(pInfo->pSortHandle, loadNextDataBlock, applyScalarFunction, pOperator);
|
tsortSetFetchRawDataFp(pInfo->pSortHandle, loadNextDataBlock, applyScalarFunction, pOperator);
|
||||||
|
|
||||||
SSortSource* ps = taosMemoryCalloc(1, sizeof(SSortSource));
|
SSortSource* ps = taosMemoryCalloc(1, sizeof(SSortSource));
|
||||||
|
if (ps == NULL) {
|
||||||
|
return terrno;
|
||||||
|
}
|
||||||
|
|
||||||
ps->param = pOperator->pDownstream[0];
|
ps->param = pOperator->pDownstream[0];
|
||||||
ps->onlyRef = true;
|
ps->onlyRef = true;
|
||||||
|
|
||||||
code = tsortAddSource(pInfo->pSortHandle, ps);
|
code = tsortAddSource(pInfo->pSortHandle, ps);
|
||||||
if (code) {
|
if (code) {
|
||||||
taosMemoryFree(ps);
|
taosMemoryFree(ps);
|
||||||
|
@ -464,6 +469,9 @@ void destroySortOperatorInfo(void* param) {
|
||||||
|
|
||||||
int32_t getExplainExecInfo(SOperatorInfo* pOptr, void** pOptrExplain, uint32_t* len) {
|
int32_t getExplainExecInfo(SOperatorInfo* pOptr, void** pOptrExplain, uint32_t* len) {
|
||||||
SSortExecInfo* pInfo = taosMemoryCalloc(1, sizeof(SSortExecInfo));
|
SSortExecInfo* pInfo = taosMemoryCalloc(1, sizeof(SSortExecInfo));
|
||||||
|
if (pInfo == NULL) {
|
||||||
|
return terrno;
|
||||||
|
}
|
||||||
|
|
||||||
SSortOperatorInfo* pOperatorInfo = (SSortOperatorInfo*)pOptr->info;
|
SSortOperatorInfo* pOperatorInfo = (SSortOperatorInfo*)pOptr->info;
|
||||||
|
|
||||||
|
@ -638,6 +646,10 @@ int32_t beginSortGroup(SOperatorInfo* pOperator) {
|
||||||
|
|
||||||
SSortSource* ps = taosMemoryCalloc(1, sizeof(SSortSource));
|
SSortSource* ps = taosMemoryCalloc(1, sizeof(SSortSource));
|
||||||
SGroupSortSourceParam* param = taosMemoryCalloc(1, sizeof(SGroupSortSourceParam));
|
SGroupSortSourceParam* param = taosMemoryCalloc(1, sizeof(SGroupSortSourceParam));
|
||||||
|
if (ps == NULL || param == NULL) {
|
||||||
|
T_LONG_JMP(pTaskInfo->env, terrno);
|
||||||
|
}
|
||||||
|
|
||||||
param->childOpInfo = pOperator->pDownstream[0];
|
param->childOpInfo = pOperator->pDownstream[0];
|
||||||
param->grpSortOpInfo = pInfo;
|
param->grpSortOpInfo = pInfo;
|
||||||
ps->param = param;
|
ps->param = param;
|
||||||
|
|
|
@ -75,9 +75,9 @@ static int32_t setTimeWindowOutputBuf(SResultRowInfo* pResultRowInfo, STimeWindo
|
||||||
SResultRow* pResultRow = doSetResultOutBufByKey(pAggSup->pResultBuf, pResultRowInfo, (char*)&win->skey, TSDB_KEYSIZE,
|
SResultRow* pResultRow = doSetResultOutBufByKey(pAggSup->pResultBuf, pResultRowInfo, (char*)&win->skey, TSDB_KEYSIZE,
|
||||||
masterscan, tableGroupId, pTaskInfo, true, pAggSup, true);
|
masterscan, tableGroupId, pTaskInfo, true, pAggSup, true);
|
||||||
|
|
||||||
if (pResultRow == NULL) {
|
if (pResultRow == NULL || pTaskInfo->code != 0) {
|
||||||
*pResult = NULL;
|
*pResult = NULL;
|
||||||
return TSDB_CODE_SUCCESS;
|
return pTaskInfo->code;
|
||||||
}
|
}
|
||||||
|
|
||||||
// set time window for current result
|
// set time window for current result
|
||||||
|
|
|
@ -235,7 +235,7 @@ EExtractDataCode streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueIte
|
||||||
void* newRet = NULL;
|
void* newRet = NULL;
|
||||||
int32_t code = streamQueueMergeQueueItem(*pInput, qItem, (SStreamQueueItem**)&newRet);
|
int32_t code = streamQueueMergeQueueItem(*pInput, qItem, (SStreamQueueItem**)&newRet);
|
||||||
if (newRet == NULL) {
|
if (newRet == NULL) {
|
||||||
if (code) {
|
if (code != -1) {
|
||||||
stError("s-task:%s failed to merge blocks from inputQ, numOfBlocks:%d, code:%s", id, *numOfBlocks,
|
stError("s-task:%s failed to merge blocks from inputQ, numOfBlocks:%d, code:%s", id, *numOfBlocks,
|
||||||
tstrerror(code));
|
tstrerror(code));
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue