diff --git a/include/common/tdatablock.h b/include/common/tdatablock.h index 84d696e518..99fffa2cf1 100644 --- a/include/common/tdatablock.h +++ b/include/common/tdatablock.h @@ -178,7 +178,7 @@ int32_t getJsonValueLen(const char* data); int32_t colDataSetVal(SColumnInfoData* pColumnInfoData, uint32_t rowIndex, const char* pData, bool isNull); int32_t colDataAppend(SColumnInfoData* pColumnInfoData, uint32_t rowIndex, const char* pData, bool isNull); -int32_t colDataSetNItems(SColumnInfoData* pColumnInfoData, uint32_t rowIndex, const char* pData, uint32_t numOfRows); +int32_t colDataSetNItems(SColumnInfoData* pColumnInfoData, uint32_t rowIndex, const char* pData, uint32_t numOfRows, bool trimValue); int32_t colDataMergeCol(SColumnInfoData* pColumnInfoData, int32_t numOfRow1, int32_t* capacity, const SColumnInfoData* pSource, int32_t numOfRow2); int32_t colDataAssign(SColumnInfoData* pColumnInfoData, const SColumnInfoData* pSource, int32_t numOfRows, diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c index 3c8d394b43..6e2b650b5b 100644 --- a/source/common/src/tdatablock.c +++ b/source/common/src/tdatablock.c @@ -147,9 +147,17 @@ int32_t colDataReserve(SColumnInfoData* pColumnInfoData, size_t newSize) { return TSDB_CODE_SUCCESS; } -static void doCopyNItems(struct SColumnInfoData* pColumnInfoData, int32_t currentRow, const char* pData, - int32_t itemLen, int32_t numOfRows) { - ASSERT(pColumnInfoData->info.bytes >= itemLen); +static int32_t doCopyNItems(struct SColumnInfoData* pColumnInfoData, int32_t currentRow, const char* pData, + int32_t itemLen, int32_t numOfRows, bool trimValue) { + if (pColumnInfoData->info.bytes < itemLen) { + uWarn("column/tag actual data len %d is bigger than schema len %d, trim it:%d", itemLen, pColumnInfoData->info.bytes, trimValue); + if (trimValue) { + itemLen = pColumnInfoData->info.bytes; + } else { + return TSDB_CODE_TDB_INVALID_TABLE_SCHEMA_VER; + } + } + size_t start = 1; // the first item @@ -178,10 +186,12 @@ static void doCopyNItems(struct SColumnInfoData* pColumnInfoData, int32_t curren pColumnInfoData->varmeta.length += numOfRows * itemLen; } + + return TSDB_CODE_SUCCESS; } int32_t colDataSetNItems(SColumnInfoData* pColumnInfoData, uint32_t currentRow, const char* pData, - uint32_t numOfRows) { + uint32_t numOfRows, bool trimValue) { int32_t len = pColumnInfoData->info.bytes; if (IS_VAR_DATA_TYPE(pColumnInfoData->info.type)) { len = varDataTLen(pData); @@ -193,8 +203,7 @@ int32_t colDataSetNItems(SColumnInfoData* pColumnInfoData, uint32_t currentRow, } } - doCopyNItems(pColumnInfoData, currentRow, pData, len, numOfRows); - return TSDB_CODE_SUCCESS; + return doCopyNItems(pColumnInfoData, currentRow, pData, len, numOfRows, trimValue); } static void doBitmapMerge(SColumnInfoData* pColumnInfoData, int32_t numOfRow1, const SColumnInfoData* pSource, diff --git a/source/dnode/vnode/inc/vnode.h b/source/dnode/vnode/inc/vnode.h index 2d053d04ae..7864d80a38 100644 --- a/source/dnode/vnode/inc/vnode.h +++ b/source/dnode/vnode/inc/vnode.h @@ -182,7 +182,7 @@ int32_t tsdbReaderOpen(SVnode *pVnode, SQueryTableDataCond *pCond, void *pTableL void tsdbReaderSetId(STsdbReader* pReader, const char* idstr); void tsdbReaderClose(STsdbReader *pReader); -bool tsdbNextDataBlock(STsdbReader *pReader); +int32_t tsdbNextDataBlock(STsdbReader *pReader, bool *hasNext); int32_t tsdbRetrieveDatablockSMA(STsdbReader *pReader, SSDataBlock *pDataBlock, bool *allHave); void tsdbReleaseDataBlock(STsdbReader *pReader); SSDataBlock *tsdbRetrieveDataBlock(STsdbReader *pTsdbReadHandle, SArray *pColumnIdList); diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index 89d3239c33..255db3379f 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -949,14 +949,17 @@ static void setBlockAllDumped(SFileBlockDumpInfo* pDumpInfo, int64_t maxKey, int pDumpInfo->lastKey = maxKey + step; } -static void doCopyColVal(SColumnInfoData* pColInfoData, int32_t rowIndex, int32_t colIndex, SColVal* pColVal, +static int32_t doCopyColVal(SColumnInfoData* pColInfoData, int32_t rowIndex, int32_t colIndex, SColVal* pColVal, SBlockLoadSuppInfo* pSup) { if (IS_VAR_DATA_TYPE(pColVal->type)) { if (!COL_VAL_IS_VALUE(pColVal)) { colDataSetNULL(pColInfoData, rowIndex); } else { varDataSetLen(pSup->buildBuf[colIndex], pColVal->value.nData); - ASSERT(pColVal->value.nData <= pColInfoData->info.bytes); + if (pColVal->value.nData > pColInfoData->info.bytes) { + tsdbWarn("column cid:%d actual data len %d is bigger than schema len %d", pColVal->cid, pColVal->value.nData, pColInfoData->info.bytes); + return TSDB_CODE_TDB_INVALID_TABLE_SCHEMA_VER; + } if (pColVal->value.nData > 0) { // pData may be null, if nData is 0 memcpy(varDataVal(pSup->buildBuf[colIndex]), pColVal->value.pData, pColVal->value.nData); } @@ -966,6 +969,8 @@ static void doCopyColVal(SColumnInfoData* pColInfoData, int32_t rowIndex, int32_ } else { colDataSetVal(pColInfoData, rowIndex, (const char*)&pColVal->value, !COL_VAL_IS_VALUE(pColVal)); } + + return TSDB_CODE_SUCCESS; } static SFileDataBlockInfo* getCurrentBlockInfo(SDataBlockIter* pBlockIter) { @@ -1167,6 +1172,7 @@ static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader) { SDataBlk* pBlock = getCurrentBlock(pBlockIter); SSDataBlock* pResBlock = pReader->pResBlock; int32_t numOfOutputCols = pSupInfo->numOfCols; + int32_t code = TSDB_CODE_SUCCESS; SColVal cv = {0}; int64_t st = taosGetTimestampUs(); @@ -1244,7 +1250,10 @@ static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader) { } else { // varchar/nchar type for (int32_t j = pDumpInfo->rowIndex; rowIndex < dumpedRows; j += step) { tColDataGetValue(pData, j, &cv); - doCopyColVal(pColData, rowIndex++, i, &cv, pSupInfo); + code = doCopyColVal(pColData, rowIndex++, i, &cv, pSupInfo); + if (code) { + return code; + } } } } @@ -1776,23 +1785,29 @@ static int32_t buildDataBlockFromBuf(STsdbReader* pReader, STableBlockScanInfo* } static bool tryCopyDistinctRowFromFileBlock(STsdbReader* pReader, SBlockData* pBlockData, int64_t key, - SFileBlockDumpInfo* pDumpInfo) { + SFileBlockDumpInfo* pDumpInfo, bool *copied) { // opt version // 1. it is not a border point // 2. the direct next point is not an duplicated timestamp + int32_t code = TSDB_CODE_SUCCESS; + + *copied = false; bool asc = (pReader->order == TSDB_ORDER_ASC); if ((pDumpInfo->rowIndex < pDumpInfo->totalRows - 1 && asc) || (pDumpInfo->rowIndex > 0 && (!asc))) { int32_t step = pReader->order == TSDB_ORDER_ASC ? 1 : -1; int64_t nextKey = pBlockData->aTSKEY[pDumpInfo->rowIndex + step]; if (nextKey != key) { // merge is not needed - doAppendRowFromFileBlock(pReader->pResBlock, pReader, pBlockData, pDumpInfo->rowIndex); + code = doAppendRowFromFileBlock(pReader->pResBlock, pReader, pBlockData, pDumpInfo->rowIndex); + if (code) { + return code; + } pDumpInfo->rowIndex += step; - return true; + *copied = true; } } - return false; + return code; } static bool nextRowFromLastBlocks(SLastBlockReader* pLastBlockReader, STableBlockScanInfo* pScanInfo, @@ -1819,20 +1834,34 @@ static bool nextRowFromLastBlocks(SLastBlockReader* pLastBlockReader, STableBloc } static bool tryCopyDistinctRowFromSttBlock(TSDBROW* fRow, SLastBlockReader* pLastBlockReader, - STableBlockScanInfo* pScanInfo, int64_t ts, STsdbReader* pReader) { + STableBlockScanInfo* pScanInfo, int64_t ts, STsdbReader* pReader, bool *copied) { + int32_t code = TSDB_CODE_SUCCESS; + + *copied = false; + bool hasVal = nextRowFromLastBlocks(pLastBlockReader, pScanInfo, &pReader->verRange); if (hasVal) { int64_t next1 = getCurrentKeyInLastBlock(pLastBlockReader); if (next1 != ts) { - doAppendRowFromFileBlock(pReader->pResBlock, pReader, fRow->pBlockData, fRow->iRow); - return true; + code = doAppendRowFromFileBlock(pReader->pResBlock, pReader, fRow->pBlockData, fRow->iRow); + if (code) { + return code; + } + + *copied = true; + return code; } } else { - doAppendRowFromFileBlock(pReader->pResBlock, pReader, fRow->pBlockData, fRow->iRow); - return true; + code = doAppendRowFromFileBlock(pReader->pResBlock, pReader, fRow->pBlockData, fRow->iRow); + if (code) { + return code; + } + + *copied = true; + return code; } - return false; + return code; } static FORCE_INLINE STSchema* getLatestTableSchema(STsdbReader* pReader, uint64_t uid) { @@ -2022,11 +2051,12 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo* return code; } - doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo); + code = doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo); taosMemoryFree(pTSRow); tsdbRowMergerClear(&merge); - return TSDB_CODE_SUCCESS; + + return code; } static int32_t doMergeFileBlockAndLastBlock(SLastBlockReader* pLastBlockReader, STsdbReader* pReader, @@ -2034,7 +2064,8 @@ static int32_t doMergeFileBlockAndLastBlock(SLastBlockReader* pLastBlockReader, bool mergeBlockData) { SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo; int64_t tsLastBlock = getCurrentKeyInLastBlock(pLastBlockReader); - + bool copied = false; + int32_t code = TSDB_CODE_SUCCESS; SRow* pTSRow = NULL; SRowMerger merge = {0}; TSDBROW fRow = tMergeTreeGetRow(&pLastBlockReader->mergeTree); @@ -2042,7 +2073,12 @@ static int32_t doMergeFileBlockAndLastBlock(SLastBlockReader* pLastBlockReader, // only last block exists if ((!mergeBlockData) || (tsLastBlock != pBlockData->aTSKEY[pDumpInfo->rowIndex])) { - if (tryCopyDistinctRowFromSttBlock(&fRow, pLastBlockReader, pBlockScanInfo, tsLastBlock, pReader)) { + code = tryCopyDistinctRowFromSttBlock(&fRow, pLastBlockReader, pBlockScanInfo, tsLastBlock, pReader, &copied); + if (code) { + return code; + } + + if (copied) { pBlockScanInfo->lastKey = tsLastBlock; return TSDB_CODE_SUCCESS; } else { @@ -2060,10 +2096,15 @@ static int32_t doMergeFileBlockAndLastBlock(SLastBlockReader* pLastBlockReader, return code; } - doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo); + code = doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo); taosMemoryFree(pTSRow); tsdbRowMergerClear(&merge); + + if (code != TSDB_CODE_SUCCESS) { + return code; + } + } } else { // not merge block data int32_t code = tsdbRowMergerInit(&merge, &fRow, pReader->pSchema); @@ -2083,10 +2124,14 @@ static int32_t doMergeFileBlockAndLastBlock(SLastBlockReader* pLastBlockReader, return code; } - doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo); + code = doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo); taosMemoryFree(pTSRow); tsdbRowMergerClear(&merge); + + if (code != TSDB_CODE_SUCCESS) { + return code; + } } return TSDB_CODE_SUCCESS; @@ -2131,7 +2176,7 @@ static int32_t mergeFileBlockAndLastBlock(STsdbReader* pReader, SLastBlockReader return code; } - doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo); + code = doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo); taosMemoryFree(pTSRow); tsdbRowMergerClear(&merge); @@ -2353,7 +2398,7 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo* return code; } - doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo); + code = doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo); taosMemoryFree(pTSRow); tsdbRowMergerClear(&merge); @@ -2508,7 +2553,13 @@ bool hasDataInFileBlock(const SBlockData* pBlockData, const SFileBlockDumpInfo* int32_t mergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pBlockScanInfo, int64_t key, STsdbReader* pReader) { SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo; - if (tryCopyDistinctRowFromFileBlock(pReader, pBlockData, key, pDumpInfo)) { + bool copied = false; + int32_t code = tryCopyDistinctRowFromFileBlock(pReader, pBlockData, key, pDumpInfo, &copied); + if (code) { + return code; + } + + if (copied) { pBlockScanInfo->lastKey = key; return TSDB_CODE_SUCCESS; } else { @@ -2528,11 +2579,11 @@ int32_t mergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pBloc return code; } - doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo); + code = doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo); taosMemoryFree(pTSRow); tsdbRowMergerClear(&merge); - return TSDB_CODE_SUCCESS; + return code; } } @@ -2649,7 +2700,10 @@ static int32_t buildComposedDataBlock(STsdbReader* pReader) { if (isCleanFileDataBlock(pReader, pBlockInfo, pBlock, pBlockScanInfo, keyInBuf, pLastBlockReader) && pBlock->nRow <= pReader->capacity) { if (asc || ((!asc) && (!hasDataInLastBlock(pLastBlockReader)))) { - copyBlockDataToSDataBlock(pReader); + code = copyBlockDataToSDataBlock(pReader); + if (code) { + goto _end; + } // record the last key value pBlockScanInfo->lastKey = asc ? pBlock->maxKey.ts : pBlock->minKey.ts; @@ -2696,8 +2750,11 @@ static int32_t buildComposedDataBlock(STsdbReader* pReader) { break; } - buildComposedDataBlockImpl(pReader, pBlockScanInfo, pBlockData, pLastBlockReader); - + code = buildComposedDataBlockImpl(pReader, pBlockScanInfo, pBlockData, pLastBlockReader); + if (code) { + goto _end; + } + // currently loaded file data block is consumed if ((pBlockData->nRow > 0) && (pDumpInfo->rowIndex >= pBlockData->nRow || pDumpInfo->rowIndex < 0)) { SDataBlk* pBlock = getCurrentBlock(&pReader->status.blockIter); @@ -2922,6 +2979,7 @@ static int32_t doLoadLastBlockSequentially(STsdbReader* pReader) { SReaderStatus* pStatus = &pReader->status; SLastBlockReader* pLastBlockReader = pStatus->fileIter.pLastBlockReader; STableUidList* pUidList = &pStatus->uidList; + int32_t code = TSDB_CODE_SUCCESS; if (taosHashGetSize(pStatus->pTableMap) == 0) { return TSDB_CODE_SUCCESS; @@ -2952,7 +3010,11 @@ static int32_t doLoadLastBlockSequentially(STsdbReader* pReader) { break; } - buildComposedDataBlockImpl(pReader, pScanInfo, &pReader->status.fileBlockData, pLastBlockReader); + code = buildComposedDataBlockImpl(pReader, pScanInfo, &pReader->status.fileBlockData, pLastBlockReader); + if (code) { + return code; + } + if (pResBlock->info.rows >= pReader->capacity) { break; } @@ -3032,7 +3094,11 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) { break; } - buildComposedDataBlockImpl(pReader, pScanInfo, &pReader->status.fileBlockData, pLastBlockReader); + code = buildComposedDataBlockImpl(pReader, pScanInfo, &pReader->status.fileBlockData, pLastBlockReader); + if (code) { + return code; + } + if (pResBlock->info.rows >= pReader->capacity) { break; } @@ -3784,6 +3850,7 @@ int32_t tsdbGetNextRowInMem(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pR int32_t doAppendRowFromTSRow(SSDataBlock* pBlock, STsdbReader* pReader, SRow* pTSRow, STableBlockScanInfo* pScanInfo) { int32_t outputRowIndex = pBlock->info.rows; int64_t uid = pScanInfo->uid; + int32_t code = TSDB_CODE_SUCCESS; int32_t numOfCols = (int32_t)taosArrayGetSize(pBlock->pDataBlock); @@ -3806,7 +3873,10 @@ int32_t doAppendRowFromTSRow(SSDataBlock* pBlock, STsdbReader* pReader, SRow* pT SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, pSupInfo->slotId[i]); tRowGet(pTSRow, pSchema, j, &colVal); - doCopyColVal(pColInfoData, outputRowIndex, i, &colVal, pSupInfo); + code = doCopyColVal(pColInfoData, outputRowIndex, i, &colVal, pSupInfo); + if (code) { + return code; + } i += 1; j += 1; } else if (colId < pSchema->columns[j].colId) { @@ -3836,6 +3906,7 @@ int32_t doAppendRowFromFileBlock(SSDataBlock* pResBlock, STsdbReader* pReader, S int32_t rowIndex) { int32_t i = 0, j = 0; int32_t outputRowIndex = pResBlock->info.rows; + int32_t code = TSDB_CODE_SUCCESS; SBlockLoadSuppInfo* pSupInfo = &pReader->suppInfo; if (pReader->suppInfo.colId[i] == PRIMARYKEY_TIMESTAMP_COL_ID) { @@ -3858,7 +3929,10 @@ int32_t doAppendRowFromFileBlock(SSDataBlock* pResBlock, STsdbReader* pReader, S SColumnInfoData* pCol = TARRAY_GET_ELEM(pResBlock->pDataBlock, pSupInfo->slotId[i]); if (pData->cid == pSupInfo->colId[i]) { tColDataGetValue(pData, rowIndex, &cv); - doCopyColVal(pCol, outputRowIndex, i, &cv, pSupInfo); + code = doCopyColVal(pCol, outputRowIndex, i, &cv, pSupInfo); + if (code) { + return code; + } j += 1; } else if (pData->cid > pCol->info.colId) { // the specified column does not exist in file block, fill with null data @@ -3882,6 +3956,7 @@ int32_t doAppendRowFromFileBlock(SSDataBlock* pResBlock, STsdbReader* pReader, S int32_t buildDataBlockFromBufImpl(STableBlockScanInfo* pBlockScanInfo, int64_t endKey, int32_t capacity, STsdbReader* pReader) { SSDataBlock* pBlock = pReader->pResBlock; + int32_t code = TSDB_CODE_SUCCESS; do { // SRow* pTSRow = NULL; @@ -3893,13 +3968,20 @@ int32_t buildDataBlockFromBufImpl(STableBlockScanInfo* pBlockScanInfo, int64_t e } if (row.type == TSDBROW_ROW_FMT) { - doAppendRowFromTSRow(pBlock, pReader, row.pTSRow, pBlockScanInfo); + code = doAppendRowFromTSRow(pBlock, pReader, row.pTSRow, pBlockScanInfo); if (freeTSRow) { taosMemoryFree(row.pTSRow); } + + if (code) { + return code; + } } else { - doAppendRowFromFileBlock(pBlock, pReader, row.pBlockData, row.iRow); + code = doAppendRowFromFileBlock(pBlock, pReader, row.pBlockData, row.iRow); + if (code) { + break; + } } // no data in buffer, return immediately @@ -3912,7 +3994,7 @@ int32_t buildDataBlockFromBufImpl(STableBlockScanInfo* pBlockScanInfo, int64_t e } } while (1); - return TSDB_CODE_SUCCESS; + return code; } // TODO refactor: with createDataBlockScanInfo @@ -4399,43 +4481,51 @@ _err: return code; } -static bool doTsdbNextDataBlock(STsdbReader* pReader) { +static int32_t doTsdbNextDataBlock(STsdbReader* pReader, bool *hasNext) { + int32_t code = TSDB_CODE_SUCCESS; + // cleanup the data that belongs to the previous data block SSDataBlock* pBlock = pReader->pResBlock; blockDataCleanup(pBlock); + *hasNext = false; + SReaderStatus* pStatus = &pReader->status; if (taosHashGetSize(pStatus->pTableMap) == 0) { - return false; + return code; } if (pStatus->loadFromFile) { - int32_t code = buildBlockFromFiles(pReader); + code = buildBlockFromFiles(pReader); if (code != TSDB_CODE_SUCCESS) { - return false; + return code; } - if (pBlock->info.rows > 0) { - return true; - } else { + if (pBlock->info.rows <= 0) { resetTableListIndex(&pReader->status); - buildBlockFromBufferSequentially(pReader); - return pBlock->info.rows > 0; + code = buildBlockFromBufferSequentially(pReader); } } else { // no data in files, let's try the buffer - buildBlockFromBufferSequentially(pReader); - return pBlock->info.rows > 0; + code = buildBlockFromBufferSequentially(pReader); } + + *hasNext = pBlock->info.rows > 0; + + return code; } -bool tsdbNextDataBlock(STsdbReader* pReader) { +int32_t tsdbNextDataBlock(STsdbReader* pReader, bool *hasNext) { + int32_t code = TSDB_CODE_SUCCESS; + + *hasNext = false; + if (isEmptyQueryTimeWindow(&pReader->window) || pReader->step == EXTERNAL_ROWS_NEXT) { - return false; + return code; } SReaderStatus* pStatus = &pReader->status; - int32_t code = tsdbAcquireReader(pReader); + code = tsdbAcquireReader(pReader); qTrace("tsdb/read: %p, take read mutex, code: %d", pReader, code); if (pReader->suspended) { @@ -4443,16 +4533,21 @@ bool tsdbNextDataBlock(STsdbReader* pReader) { } if (pReader->innerReader[0] != NULL && pReader->step == 0) { - bool ret = doTsdbNextDataBlock(pReader->innerReader[0]); + code = doTsdbNextDataBlock(pReader->innerReader[0], hasNext); + if (code) { + tsdbReleaseReader(pReader); + return code; + } + pReader->step = EXTERNAL_ROWS_PREV; - if (ret) { + if (*hasNext) { pStatus = &pReader->innerReader[0]->status; if (pStatus->composedDataBlock) { qTrace("tsdb/read: %p, unlock read mutex", pReader); tsdbReleaseReader(pReader); } - return ret; + return code; } } @@ -4469,14 +4564,19 @@ bool tsdbNextDataBlock(STsdbReader* pReader) { pReader->step = EXTERNAL_ROWS_MAIN; } - bool ret = doTsdbNextDataBlock(pReader); - if (ret) { + code = doTsdbNextDataBlock(pReader, hasNext); + if (code != TSDB_CODE_SUCCESS) { + tsdbReleaseReader(pReader); + return code; + } + + if (*hasNext) { if (pStatus->composedDataBlock) { qTrace("tsdb/read: %p, unlock read mutex", pReader); tsdbReleaseReader(pReader); } - return ret; + return code; } if (pReader->step == EXTERNAL_ROWS_MAIN && pReader->innerReader[1] != NULL) { @@ -4488,23 +4588,28 @@ bool tsdbNextDataBlock(STsdbReader* pReader) { return code; } - ret = doTsdbNextDataBlock(pReader->innerReader[1]); + code = doTsdbNextDataBlock(pReader->innerReader[1], hasNext); + if (code != TSDB_CODE_SUCCESS) { + tsdbReleaseReader(pReader); + return code; + } + pReader->step = EXTERNAL_ROWS_NEXT; - if (ret) { + if (*hasNext) { pStatus = &pReader->innerReader[1]->status; if (pStatus->composedDataBlock) { qTrace("tsdb/read: %p, unlock read mutex", pReader); tsdbReleaseReader(pReader); } - return ret; + return code; } } qTrace("tsdb/read: %p, unlock read mutex", pReader); tsdbReleaseReader(pReader); - return false; + return code; } static void doFillNullColSMA(SBlockLoadSuppInfo* pSup, int32_t numOfRows, int32_t numOfCols, SColumnDataAgg* pTsAgg) { @@ -4649,20 +4754,27 @@ STableBlockScanInfo* getTableBlockScanInfo(SHashObj* pTableMap, uint64_t uid, co static SSDataBlock* doRetrieveDataBlock(STsdbReader* pReader) { SReaderStatus* pStatus = &pReader->status; + int32_t code = TSDB_CODE_SUCCESS; SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(&pStatus->blockIter); STableBlockScanInfo* pBlockScanInfo = getTableBlockScanInfo(pStatus->pTableMap, pBlockInfo->uid, pReader->idStr); if (pBlockScanInfo == NULL) { return NULL; } - int32_t code = doLoadFileBlockData(pReader, &pStatus->blockIter, &pStatus->fileBlockData, pBlockScanInfo->uid); + code = doLoadFileBlockData(pReader, &pStatus->blockIter, &pStatus->fileBlockData, pBlockScanInfo->uid); + if (code != TSDB_CODE_SUCCESS) { + tBlockDataDestroy(&pStatus->fileBlockData); + terrno = code; + return NULL; + } + + code = copyBlockDataToSDataBlock(pReader); if (code != TSDB_CODE_SUCCESS) { tBlockDataDestroy(&pStatus->fileBlockData); terrno = code; return NULL; } - copyBlockDataToSDataBlock(pReader); return pReader->pResBlock; } diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 5dff1abb97..870c96811b 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -584,10 +584,16 @@ int32_t addTagPseudoColumnData(SReadHandle* pHandle, const SExprInfo* pExpr, int if (isNullVal) { colDataSetNNULL(pColInfoData, 0, pBlock->info.rows); } else if (pColInfoData->info.type != TSDB_DATA_TYPE_JSON) { - colDataSetNItems(pColInfoData, 0, data, pBlock->info.rows); + code = colDataSetNItems(pColInfoData, 0, data, pBlock->info.rows, false); if (IS_VAR_DATA_TYPE(((const STagVal*)p)->type)) { taosMemoryFree(data); } + if (code) { + if (freeReader) { + metaReaderClear(&mr); + } + return code; + } } else { // todo opt for json tag for (int32_t i = 0; i < pBlock->info.rows; ++i) { colDataSetVal(pColInfoData, i, data, false); @@ -634,10 +640,22 @@ static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator) { STableScanInfo* pTableScanInfo = pOperator->info; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SSDataBlock* pBlock = pTableScanInfo->pResBlock; + bool hasNext = false; + int32_t code = TSDB_CODE_SUCCESS; int64_t st = taosGetTimestampUs(); - while (tsdbNextDataBlock(pTableScanInfo->base.dataReader)) { + while (true) { + code = tsdbNextDataBlock(pTableScanInfo->base.dataReader, &hasNext); + if (code) { + tsdbReleaseDataBlock(pTableScanInfo->base.dataReader); + T_LONG_JMP(pTaskInfo->env, code); + } + + if (!hasNext) { + break; + } + if (isTaskKilled(pTaskInfo)) { tsdbReleaseDataBlock(pTableScanInfo->base.dataReader); T_LONG_JMP(pTaskInfo->env, pTaskInfo->code); @@ -1015,7 +1033,15 @@ static SSDataBlock* readPreVersionData(SOperatorInfo* pTableScanOp, uint64_t tbU return NULL; } - if (tsdbNextDataBlock(pReader)) { + bool hasNext = false; + code = tsdbNextDataBlock(pReader, &hasNext); + if (code != TSDB_CODE_SUCCESS) { + terrno = code; + T_LONG_JMP(pTaskInfo->env, code); + return NULL; + } + + if (hasNext) { /*SSDataBlock* p = */ tsdbRetrieveDataBlock(pReader, NULL); doSetTagColumnData(&pTableScanInfo->base, pBlock, pTaskInfo, pBlock->info.rows); pBlock->info.id.groupId = getTableGroupId(pTaskInfo->pTableInfoList, pBlock->info.id.uid); @@ -2104,12 +2130,22 @@ static SSDataBlock* doRawScan(SOperatorInfo* pOperator) { // NOTE: this operator does never check if current status is done or not SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SStreamRawScanInfo* pInfo = pOperator->info; + int32_t code = TSDB_CODE_SUCCESS; pTaskInfo->streamInfo.metaRsp.metaRspLen = 0; // use metaRspLen !=0 to judge if data is meta pTaskInfo->streamInfo.metaRsp.metaRsp = NULL; qDebug("tmqsnap doRawScan called"); if (pTaskInfo->streamInfo.prepareStatus.type == TMQ_OFFSET__SNAPSHOT_DATA) { - if (pInfo->dataReader && tsdbNextDataBlock(pInfo->dataReader)) { + bool hasNext = false; + if (pInfo->dataReader) { + code = tsdbNextDataBlock(pInfo->dataReader, &hasNext); + if (code) { + tsdbReleaseDataBlock(pInfo->dataReader); + longjmp(pTaskInfo->env, code); + } + } + + if (pInfo->dataReader && hasNext) { if (isTaskKilled(pTaskInfo)) { tsdbReleaseDataBlock(pInfo->dataReader); longjmp(pTaskInfo->env, pTaskInfo->code); @@ -2610,8 +2646,21 @@ static SSDataBlock* getTableDataBlockImpl(void* param) { pInfo->base.dataReader = source->dataReader; STsdbReader* reader = pInfo->base.dataReader; + bool hasNext = false; qTrace("tsdb/read-table-data: %p, enter next reader", reader); - while (tsdbNextDataBlock(reader)) { + + while (true) { + code = tsdbNextDataBlock(reader, &hasNext); + if (code != 0) { + tsdbReleaseDataBlock(reader); + pInfo->base.dataReader = NULL; + T_LONG_JMP(pTaskInfo->env, code); + } + + if (!hasNext) { + break; + } + if (isTaskKilled(pTaskInfo)) { tsdbReleaseDataBlock(reader); pInfo->base.dataReader = NULL; diff --git a/source/libs/executor/src/sysscanoperator.c b/source/libs/executor/src/sysscanoperator.c index f24d3523c8..00a90cc108 100644 --- a/source/libs/executor/src/sysscanoperator.c +++ b/source/libs/executor/src/sysscanoperator.c @@ -1627,7 +1627,7 @@ static void sysTableScanFillTbName(SOperatorInfo* pOperator, const SSysTableScan char varTbName[TSDB_TABLE_FNAME_LEN - 1 + VARSTR_HEADER_SIZE] = {0}; STR_TO_VARSTR(varTbName, name); - colDataSetNItems(pColumnInfoData, 0, varTbName, pBlock->info.rows); + colDataSetNItems(pColumnInfoData, 0, varTbName, pBlock->info.rows, true); } doFilter(pBlock, pOperator->exprSupp.pFilterInfo, NULL); diff --git a/source/libs/scalar/src/sclfunc.c b/source/libs/scalar/src/sclfunc.c index 195a08525c..1affbac613 100644 --- a/source/libs/scalar/src/sclfunc.c +++ b/source/libs/scalar/src/sclfunc.c @@ -1755,7 +1755,11 @@ int32_t winEndTsFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *p int32_t qTbnameFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) { char* p = colDataGetVarData(pInput->columnData, 0); - colDataSetNItems(pOutput->columnData, pOutput->numOfRows, p, pInput->numOfRows); + int32_t code = colDataSetNItems(pOutput->columnData, pOutput->numOfRows, p, pInput->numOfRows, true); + if (code) { + return code; + } + pOutput->numOfRows += pInput->numOfRows; return TSDB_CODE_SUCCESS; } diff --git a/tests/parallel_test/cases.task b/tests/parallel_test/cases.task index 4d6b5ff05e..260d47032a 100644 --- a/tests/parallel_test/cases.task +++ b/tests/parallel_test/cases.task @@ -14,6 +14,8 @@ ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/nestedQuery_math.py -Q 2 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/nestedQuery_time.py -Q 2 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/nestedQuery.py -Q 2 +,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/columnLenUpdated.py -Q 2 +,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/columnLenUpdated.py -Q 3 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/nestedQuery.py -Q 3 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/nestedQuery_str.py -Q 3 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/nestedQuery_math.py -Q 3 @@ -22,6 +24,7 @@ ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/nestedQuery_str.py -Q 4 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/nestedQuery_math.py -Q 4 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/nestedQuery_time.py -Q 4 +,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/columnLenUpdated.py -Q 4 ,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqShow.py ,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqDropStb.py ,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/subscribeStb0.py diff --git a/tests/system-test/2-query/columnLenUpdated.py b/tests/system-test/2-query/columnLenUpdated.py new file mode 100644 index 0000000000..d6940fde8b --- /dev/null +++ b/tests/system-test/2-query/columnLenUpdated.py @@ -0,0 +1,181 @@ + +import taos +import sys +import time +import socket +import os +import platform +if platform.system().lower() == 'windows': + import wexpect as taosExpect +else: + import pexpect as taosExpect + +from util.log import * +from util.sql import * +from util.cases import * +from util.dnodes import * + +def taos_command (buildPath, key, value, expectString, sqlString=''): + if len(key) == 0: + tdLog.exit("taos test key is null!") + + if platform.system().lower() == 'windows': + taosCmd = buildPath + '\\build\\bin\\taos.exe ' + taosCmd = taosCmd.replace('\\','\\\\') + else: + taosCmd = buildPath + '/build/bin/taos ' + + cfgPath = buildPath + "/../sim/psim/cfg" + taosCmd = taosCmd + ' -c' + cfgPath + ' -' + key + if len(value) != 0: + taosCmd = taosCmd + ' ' + value + + tdLog.info ("taos cmd: %s" % taosCmd) + + child = taosExpect.spawn(taosCmd, timeout=20) + #output = child.readline() + #print (output.decode()) + if len(expectString) != 0: + i = child.expect([expectString, taosExpect.TIMEOUT, taosExpect.EOF], timeout=20) + else: + i = child.expect([taosExpect.TIMEOUT, taosExpect.EOF], timeout=20) + + if platform.system().lower() == 'windows': + retResult = child.before + else: + retResult = child.before.decode() + print(retResult) + #print(child.after.decode()) + if i == 0: + print ('taos login success! Here can run sql, taos> ') + return "TAOS_OK" + else: + return "TAOS_FAIL" + +class TDTestCase: + #updatecfgDict = {'clientCfg': {'serverPort': 7080, 'firstEp': 'trd02:7080', 'secondEp':'trd02:7080'},\ + # 'serverPort': 7080, 'firstEp': 'trd02:7080'} + hostname = socket.gethostname() + if (platform.system().lower() == 'windows' and not tdDnodes.dnodes[0].remoteIP == ""): + try: + config = eval(tdDnodes.dnodes[0].remoteIP) + hostname = config["host"] + except Exception: + hostname = tdDnodes.dnodes[0].remoteIP + serverPort = '7080' + rpcDebugFlagVal = '143' + clientCfgDict = {'serverPort': '', 'firstEp': '', 'secondEp':'', 'rpcDebugFlag':'135', 'fqdn':''} + clientCfgDict["serverPort"] = serverPort + clientCfgDict["firstEp"] = hostname + ':' + serverPort + clientCfgDict["secondEp"] = hostname + ':' + serverPort + clientCfgDict["rpcDebugFlag"] = rpcDebugFlagVal + clientCfgDict["fqdn"] = hostname + + updatecfgDict = {'clientCfg': {}, 'serverPort': '', 'firstEp': '', 'secondEp':'', 'rpcDebugFlag':'135', 'fqdn':''} + updatecfgDict["clientCfg"] = clientCfgDict + updatecfgDict["serverPort"] = serverPort + updatecfgDict["firstEp"] = hostname + ':' + serverPort + updatecfgDict["secondEp"] = hostname + ':' + serverPort + updatecfgDict["fqdn"] = hostname + + print ("===================: ", updatecfgDict) + + def init(self, conn, logSql, replicaVar=1): + self.replicaVar = int(replicaVar) + tdLog.debug(f"start to excute {__file__}") + tdSql.init(conn.cursor()) + + def getBuildPath(self): + selfPath = os.path.dirname(os.path.realpath(__file__)) + + if ("community" in selfPath): + projPath = selfPath[:selfPath.find("community")] + else: + projPath = selfPath[:selfPath.find("tests")] + + for root, dirs, files in os.walk(projPath): + if ("taosd" in files or "taosd.exe" in files): + rootRealPath = os.path.dirname(os.path.realpath(root)) + if ("packaging" not in rootRealPath): + buildPath = root[:len(root) - len("/build/bin")] + break + return buildPath + + def run(self): # sourcery skip: extract-duplicate-method, remove-redundant-fstring + tdSql.prepare() + # time.sleep(2) + tdSql.query("create user testpy pass 'testpy'") + + buildPath = self.getBuildPath() + if (buildPath == ""): + tdLog.exit("taosd not found!") + else: + tdLog.info("taosd found in %s" % buildPath) + cfgPath = buildPath + "/../sim/psim/cfg" + tdLog.info("cfgPath: %s" % cfgPath) + + checkNetworkStatus = ['0: unavailable', '1: network ok', '2: service ok', '3: service degraded', '4: exiting'] + netrole = ['client', 'server'] + + keyDict = {'h':'', 'P':'6030', 'p':'testpy', 'u':'testpy', 'a':'', 'A':'', 'c':'', 'C':'', 's':'', 'r':'', 'f':'', \ + 'k':'', 't':'', 'n':'', 'l':'1024', 'N':'100', 'V':'', 'd':'db', 'w':'30', '-help':'', '-usage':'', '?':''} + + keyDict['h'] = self.hostname + keyDict['c'] = cfgPath + keyDict['P'] = self.serverPort + + tdSql.query("drop database if exists db1") + tdSql.query("create database if not exists db1 vgroups 1") + tdSql.query("use db1") + tdSql.query("create table tba (ts timestamp, f1 binary(2))") + tdSql.query("insert into tba values (now, '22')") + tdSql.query("select * from tba") + tdSql.checkData(0, 1, '22') + + keyDict['s'] = "\"alter table db1.tba modify column f1 binary(5) \"" + retCode = taos_command(buildPath, "s", keyDict['s'], "Query OK", '') + if retCode != "TAOS_OK": + tdLog.exit("taos -s fail") + + keyDict['s'] = "\"insert into db1.tba values (now, '55555')\"" + retCode = taos_command(buildPath, "s", keyDict['s'], "Insert OK", '') + if retCode != "TAOS_OK": + tdLog.exit("taos -s fail") + + tdSql.query("select * from tba order by ts") + tdSql.checkData(0, 1, '22') + tdSql.checkData(1, 1, '55555') + + + tdSql.query("create table stb (ts timestamp, f1 int) tags (tg1 binary(2))") + tdSql.query("create table tb1 using stb tags('bb')") + tdSql.query("insert into tb1 values (now, 2)") + tdSql.query("select count(*) from stb group by tg1") + tdSql.checkData(0, 0, 1) + + keyDict['s'] = "\"alter table db1.stb modify tag tg1 binary(5) \"" + retCode = taos_command(buildPath, "s", keyDict['s'], "Query OK", '') + if retCode != "TAOS_OK": + tdLog.exit("taos -s fail") + + keyDict['s'] = "\"create table db1.tb2 using db1.stb tags('bbbbb')\"" + retCode = taos_command(buildPath, "s", keyDict['s'], "Create OK", '') + if retCode != "TAOS_OK": + tdLog.exit("taos -s fail") + + keyDict['s'] = "\"insert into db1.tb2 values (now, 2)\"" + retCode = taos_command(buildPath, "s", keyDict['s'], "Insert OK", '') + if retCode != "TAOS_OK": + tdLog.exit("taos -s fail") + + tdSql.query("select count(*) from stb group by tg1") + tdSql.checkData(0, 0, 1) + tdSql.checkData(1, 0, 1) + + + def stop(self): + tdSql.close() + tdLog.success(f"{__file__} successfully executed") + +tdCases.addLinux(__file__, TDTestCase()) +tdCases.addWindows(__file__, TDTestCase())