Merge pull request #22846 from taosdata/fix/3_liaohj

fix(tsdb): check the schema before merge rows in buffer
This commit is contained in:
Haojun Liao 2023-09-12 11:43:27 +08:00 committed by GitHub
commit 025a2a3b50
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 166 additions and 154 deletions

View File

@ -1922,6 +1922,7 @@ int32_t mndPersistTransLog(SStreamObj *pStream, STrans *pTrans) {
SSdbRaw *pCommitRaw = mndStreamActionEncode(pStream);
if (pCommitRaw == NULL) {
mError("failed to encode stream since %s", terrstr());
mndTransDrop(pTrans);
return -1;
}
@ -1988,6 +1989,7 @@ static int32_t createStreamUpdateTrans(SMnode *pMnode, SStreamObj *pStream, SVgr
if (mndTransAppendRedoAction(pTrans, &action) != 0) {
taosMemoryFree(pBuf);
taosWUnLockLatch(&pStream->lock);
mndTransDrop(pTrans);
return -1;
}
}
@ -1998,7 +2000,6 @@ static int32_t createStreamUpdateTrans(SMnode *pMnode, SStreamObj *pStream, SVgr
int32_t code = mndPersistTransLog(pStream, pTrans);
if (code != TSDB_CODE_SUCCESS) {
sdbRelease(pMnode->pSdb, pStream);
mndTransDrop(pTrans);
return -1;
}

View File

@ -437,6 +437,8 @@ int32_t extractDelDataBlock(const void* pData, int32_t len, int64_t ver, SStream
taosArrayDestroy(pRes->uidList);
*pRefBlock = taosAllocateQitem(sizeof(SStreamRefDataBlock), DEF_QITEM, 0);
if (*pRefBlock == NULL) {
blockDataCleanup(pDelBlock);
taosMemoryFree(pDelBlock);
return TSDB_CODE_OUT_OF_MEMORY;
}

View File

@ -1504,6 +1504,12 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo*
if (ps == NULL) {
return terrno;
}
int32_t code = tsdbRowMergerInit(pMerger, ps);
if (code != TSDB_CODE_SUCCESS) {
tsdbError("failed to init row merger, code:%s", tstrerror(code));
return code;
}
}
int64_t minKey = 0;
@ -1535,15 +1541,11 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo*
}
}
// todo remove init
bool init = false;
// ASC: file block ---> last block -----> imem -----> mem
// DESC: mem -----> imem -----> last block -----> file block
if (pReader->info.order == TSDB_ORDER_ASC) {
if (minKey == key) {
init = true;
int32_t code = tsdbRowMergerAdd(pMerger, &fRow, pReader->info.pSchema);
int32_t code = tsdbRowMergerAdd(pMerger, &fRow, NULL);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
@ -1552,47 +1554,44 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo*
if (minKey == tsLast) {
TSDBROW* fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
if (init) {
tsdbRowMergerAdd(pMerger, fRow1, NULL);
} else {
init = true;
int32_t code = tsdbRowMergerAdd(pMerger, fRow1, pReader->info.pSchema);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
int32_t code = tsdbRowMergerAdd(pMerger, fRow1, NULL);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLast, pMerger, &pReader->info.verRange,
pReader->idStr);
}
if (minKey == k.ts) {
STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid);
if (pSchema == NULL) {
return terrno;
}
if (init) {
tsdbRowMergerAdd(pMerger, pRow, pSchema);
} else {
init = true;
int32_t code = tsdbRowMergerAdd(pMerger, pRow, pSchema);
if (code != TSDB_CODE_SUCCESS) {
return code;
STSchema* pTSchema = NULL;
if (pRow->type == TSDBROW_ROW_FMT) {
pTSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid);
if (pTSchema == NULL) {
return terrno;
}
}
int32_t code = doMergeRowsInBuf(pIter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, pReader);
int32_t code = tsdbRowMergerAdd(pMerger, pRow, pTSchema);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
code = doMergeRowsInBuf(pIter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, pReader);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
}
} else {
if (minKey == k.ts) {
init = true;
STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid);
if (pSchema == NULL) {
return terrno;
STSchema* pTSchema = NULL;
if (pRow->type == TSDBROW_ROW_FMT) {
pTSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid);
if (pTSchema == NULL) {
return terrno;
}
}
int32_t code = tsdbRowMergerAdd(pMerger, pRow, pSchema);
int32_t code = tsdbRowMergerAdd(pMerger, pRow, pTSchema);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
@ -1605,28 +1604,18 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo*
if (minKey == tsLast) {
TSDBROW* fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
if (init) {
tsdbRowMergerAdd(pMerger, fRow1, NULL);
} else {
init = true;
int32_t code = tsdbRowMergerAdd(pMerger, fRow1, pReader->info.pSchema);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
int32_t code = tsdbRowMergerAdd(pMerger, fRow1, NULL);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLast, pMerger, &pReader->info.verRange,
pReader->idStr);
}
if (minKey == key) {
if (init) {
tsdbRowMergerAdd(pMerger, &fRow, NULL);
} else {
init = true;
int32_t code = tsdbRowMergerAdd(pMerger, &fRow, pReader->info.pSchema);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
int32_t code = tsdbRowMergerAdd(pMerger, &fRow, NULL);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader);
}
@ -1674,7 +1663,7 @@ static int32_t doMergeFileBlockAndLastBlock(SLastBlockReader* pLastBlockReader,
pBlockScanInfo->lastKey = tsLastBlock;
return TSDB_CODE_SUCCESS;
} else {
code = tsdbRowMergerAdd(pMerger, &fRow, pReader->info.pSchema);
code = tsdbRowMergerAdd(pMerger, &fRow, NULL);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
@ -1699,7 +1688,7 @@ static int32_t doMergeFileBlockAndLastBlock(SLastBlockReader* pLastBlockReader,
}
}
} else { // not merge block data
code = tsdbRowMergerAdd(pMerger, &fRow, pReader->info.pSchema);
code = tsdbRowMergerAdd(pMerger, &fRow, NULL);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
@ -1742,6 +1731,12 @@ static int32_t mergeFileBlockAndLastBlock(STsdbReader* pReader, SLastBlockReader
if (ps == NULL) {
return terrno;
}
int32_t code = tsdbRowMergerInit(pMerger, ps);
if (code != TSDB_CODE_SUCCESS) {
tsdbError("failed to init row merger, code:%s", tstrerror(code));
return code;
}
}
if (hasDataInFileBlock(pBlockData, pDumpInfo)) {
@ -1768,7 +1763,7 @@ static int32_t mergeFileBlockAndLastBlock(STsdbReader* pReader, SLastBlockReader
}
// the following for key == tsLast
SRow* pTSRow = NULL;
int32_t code = tsdbRowMergerAdd(pMerger, &fRow, pReader->info.pSchema);
int32_t code = tsdbRowMergerAdd(pMerger, &fRow, NULL);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
@ -1776,7 +1771,10 @@ static int32_t mergeFileBlockAndLastBlock(STsdbReader* pReader, SLastBlockReader
doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader);
TSDBROW* pRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
tsdbRowMergerAdd(pMerger, pRow1, NULL);
code = tsdbRowMergerAdd(pMerger, pRow1, NULL);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLast, pMerger, &pReader->info.verRange, pReader->idStr);
@ -1816,14 +1814,21 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo*
TSDBKEY k = TSDBROW_KEY(pRow);
TSDBKEY ik = TSDBROW_KEY(piRow);
STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid);
if (pSchema == NULL) {
return code;
STSchema* pSchema = NULL;
if (pRow->type == TSDBROW_ROW_FMT) {
pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid);
if (pSchema == NULL) {
return terrno;
}
}
STSchema* piSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(piRow), pReader, pBlockScanInfo->uid);
if (piSchema == NULL) {
return code;
STSchema* piSchema = NULL;
if (piRow->type == TSDBROW_ROW_FMT) {
piSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(piRow), pReader, pBlockScanInfo->uid);
if (piSchema == NULL) {
return code;
}
}
// merge is not initialized yet, due to the fact that the pReader->info.pSchema is not initialized
@ -1833,6 +1838,12 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo*
if (ps == NULL) {
return terrno;
}
code = tsdbRowMergerInit(pMerger, ps);
if (code != TSDB_CODE_SUCCESS) {
tsdbError("failed to init row merger, code:%s", tstrerror(code));
return code;
}
}
int64_t minKey = 0;
@ -1872,15 +1883,12 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo*
}
}
bool init = false;
// ASC: file block -----> last block -----> imem -----> mem
// DESC: mem -----> imem -----> last block -----> file block
if (ASCENDING_TRAVERSE(pReader->info.order)) {
if (minKey == key) {
init = true;
TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
code = tsdbRowMergerAdd(pMerger, &fRow, pReader->info.pSchema);
code = tsdbRowMergerAdd(pMerger, &fRow, NULL);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
@ -1890,14 +1898,9 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo*
if (minKey == tsLast) {
TSDBROW* pRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
if (init) {
tsdbRowMergerAdd(pMerger, pRow1, NULL);
} else {
init = true;
code = tsdbRowMergerAdd(pMerger, pRow1, pReader->info.pSchema);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
code = tsdbRowMergerAdd(pMerger, pRow1, NULL);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLast, pMerger, &pReader->info.verRange,
@ -1905,14 +1908,9 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo*
}
if (minKey == ik.ts) {
if (init) {
tsdbRowMergerAdd(pMerger, piRow, piSchema);
} else {
init = true;
code = tsdbRowMergerAdd(pMerger, piRow, piSchema);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
code = tsdbRowMergerAdd(pMerger, piRow, piSchema);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
code = doMergeRowsInBuf(&pBlockScanInfo->iiter, pBlockScanInfo->uid, ik.ts, pBlockScanInfo->delSkyline, pReader);
@ -1922,15 +1920,11 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo*
}
if (minKey == k.ts) {
if (init) {
tsdbRowMergerAdd(pMerger, pRow, pSchema);
} else {
// STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid);
code = tsdbRowMergerAdd(pMerger, pRow, pSchema);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
code = tsdbRowMergerAdd(pMerger, pRow, pSchema);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
code = doMergeRowsInBuf(&pBlockScanInfo->iter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, pReader);
if (code != TSDB_CODE_SUCCESS) {
return code;
@ -1938,7 +1932,6 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo*
}
} else {
if (minKey == k.ts) {
init = true;
code = tsdbRowMergerAdd(pMerger, pRow, pSchema);
if (code != TSDB_CODE_SUCCESS) {
return code;
@ -1951,15 +1944,11 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo*
}
if (minKey == ik.ts) {
if (init) {
tsdbRowMergerAdd(pMerger, piRow, piSchema);
} else {
init = true;
code = tsdbRowMergerAdd(pMerger, piRow, piSchema);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
code = tsdbRowMergerAdd(pMerger, piRow, piSchema);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
code = doMergeRowsInBuf(&pBlockScanInfo->iiter, pBlockScanInfo->uid, ik.ts, pBlockScanInfo->delSkyline, pReader);
if (code != TSDB_CODE_SUCCESS) {
return code;
@ -1968,29 +1957,22 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo*
if (minKey == tsLast) {
TSDBROW* pRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
if (init) {
tsdbRowMergerAdd(pMerger, pRow1, NULL);
} else {
init = true;
code = tsdbRowMergerAdd(pMerger, pRow1, pReader->info.pSchema);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
code = tsdbRowMergerAdd(pMerger, pRow1, NULL);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLast, pMerger, &pReader->info.verRange,
pReader->idStr);
}
if (minKey == key) {
TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
if (!init) {
code = tsdbRowMergerAdd(pMerger, &fRow, pReader->info.pSchema);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
} else {
tsdbRowMergerAdd(pMerger, &fRow, NULL);
code = tsdbRowMergerAdd(pMerger, &fRow, NULL);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader);
}
}
@ -2184,6 +2166,12 @@ int32_t mergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pBloc
if (ps == NULL) {
return terrno;
}
code = tsdbRowMergerInit(pMerger, ps);
if (code != TSDB_CODE_SUCCESS) {
tsdbError("failed to init row merger, code:%s", tstrerror(code));
return code;
}
}
if (copied) {
@ -2193,7 +2181,7 @@ int32_t mergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pBloc
TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
SRow* pTSRow = NULL;
code = tsdbRowMergerAdd(pMerger, &fRow, pReader->info.pSchema);
code = tsdbRowMergerAdd(pMerger, &fRow, NULL);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
@ -3328,16 +3316,15 @@ int32_t doMergeRowsInBuf(SIterInfo* pIter, uint64_t uid, int64_t ts, SArray* pDe
break;
}
STSchema* pTSchema = NULL;
if (pRow->type == TSDBROW_ROW_FMT) {
STSchema* pTSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, uid);
pTSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, uid);
if (pTSchema == NULL) {
return terrno;
}
tsdbRowMergerAdd(pMerger, pRow, pTSchema);
} else { // column format
tsdbRowMergerAdd(pMerger, pRow, NULL);
}
tsdbRowMergerAdd(pMerger, pRow, pTSchema);
}
return TSDB_CODE_SUCCESS;
@ -3473,31 +3460,30 @@ int32_t doMergeMemTableMultiRows(TSDBROW* pRow, uint64_t uid, SIterInfo* pIter,
int32_t code = 0;
// start to merge duplicated rows
if (current.type == TSDBROW_ROW_FMT) {
// get the correct schema for data in memory
STSchema* pTSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(&current), pReader, uid);
STSchema* pTSchema = NULL;
if (current.type == TSDBROW_ROW_FMT) { // get the correct schema for row-wise data in memory
pTSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(&current), pReader, uid);
if (pTSchema == NULL) {
return terrno;
}
}
code = tsdbRowMergerAdd(&pReader->status.merger, &current, pTSchema);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
code = tsdbRowMergerAdd(&pReader->status.merger, &current, pTSchema);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
STSchema* pTSchema1 = doGetSchemaForTSRow(TSDBROW_SVERSION(pNextRow), pReader, uid);
STSchema* pTSchema1 = NULL;
if (pNextRow->type == TSDBROW_ROW_FMT) { // get the correct schema for row-wise data in memory
pTSchema1 = doGetSchemaForTSRow(TSDBROW_SVERSION(pNextRow), pReader, uid);
if (pTSchema1 == NULL) {
return terrno;
}
}
tsdbRowMergerAdd(&pReader->status.merger, pNextRow, pTSchema1);
} else { // let's merge rows in file block
code = tsdbRowMergerAdd(&pReader->status.merger, &current, pReader->info.pSchema);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
tsdbRowMergerAdd(&pReader->status.merger, pNextRow, NULL);
code = tsdbRowMergerAdd(&pReader->status.merger, pNextRow, pTSchema1);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
code = doMergeRowsInBuf(pIter, uid, TSDBROW_TS(&current), pDelList, pReader);
@ -3523,14 +3509,21 @@ int32_t doMergeMemIMemRows(TSDBROW* pRow, TSDBROW* piRow, STableBlockScanInfo* p
TSDBKEY k = TSDBROW_KEY(pRow);
TSDBKEY ik = TSDBROW_KEY(piRow);
STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid);
if (pSchema == NULL) {
return terrno;
STSchema* pSchema = NULL;
if (pRow->type == TSDBROW_ROW_FMT) {
pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid);
if (pSchema == NULL) {
return terrno;
}
}
STSchema* piSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(piRow), pReader, pBlockScanInfo->uid);
if (piSchema == NULL) {
return terrno;
STSchema* piSchema = NULL;
if (piRow->type == TSDBROW_ROW_FMT) {
piSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(piRow), pReader, pBlockScanInfo->uid);
if (piSchema == NULL) {
return terrno;
}
}
if (ASCENDING_TRAVERSE(pReader->info.order)) { // ascending order imem --> mem
@ -4898,10 +4891,10 @@ int32_t tsdbTakeReadSnap2(STsdbReader* pReader, _query_reseek_func_t reseek, STs
pSnap->pMem = pTsdb->mem;
pSnap->pNode = taosMemoryMalloc(sizeof(*pSnap->pNode));
if (pSnap->pNode == NULL) {
taosThreadRwlockUnlock(&pTsdb->rwLock);
code = TSDB_CODE_OUT_OF_MEMORY;
goto _exit;
}
pSnap->pNode->pQHandle = pReader;
pSnap->pNode->reseek = reseek;
@ -4912,10 +4905,10 @@ int32_t tsdbTakeReadSnap2(STsdbReader* pReader, _query_reseek_func_t reseek, STs
pSnap->pIMem = pTsdb->imem;
pSnap->pINode = taosMemoryMalloc(sizeof(*pSnap->pINode));
if (pSnap->pINode == NULL) {
taosThreadRwlockUnlock(&pTsdb->rwLock);
code = TSDB_CODE_OUT_OF_MEMORY;
goto _exit;
}
pSnap->pINode->pQHandle = pReader;
pSnap->pINode->reseek = reseek;
@ -4924,18 +4917,16 @@ int32_t tsdbTakeReadSnap2(STsdbReader* pReader, _query_reseek_func_t reseek, STs
// fs
code = tsdbFSCreateRefSnapshot(pTsdb->pFS, &pSnap->pfSetArray);
if (code) {
taosThreadRwlockUnlock(&pTsdb->rwLock);
goto _exit;
if (code == TSDB_CODE_SUCCESS) {
tsdbTrace("vgId:%d, take read snapshot", TD_VID(pTsdb->pVnode));
}
// unlock
_exit:
taosThreadRwlockUnlock(&pTsdb->rwLock);
tsdbTrace("vgId:%d, take read snapshot", TD_VID(pTsdb->pVnode));
if (code != TSDB_CODE_SUCCESS) {
tsdbError("vgId:%d take read snapshot failed, code:%s", TD_VID(pTsdb->pVnode), tstrerror(code));
_exit:
if (code) {
*ppSnap = NULL;
if (pSnap) {
if (pSnap->pNode) taosMemoryFree(pSnap->pNode);

View File

@ -273,7 +273,12 @@ SBrinRecord* getNextBrinRecord(SBrinRecordIter* pIter) {
pIter->pCurrentBlk = taosArrayGet(pIter->pBrinBlockList, pIter->blockIndex);
tBrinBlockClear(&pIter->block);
tsdbDataFileReadBrinBlock(pIter->pReader, pIter->pCurrentBlk, &pIter->block);
int32_t code = tsdbDataFileReadBrinBlock(pIter->pReader, pIter->pCurrentBlk, &pIter->block);
if (code != TSDB_CODE_SUCCESS) {
tsdbError("failed to read brinBlock from file, code:%s", tstrerror(code));
return NULL;
}
pIter->recordIndex = -1;
}

View File

@ -7007,7 +7007,6 @@ static int32_t createLastTsSelectStmt(char* pDb, char* pTable, STableMeta* pMeta
int32_t code = nodesListStrictAppend(pParamterList, (SNode*)col);
if (code) {
nodesDestroyNode((SNode*)col);
nodesDestroyList(pParamterList);
return code;
}
@ -7025,7 +7024,6 @@ static int32_t createLastTsSelectStmt(char* pDb, char* pTable, STableMeta* pMeta
}
code = nodesListStrictAppend(pProjectionList, pFunc);
if (code) {
nodesDestroyNode(pFunc);
nodesDestroyList(pProjectionList);
return code;
}

View File

@ -83,7 +83,6 @@ static void streamSchedByTimer(void* param, void* tmrId) {
atomic_store_8(&pTask->schedInfo.status, TASK_TRIGGER_STATUS__INACTIVE);
pTrigger->pBlock->info.type = STREAM_GET_ALL;
if (streamTaskPutDataIntoInputQ(pTask, (SStreamQueueItem*)pTrigger) < 0) {
taosFreeQitem(pTrigger);
taosTmrReset(streamSchedByTimer, (int32_t)pTask->info.triggerParam, pTask, streamEnv.timer, &pTask->schedInfo.pTimer);
return;
}

View File

@ -125,7 +125,6 @@ static int32_t appendCheckpointIntoInputQ(SStreamTask* pTask, int32_t checkpoint
taosMemoryFree(pBlock);
if (streamTaskPutDataIntoInputQ(pTask, (SStreamQueueItem*)pChkpoint) < 0) {
taosFreeQitem(pChkpoint);
return TSDB_CODE_OUT_OF_MEMORY;
}
@ -271,7 +270,12 @@ int32_t streamSaveAllTaskStatus(SStreamMeta* pMeta, int64_t checkpointId) {
keys[0] = pId->streamId;
keys[1] = pId->taskId;
SStreamTask* p = *(SStreamTask**)taosHashGet(pMeta->pTasks, keys, sizeof(keys));
SStreamTask** ppTask = taosHashGet(pMeta->pTasks, keys, sizeof(keys));
if (ppTask == NULL) {
continue;
}
SStreamTask* p = *ppTask;
if (p->info.fillHistory == 1) {
continue;
}

View File

@ -418,6 +418,10 @@ int32_t streamMetaGetNumOfStreamTasks(SStreamMeta* pMeta) {
int64_t keys[2] = {pId->streamId, pId->taskId};
SStreamTask** p = taosHashGet(pMeta->pTasks, keys, sizeof(keys));
if (p == NULL) {
continue;
}
if ((*p)->info.fillHistory == 0) {
num += 1;
}

View File

@ -312,12 +312,20 @@ int32_t streamTaskPutDataIntoInputQ(SStreamTask* pTask, SStreamQueueItem* pItem)
}
} else if (type == STREAM_INPUT__CHECKPOINT || type == STREAM_INPUT__CHECKPOINT_TRIGGER ||
type == STREAM_INPUT__TRANS_STATE) {
taosWriteQitem(pQueue, pItem);
int32_t code = taosWriteQitem(pQueue, pItem);
if (code != TSDB_CODE_SUCCESS) {
taosFreeQitem(pItem);
return code;
}
qDebug("s-task:%s level:%d %s blockdata enqueue, total in queue:%d, size:%.2fMiB", pTask->id.idStr,
pTask->info.taskLevel, streamGetBlockTypeStr(type), total, size);
} else if (type == STREAM_INPUT__GET_RES) {
// use the default memory limit, refactor later.
taosWriteQitem(pQueue, pItem);
int32_t code = taosWriteQitem(pQueue, pItem);
if (code != TSDB_CODE_SUCCESS) {
taosFreeQitem(pItem);
return code;
}
qDebug("s-task:%s data res enqueue, current(blocks:%d, size:%.2fMiB)", pTask->id.idStr, total, size);
} else {
ASSERT(0);