Merge pull request #27289 from taosdata/fix/3_liaohj
fix(query): return error code.
This commit is contained in:
commit
17c941fc32
|
@ -898,7 +898,7 @@ typedef struct SSttDataInfoForTable {
|
||||||
|
|
||||||
int32_t tMergeTreeOpen2(SMergeTree *pMTree, SMergeTreeConf *pConf, SSttDataInfoForTable *pTableInfo);
|
int32_t tMergeTreeOpen2(SMergeTree *pMTree, SMergeTreeConf *pConf, SSttDataInfoForTable *pTableInfo);
|
||||||
void tMergeTreeAddIter(SMergeTree *pMTree, SLDataIter *pIter);
|
void tMergeTreeAddIter(SMergeTree *pMTree, SLDataIter *pIter);
|
||||||
bool tMergeTreeNext(SMergeTree *pMTree);
|
int32_t tMergeTreeNext(SMergeTree *pMTree, bool* pHasNext);
|
||||||
void tMergeTreePinSttBlock(SMergeTree *pMTree);
|
void tMergeTreePinSttBlock(SMergeTree *pMTree);
|
||||||
void tMergeTreeUnpinSttBlock(SMergeTree *pMTree);
|
void tMergeTreeUnpinSttBlock(SMergeTree *pMTree);
|
||||||
bool tMergeTreeIgnoreEarlierTs(SMergeTree *pMTree);
|
bool tMergeTreeIgnoreEarlierTs(SMergeTree *pMTree);
|
||||||
|
|
|
@ -2245,17 +2245,20 @@ static int32_t lastIterClose(SFSLastIter **iter) {
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t lastIterNext(SFSLastIter *iter, TSDBROW **ppRow) {
|
static int32_t lastIterNext(SFSLastIter *iter, TSDBROW **ppRow) {
|
||||||
int32_t code = 0;
|
bool hasVal = false;
|
||||||
|
*ppRow = NULL;
|
||||||
|
|
||||||
|
int32_t code = tMergeTreeNext(iter->pMergeTree, &hasVal);
|
||||||
|
if (code != 0) {
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
bool hasVal = tMergeTreeNext(iter->pMergeTree);
|
|
||||||
if (!hasVal) {
|
if (!hasVal) {
|
||||||
*ppRow = NULL;
|
*ppRow = NULL;
|
||||||
|
|
||||||
TAOS_RETURN(code);
|
TAOS_RETURN(code);
|
||||||
}
|
}
|
||||||
|
|
||||||
*ppRow = tMergeTreeGetRow(iter->pMergeTree);
|
*ppRow = tMergeTreeGetRow(iter->pMergeTree);
|
||||||
|
|
||||||
TAOS_RETURN(code);
|
TAOS_RETURN(code);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -1100,13 +1100,22 @@ void tMergeTreeUnpinSttBlock(SMergeTree *pMTree) {
|
||||||
tLDataIterUnpinSttBlock(pIter, pMTree->idStr);
|
tLDataIterUnpinSttBlock(pIter, pMTree->idStr);
|
||||||
}
|
}
|
||||||
|
|
||||||
bool tMergeTreeNext(SMergeTree *pMTree) {
|
int32_t tMergeTreeNext(SMergeTree *pMTree, bool *pHasNext) {
|
||||||
|
int32_t code = 0;
|
||||||
|
if (pHasNext == NULL) {
|
||||||
|
return TSDB_CODE_INVALID_PARA;
|
||||||
|
}
|
||||||
|
|
||||||
|
*pHasNext = false;
|
||||||
if (pMTree->pIter) {
|
if (pMTree->pIter) {
|
||||||
SLDataIter *pIter = pMTree->pIter;
|
SLDataIter *pIter = pMTree->pIter;
|
||||||
|
bool hasVal = false;
|
||||||
bool hasVal = false;
|
code = tLDataIterNextRow(pIter, pMTree->idStr, &hasVal);
|
||||||
int32_t code = tLDataIterNextRow(pIter, pMTree->idStr, &hasVal);
|
|
||||||
if (!hasVal || (code != 0)) {
|
if (!hasVal || (code != 0)) {
|
||||||
|
if (code == TSDB_CODE_FILE_CORRUPTED) {
|
||||||
|
code = 0; // suppress the file corrupt error to enable all queries within this cluster can run without failed.
|
||||||
|
}
|
||||||
|
|
||||||
pMTree->pIter = NULL;
|
pMTree->pIter = NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1115,7 +1124,7 @@ bool tMergeTreeNext(SMergeTree *pMTree) {
|
||||||
if (pMTree->pIter && pIter) {
|
if (pMTree->pIter && pIter) {
|
||||||
int32_t c = pMTree->rbt.cmprFn(&pMTree->pIter->node, &pIter->node);
|
int32_t c = pMTree->rbt.cmprFn(&pMTree->pIter->node, &pIter->node);
|
||||||
if (c > 0) {
|
if (c > 0) {
|
||||||
(void) tRBTreePut(&pMTree->rbt, (SRBTreeNode *)pMTree->pIter);
|
(void)tRBTreePut(&pMTree->rbt, (SRBTreeNode *)pMTree->pIter);
|
||||||
pMTree->pIter = NULL;
|
pMTree->pIter = NULL;
|
||||||
} else {
|
} else {
|
||||||
ASSERT(c);
|
ASSERT(c);
|
||||||
|
@ -1130,7 +1139,8 @@ bool tMergeTreeNext(SMergeTree *pMTree) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return pMTree->pIter != NULL;
|
*pHasNext = (pMTree->pIter != NULL);
|
||||||
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
void tMergeTreeClose(SMergeTree *pMTree) {
|
void tMergeTreeClose(SMergeTree *pMTree) {
|
||||||
|
|
|
@ -1383,7 +1383,6 @@ static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader, SRowKey* pLastPro
|
||||||
|
|
||||||
static FORCE_INLINE STSchema* getTableSchemaImpl(STsdbReader* pReader, uint64_t uid) {
|
static FORCE_INLINE STSchema* getTableSchemaImpl(STsdbReader* pReader, uint64_t uid) {
|
||||||
ASSERT(pReader->info.pSchema == NULL);
|
ASSERT(pReader->info.pSchema == NULL);
|
||||||
|
|
||||||
int32_t code = metaGetTbTSchemaEx(pReader->pTsdb->pVnode->pMeta, pReader->info.suid, uid, -1, &pReader->info.pSchema);
|
int32_t code = metaGetTbTSchemaEx(pReader->pTsdb->pVnode->pMeta, pReader->info.suid, uid, -1, &pReader->info.pSchema);
|
||||||
if (code != TSDB_CODE_SUCCESS || pReader->info.pSchema == NULL) {
|
if (code != TSDB_CODE_SUCCESS || pReader->info.pSchema == NULL) {
|
||||||
terrno = code;
|
terrno = code;
|
||||||
|
@ -1414,7 +1413,9 @@ static int32_t doLoadFileBlockData(STsdbReader* pReader, SDataBlockIter* pBlockI
|
||||||
if (pReader->info.pSchema == NULL) {
|
if (pReader->info.pSchema == NULL) {
|
||||||
pSchema = getTableSchemaImpl(pReader, uid);
|
pSchema = getTableSchemaImpl(pReader, uid);
|
||||||
if (pSchema == NULL) {
|
if (pSchema == NULL) {
|
||||||
tsdbDebug("%p table uid:%" PRIu64 " has been dropped, no data existed, %s", pReader, uid, pReader->idStr);
|
code = terrno;
|
||||||
|
tsdbError("%p table uid:%" PRIu64 " failed to get tableschema, code:%s, %s", pReader, uid, tstrerror(code),
|
||||||
|
pReader->idStr);
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1449,7 +1450,7 @@ static int32_t doLoadFileBlockData(STsdbReader* pReader, SDataBlockIter* pBlockI
|
||||||
pReader->cost.blockLoadTime += elapsedTime;
|
pReader->cost.blockLoadTime += elapsedTime;
|
||||||
pDumpInfo->allDumped = false;
|
pDumpInfo->allDumped = false;
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -1759,14 +1760,22 @@ static bool tryCopyDistinctRowFromFileBlock(STsdbReader* pReader, SBlockData* pB
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
static bool nextRowFromSttBlocks(SSttBlockReader* pSttBlockReader, STableBlockScanInfo* pScanInfo, int32_t pkSrcSlot,
|
static int32_t nextRowFromSttBlocks(SSttBlockReader* pSttBlockReader, STableBlockScanInfo* pScanInfo, int32_t pkSrcSlot,
|
||||||
SVersionRange* pVerRange) {
|
SVersionRange* pVerRange) {
|
||||||
|
int32_t code = 0;
|
||||||
int32_t order = pSttBlockReader->order;
|
int32_t order = pSttBlockReader->order;
|
||||||
int32_t step = ASCENDING_TRAVERSE(order) ? 1 : -1;
|
int32_t step = ASCENDING_TRAVERSE(order) ? 1 : -1;
|
||||||
SRowKey* pNextProc = &pScanInfo->sttKeyInfo.nextProcKey;
|
SRowKey* pNextProc = &pScanInfo->sttKeyInfo.nextProcKey;
|
||||||
|
|
||||||
while (1) {
|
while (1) {
|
||||||
bool hasVal = tMergeTreeNext(&pSttBlockReader->mergeTree);
|
bool hasVal = false;
|
||||||
|
code = tMergeTreeNext(&pSttBlockReader->mergeTree, &hasVal);
|
||||||
|
if (code) {
|
||||||
|
tsdbError("failed to iter the next row in stt-file merge tree, code:%s, %s", tstrerror(code),
|
||||||
|
pSttBlockReader->mergeTree.idStr);
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
if (!hasVal) { // the next value will be the accessed key in stt
|
if (!hasVal) { // the next value will be the accessed key in stt
|
||||||
pScanInfo->sttKeyInfo.status = STT_FILE_NO_DATA;
|
pScanInfo->sttKeyInfo.status = STT_FILE_NO_DATA;
|
||||||
|
|
||||||
|
@ -1779,7 +1788,7 @@ static bool nextRowFromSttBlocks(SSttBlockReader* pSttBlockReader, STableBlockSc
|
||||||
memset(pNextProc->pks[0].pData, 0, pNextProc->pks[0].nData);
|
memset(pNextProc->pks[0].pData, 0, pNextProc->pks[0].nData);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return false;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
TSDBROW* pRow = tMergeTreeGetRow(&pSttBlockReader->mergeTree);
|
TSDBROW* pRow = tMergeTreeGetRow(&pSttBlockReader->mergeTree);
|
||||||
|
@ -1798,13 +1807,15 @@ static bool nextRowFromSttBlocks(SSttBlockReader* pSttBlockReader, STableBlockSc
|
||||||
if (!hasBeenDropped(pScanInfo->delSkyline, &pScanInfo->sttBlockDelIndex, key, ver, order, pVerRange,
|
if (!hasBeenDropped(pScanInfo->delSkyline, &pScanInfo->sttBlockDelIndex, key, ver, order, pVerRange,
|
||||||
pSttBlockReader->numOfPks > 0)) {
|
pSttBlockReader->numOfPks > 0)) {
|
||||||
pScanInfo->sttKeyInfo.status = STT_FILE_HAS_DATA;
|
pScanInfo->sttKeyInfo.status = STT_FILE_HAS_DATA;
|
||||||
return true;
|
return code;
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
pScanInfo->sttKeyInfo.status = STT_FILE_HAS_DATA;
|
pScanInfo->sttKeyInfo.status = STT_FILE_HAS_DATA;
|
||||||
return true;
|
return code;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void doPinSttBlock(SSttBlockReader* pSttBlockReader) { tMergeTreePinSttBlock(&pSttBlockReader->mergeTree); }
|
static void doPinSttBlock(SSttBlockReader* pSttBlockReader) { tMergeTreePinSttBlock(&pSttBlockReader->mergeTree); }
|
||||||
|
@ -1819,9 +1830,14 @@ static bool tryCopyDistinctRowFromSttBlock(TSDBROW* fRow, SSttBlockReader* pSttB
|
||||||
|
|
||||||
// avoid the fetch next row replace the referenced stt block in buffer
|
// avoid the fetch next row replace the referenced stt block in buffer
|
||||||
doPinSttBlock(pSttBlockReader);
|
doPinSttBlock(pSttBlockReader);
|
||||||
bool hasVal = nextRowFromSttBlocks(pSttBlockReader, pScanInfo, pReader->suppInfo.pkSrcSlot, &pReader->info.verRange);
|
code = nextRowFromSttBlocks(pSttBlockReader, pScanInfo, pReader->suppInfo.pkSrcSlot, &pReader->info.verRange);
|
||||||
doUnpinSttBlock(pSttBlockReader);
|
doUnpinSttBlock(pSttBlockReader);
|
||||||
if (hasVal) {
|
|
||||||
|
if (code) {
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (hasDataInSttBlock(pScanInfo)) {
|
||||||
SRowKey* pNext = getCurrentKeyInSttBlock(pSttBlockReader);
|
SRowKey* pNext = getCurrentKeyInSttBlock(pSttBlockReader);
|
||||||
if (pkCompEx(pSttKey, pNext) != 0) {
|
if (pkCompEx(pSttKey, pNext) != 0) {
|
||||||
code = doAppendRowFromFileBlock(pReader->resBlockInfo.pResBlock, pReader, fRow->pBlockData, fRow->iRow);
|
code = doAppendRowFromFileBlock(pReader->resBlockInfo.pResBlock, pReader, fRow->pBlockData, fRow->iRow);
|
||||||
|
@ -2125,7 +2141,7 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo*
|
||||||
if (piRow->type == TSDBROW_ROW_FMT) {
|
if (piRow->type == TSDBROW_ROW_FMT) {
|
||||||
piSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(piRow), pReader, pBlockScanInfo->uid);
|
piSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(piRow), pReader, pBlockScanInfo->uid);
|
||||||
if (piSchema == NULL) {
|
if (piSchema == NULL) {
|
||||||
return code;
|
return terrno;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2380,14 +2396,13 @@ static bool isValidFileBlockRow(SBlockData* pBlockData, int32_t rowIndex, STable
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
static bool initSttBlockReader(SSttBlockReader* pSttBlockReader, STableBlockScanInfo* pScanInfo, STsdbReader* pReader) {
|
static void initSttBlockReader(SSttBlockReader* pSttBlockReader, STableBlockScanInfo* pScanInfo, STsdbReader* pReader) {
|
||||||
bool hasData = true;
|
|
||||||
int32_t order = pReader->info.order;
|
int32_t order = pReader->info.order;
|
||||||
bool asc = ASCENDING_TRAVERSE(order);
|
bool asc = ASCENDING_TRAVERSE(order);
|
||||||
|
|
||||||
// the stt block reader has been initialized for this table.
|
// the stt block reader has been initialized for this table.
|
||||||
if (pSttBlockReader->uid == pScanInfo->uid) {
|
if (pSttBlockReader->uid == pScanInfo->uid) {
|
||||||
return hasDataInSttBlock(pScanInfo);
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pSttBlockReader->uid != 0) {
|
if (pSttBlockReader->uid != 0) {
|
||||||
|
@ -2396,9 +2411,14 @@ static bool initSttBlockReader(SSttBlockReader* pSttBlockReader, STableBlockScan
|
||||||
|
|
||||||
pSttBlockReader->uid = pScanInfo->uid;
|
pSttBlockReader->uid = pScanInfo->uid;
|
||||||
|
|
||||||
// second time init stt block reader
|
// second or third time init stt block reader
|
||||||
if (pScanInfo->cleanSttBlocks && (pReader->info.execMode == READER_EXEC_ROWS)) {
|
if (pScanInfo->cleanSttBlocks && (pReader->info.execMode == READER_EXEC_ROWS)) {
|
||||||
return !pScanInfo->sttBlockReturned;
|
// only allowed to retrieve clean stt blocks for count once
|
||||||
|
if (pScanInfo->sttBlockReturned) {
|
||||||
|
pScanInfo->sttKeyInfo.status = STT_FILE_NO_DATA;
|
||||||
|
tsdbDebug("uid:%" PRIu64 " set no stt-file data after stt-block retrieved, %s", pScanInfo->uid, pReader->idStr);
|
||||||
|
}
|
||||||
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
STimeWindow w = pSttBlockReader->window;
|
STimeWindow w = pSttBlockReader->window;
|
||||||
|
@ -2435,28 +2455,28 @@ static bool initSttBlockReader(SSttBlockReader* pSttBlockReader, STableBlockScan
|
||||||
SSttDataInfoForTable info = {.pKeyRangeList = taosArrayInit(4, sizeof(SSttKeyRange))};
|
SSttDataInfoForTable info = {.pKeyRangeList = taosArrayInit(4, sizeof(SSttKeyRange))};
|
||||||
if (info.pKeyRangeList == NULL) {
|
if (info.pKeyRangeList == NULL) {
|
||||||
pReader->code = terrno;
|
pReader->code = terrno;
|
||||||
return false;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t code = tMergeTreeOpen2(&pSttBlockReader->mergeTree, &conf, &info);
|
int32_t code = tMergeTreeOpen2(&pSttBlockReader->mergeTree, &conf, &info);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
taosArrayDestroy(info.pKeyRangeList);
|
taosArrayDestroy(info.pKeyRangeList);
|
||||||
pReader->code = code;
|
pReader->code = code;
|
||||||
return false;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
code = initMemDataIterator(pScanInfo, pReader);
|
code = initMemDataIterator(pScanInfo, pReader);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
taosArrayDestroy(info.pKeyRangeList);
|
taosArrayDestroy(info.pKeyRangeList);
|
||||||
pReader->code = code;
|
pReader->code = code;
|
||||||
return false;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
code = initDelSkylineIterator(pScanInfo, pReader->info.order, &pReader->cost);
|
code = initDelSkylineIterator(pScanInfo, pReader->info.order, &pReader->cost);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
taosArrayDestroy(info.pKeyRangeList);
|
taosArrayDestroy(info.pKeyRangeList);
|
||||||
pReader->code = code;
|
pReader->code = code;
|
||||||
return false;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (conf.rspRows) {
|
if (conf.rspRows) {
|
||||||
|
@ -2484,27 +2504,26 @@ static bool initSttBlockReader(SSttBlockReader* pSttBlockReader, STableBlockScan
|
||||||
|
|
||||||
SRowKey* p = asc ? &pScanInfo->sttRange.skey : &pScanInfo->sttRange.ekey;
|
SRowKey* p = asc ? &pScanInfo->sttRange.skey : &pScanInfo->sttRange.ekey;
|
||||||
tRowKeyAssign(&pScanInfo->sttKeyInfo.nextProcKey, p);
|
tRowKeyAssign(&pScanInfo->sttKeyInfo.nextProcKey, p);
|
||||||
|
|
||||||
hasData = (pScanInfo->sttKeyInfo.status == STT_FILE_HAS_DATA);
|
|
||||||
} else { // not clean stt blocks
|
} else { // not clean stt blocks
|
||||||
INIT_KEYRANGE(&pScanInfo->sttRange); // reset the time window
|
INIT_KEYRANGE(&pScanInfo->sttRange); // reset the time window
|
||||||
pScanInfo->sttBlockReturned = false;
|
code = nextRowFromSttBlocks(pSttBlockReader, pScanInfo, pReader->suppInfo.pkSrcSlot, &pReader->info.verRange);
|
||||||
hasData = nextRowFromSttBlocks(pSttBlockReader, pScanInfo, pReader->suppInfo.pkSrcSlot, &pReader->info.verRange);
|
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
pScanInfo->cleanSttBlocks = false;
|
pScanInfo->cleanSttBlocks = false;
|
||||||
INIT_KEYRANGE(&pScanInfo->sttRange); // reset the time window
|
INIT_KEYRANGE(&pScanInfo->sttRange); // reset the time window
|
||||||
pScanInfo->sttBlockReturned = false;
|
code = nextRowFromSttBlocks(pSttBlockReader, pScanInfo, pReader->suppInfo.pkSrcSlot, &pReader->info.verRange);
|
||||||
hasData = nextRowFromSttBlocks(pSttBlockReader, pScanInfo, pReader->suppInfo.pkSrcSlot, &pReader->info.verRange);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pScanInfo->sttBlockReturned = false;
|
||||||
taosArrayDestroy(info.pKeyRangeList);
|
taosArrayDestroy(info.pKeyRangeList);
|
||||||
|
|
||||||
int64_t el = taosGetTimestampUs() - st;
|
int64_t el = taosGetTimestampUs() - st;
|
||||||
pReader->cost.initSttBlockReader += (el / 1000.0);
|
pReader->cost.initSttBlockReader += (el / 1000.0);
|
||||||
|
|
||||||
tsdbDebug("init stt block reader completed, elapsed time:%" PRId64 "us %s", el, pReader->idStr);
|
tsdbDebug("init stt block reader completed, elapsed time:%" PRId64 "us %s", el, pReader->idStr);
|
||||||
return hasData;
|
if (code != 0) {
|
||||||
|
pReader->code = code;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static bool hasDataInSttBlock(STableBlockScanInfo* pInfo) { return pInfo->sttKeyInfo.status == STT_FILE_HAS_DATA; }
|
static bool hasDataInSttBlock(STableBlockScanInfo* pInfo) { return pInfo->sttKeyInfo.status == STT_FILE_HAS_DATA; }
|
||||||
|
@ -2772,7 +2791,7 @@ static int32_t buildComposedDataBlock(STsdbReader* pReader) {
|
||||||
}
|
}
|
||||||
|
|
||||||
SBlockData* pBlockData = &pReader->status.fileBlockData;
|
SBlockData* pBlockData = &pReader->status.fileBlockData;
|
||||||
(void) initSttBlockReader(pSttBlockReader, pBlockScanInfo, pReader);
|
initSttBlockReader(pSttBlockReader, pBlockScanInfo, pReader);
|
||||||
if (pReader->code != 0) {
|
if (pReader->code != 0) {
|
||||||
code = pReader->code;
|
code = pReader->code;
|
||||||
goto _end;
|
goto _end;
|
||||||
|
@ -3190,12 +3209,12 @@ static int32_t doLoadSttBlockSequentially(STsdbReader* pReader) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
bool hasDataInSttFile = initSttBlockReader(pSttBlockReader, pScanInfo, pReader);
|
initSttBlockReader(pSttBlockReader, pScanInfo, pReader);
|
||||||
if (pReader->code != TSDB_CODE_SUCCESS) {
|
if (pReader->code != TSDB_CODE_SUCCESS) {
|
||||||
return pReader->code;
|
return pReader->code;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!hasDataInSttFile) {
|
if (!hasDataInSttBlock(pScanInfo)) {
|
||||||
bool hasNexTable = moveToNextTable(pUidList, pStatus);
|
bool hasNexTable = moveToNextTable(pUidList, pStatus);
|
||||||
if (!hasNexTable) {
|
if (!hasNexTable) {
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
|
@ -3287,7 +3306,7 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) {
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pScanInfo->sttKeyInfo.status == STT_FILE_READER_UNINIT) {
|
if (pScanInfo->sttKeyInfo.status == STT_FILE_READER_UNINIT) {
|
||||||
(void) initSttBlockReader(pSttBlockReader, pScanInfo, pReader);
|
initSttBlockReader(pSttBlockReader, pScanInfo, pReader);
|
||||||
if (pReader->code != 0) {
|
if (pReader->code != 0) {
|
||||||
return pReader->code;
|
return pReader->code;
|
||||||
}
|
}
|
||||||
|
@ -3331,7 +3350,7 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) {
|
||||||
int64_t st = taosGetTimestampUs();
|
int64_t st = taosGetTimestampUs();
|
||||||
|
|
||||||
// let's load data from stt files, make sure clear the cleanStt block flag before load the data from stt files
|
// let's load data from stt files, make sure clear the cleanStt block flag before load the data from stt files
|
||||||
(void) initSttBlockReader(pSttBlockReader, pScanInfo, pReader);
|
initSttBlockReader(pSttBlockReader, pScanInfo, pReader);
|
||||||
if (pReader->code != 0) {
|
if (pReader->code != 0) {
|
||||||
return pReader->code;
|
return pReader->code;
|
||||||
}
|
}
|
||||||
|
@ -4087,7 +4106,12 @@ int32_t doMergeRowsInSttBlock(SSttBlockReader* pSttBlockReader, STableBlockScanI
|
||||||
SRowKey* pRowKey = &pScanInfo->lastProcKey;
|
SRowKey* pRowKey = &pScanInfo->lastProcKey;
|
||||||
int32_t code = TSDB_CODE_SUCCESS;
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
|
|
||||||
while (nextRowFromSttBlocks(pSttBlockReader, pScanInfo, pkSrcSlot, pVerRange)) {
|
while (1) {
|
||||||
|
code = nextRowFromSttBlocks(pSttBlockReader, pScanInfo, pkSrcSlot, pVerRange);
|
||||||
|
if (code || (!hasDataInSttBlock(pScanInfo))) {
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
SRowKey* pNextKey = getCurrentKeyInSttBlock(pSttBlockReader);
|
SRowKey* pNextKey = getCurrentKeyInSttBlock(pSttBlockReader);
|
||||||
|
|
||||||
int32_t ret = pkCompEx(pRowKey, pNextKey);
|
int32_t ret = pkCompEx(pRowKey, pNextKey);
|
||||||
|
|
|
@ -663,7 +663,7 @@ int32_t qExecTaskOpt(qTaskInfo_t tinfo, SArray* pResList, uint64_t* useconds, bo
|
||||||
if (isTaskKilled(pTaskInfo)) {
|
if (isTaskKilled(pTaskInfo)) {
|
||||||
atomic_store_64(&pTaskInfo->owner, 0);
|
atomic_store_64(&pTaskInfo->owner, 0);
|
||||||
qDebug("%s already killed, abort", GET_TASKID(pTaskInfo));
|
qDebug("%s already killed, abort", GET_TASKID(pTaskInfo));
|
||||||
return TSDB_CODE_SUCCESS;
|
return pTaskInfo->code;
|
||||||
}
|
}
|
||||||
|
|
||||||
// error occurs, record the error code and return to client
|
// error occurs, record the error code and return to client
|
||||||
|
@ -785,7 +785,7 @@ int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pRes, uint64_t* useconds) {
|
||||||
qDebug("%s already killed, abort", GET_TASKID(pTaskInfo));
|
qDebug("%s already killed, abort", GET_TASKID(pTaskInfo));
|
||||||
|
|
||||||
taosRUnLockLatch(&pTaskInfo->lock);
|
taosRUnLockLatch(&pTaskInfo->lock);
|
||||||
return TSDB_CODE_SUCCESS;
|
return pTaskInfo->code;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pTaskInfo->owner != 0) {
|
if (pTaskInfo->owner != 0) {
|
||||||
|
|
|
@ -464,15 +464,10 @@ static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanBase* pTableSca
|
||||||
|
|
||||||
SSDataBlock* p = NULL;
|
SSDataBlock* p = NULL;
|
||||||
code = pAPI->tsdReader.tsdReaderRetrieveDataBlock(pTableScanInfo->dataReader, &p, NULL);
|
code = pAPI->tsdReader.tsdReaderRetrieveDataBlock(pTableScanInfo->dataReader, &p, NULL);
|
||||||
if (p == NULL || code != TSDB_CODE_SUCCESS) {
|
if (p == NULL || code != TSDB_CODE_SUCCESS || p != pBlock) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
if(p != pBlock) {
|
|
||||||
qError("[loadDataBlock] p != pBlock");
|
|
||||||
return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
|
|
||||||
}
|
|
||||||
|
|
||||||
code = doSetTagColumnData(pTableScanInfo, pBlock, pTaskInfo, pBlock->info.rows);
|
code = doSetTagColumnData(pTableScanInfo, pBlock, pTaskInfo, pBlock->info.rows);
|
||||||
if (code) {
|
if (code) {
|
||||||
return code;
|
return code;
|
||||||
|
|
Loading…
Reference in New Issue