Merge pull request #27085 from taosdata/fix/TD-31317

fix issue
This commit is contained in:
Haojun Liao 2024-08-09 09:25:08 +08:00 committed by GitHub
commit ac413cd8ed
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 39 additions and 23 deletions

View File

@ -3519,8 +3519,10 @@ static int32_t initForFirstBlockInFile(STsdbReader* pReader, SDataBlockIter* pBl
resetTableListIndex(&pReader->status);
}
// set the correct start position according to the query time window
initBlockDumpInfo(pReader, pBlockIter);
if (code == TSDB_CODE_SUCCESS) {
// set the correct start position according to the query time window
initBlockDumpInfo(pReader, pBlockIter);
}
taosArrayDestroy(pTableList);
return code;
}

View File

@ -1074,8 +1074,12 @@ int32_t doAdjustValidDataIters(SArray* pLDIterList, int32_t numOfFileObj) {
int32_t inc = numOfFileObj - size;
for (int32_t k = 0; k < inc; ++k) {
SLDataIter* pIter = taosMemoryCalloc(1, sizeof(SLDataIter));
void* px = taosArrayPush(pLDIterList, &pIter);
if (!pIter) {
return terrno;
}
void* px = taosArrayPush(pLDIterList, &pIter);
if (px == NULL) {
taosMemoryFree(pIter);
return TSDB_CODE_OUT_OF_MEMORY;
}
}

View File

@ -245,8 +245,10 @@ _error:
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
}
pInfo->pTableList = NULL;
destroyCacheScanOperator(pInfo);
if (pInfo != NULL) {
pInfo->pTableList = NULL;
destroyCacheScanOperator(pInfo);
}
if (pOperator != NULL) {
pOperator->info = NULL;
destroyOperator(pOperator);

View File

@ -179,7 +179,7 @@ int32_t createProjectOperatorInfo(SOperatorInfo* downstream, SProjectPhysiNode*
return code;
_error:
destroyProjectOperatorInfo(pInfo);
if (pInfo != NULL) destroyProjectOperatorInfo(pInfo);
if (pOperator != NULL) {
pOperator->info = NULL;
destroyOperator(pOperator);
@ -531,7 +531,7 @@ int32_t createIndefinitOutputOperatorInfo(SOperatorInfo* downstream, SPhysiNode*
return code;
_error:
destroyIndefinitOperatorInfo(pInfo);
if (pInfo != NULL) destroyIndefinitOperatorInfo(pInfo);
if (pOperator != NULL) {
pOperator->info = NULL;
destroyOperator(pOperator);

View File

@ -1568,6 +1568,8 @@ void resetTableScanInfo(STableScanInfo* pTableScanInfo, STimeWindow* pWin, uint6
static SSDataBlock* readPreVersionData(SOperatorInfo* pTableScanOp, uint64_t tbUid, TSKEY startTs, TSKEY endTs,
int64_t maxVersion) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
STableKeyInfo tblInfo = {.uid = tbUid, .groupId = 0};
STableScanInfo* pTableScanInfo = pTableScanOp->info;
@ -1582,37 +1584,33 @@ static SSDataBlock* readPreVersionData(SOperatorInfo* pTableScanOp, uint64_t tbU
SSDataBlock* pBlock = pTableScanInfo->pResBlock;
STsdbReader* pReader = NULL;
int32_t code = pAPI->tsdReader.tsdReaderOpen(pTableScanInfo->base.readHandle.vnode, &cond, &tblInfo, 1, pBlock,
code = pAPI->tsdReader.tsdReaderOpen(pTableScanInfo->base.readHandle.vnode, &cond, &tblInfo, 1, pBlock,
(void**)&pReader, GET_TASKID(pTaskInfo), NULL);
if (code != TSDB_CODE_SUCCESS) {
terrno = code;
T_LONG_JMP(pTaskInfo->env, code);
return NULL;
}
QUERY_CHECK_CODE(code, lino, _end);
bool hasNext = false;
code = pAPI->tsdReader.tsdNextDataBlock(pReader, &hasNext);
if (code != TSDB_CODE_SUCCESS) {
terrno = code;
T_LONG_JMP(pTaskInfo->env, code);
return NULL;
}
QUERY_CHECK_CODE(code, lino, _end);
if (hasNext) {
SSDataBlock* p = NULL;
code = pAPI->tsdReader.tsdReaderRetrieveDataBlock(pReader, &p, NULL);
if (code != TSDB_CODE_SUCCESS) {
return NULL;
}
QUERY_CHECK_CODE(code, lino, _end);
doSetTagColumnData(&pTableScanInfo->base, pBlock, pTaskInfo, pBlock->info.rows);
pBlock->info.id.groupId = tableListGetTableGroupId(pTableScanInfo->base.pTableListInfo, pBlock->info.id.uid);
}
_end:
pAPI->tsdReader.tsdReaderClose(pReader);
qDebug("retrieve prev rows:%" PRId64 ", skey:%" PRId64 ", ekey:%" PRId64 " uid:%" PRIu64 ", max ver:%" PRId64
", suid:%" PRIu64,
pBlock->info.rows, startTs, endTs, tbUid, maxVersion, cond.suid);
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
terrno = code;
return NULL;
}
return pBlock->info.rows > 0 ? pBlock : NULL;
}
@ -2259,6 +2257,10 @@ static int32_t generatePartitionDelResBlock(SStreamScanInfo* pInfo, SSDataBlock*
uint64_t srcUid = srcUidData[delI];
char tbname[VARSTR_HEADER_SIZE + TSDB_TABLE_NAME_LEN] = {0};
SSDataBlock* pPreRes = readPreVersionData(pInfo->pTableScanOp, srcUid, srcStartTsCol[delI], srcEndTsCol[delI], ver);
if (!pPreRes) {
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
continue;
}
code = blockDataEnsureCapacity(pDestBlock, pDestBlock->info.rows + pPreRes->info.rows);
QUERY_CHECK_CODE(code, lino, _end);
for (int32_t preJ = 0; preJ < pPreRes->info.rows; preJ++) {
@ -2331,6 +2333,10 @@ static int32_t generateDeleteResultBlockImpl(SStreamScanInfo* pInfo, SSDataBlock
if (winCode != TSDB_CODE_SUCCESS) {
SSDataBlock* pPreRes = readPreVersionData(pInfo->pTableScanOp, srcUid, srcStartTsCol[i], srcStartTsCol[i], ver);
if (!pPreRes) {
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
continue;
}
printDataBlock(pPreRes, "pre res", GET_TASKID(pInfo->pStreamScanOp->pTaskInfo));
code = calBlockTbName(pInfo, pPreRes, 0);
QUERY_CHECK_CODE(code, lino, _end);
@ -5952,8 +5958,10 @@ _error:
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
}
pTaskInfo->code = code;
pInfo->base.pTableListInfo = NULL;
if (pInfo != NULL) destroyTableMergeScanOperatorInfo(pInfo);
if (pInfo != NULL) {
pInfo->base.pTableListInfo = NULL;
destroyTableMergeScanOperatorInfo(pInfo);
}
if (pOperator != NULL) {
pOperator->info = NULL;
destroyOperator(pOperator);