From a185561405207ff118e86fba741e65800c2d5abc Mon Sep 17 00:00:00 2001 From: Ganlin Zhao Date: Wed, 22 Feb 2023 18:46:59 +0800 Subject: [PATCH 01/12] fix: fix asan error --- source/libs/scalar/src/filter.c | 4 ++++ tests/script/sh/checkAsan.sh | 2 +- 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/source/libs/scalar/src/filter.c b/source/libs/scalar/src/filter.c index 25e65d2588..d4444ead0f 100644 --- a/source/libs/scalar/src/filter.c +++ b/source/libs/scalar/src/filter.c @@ -3146,6 +3146,10 @@ static FORCE_INLINE bool filterExecuteImplNotNull(void *pinfo, int32_t numOfRows for (int32_t i = 0; i < numOfRows; ++i) { uint32_t uidx = info->groups[0].unitIdxs[0]; + if (((SColumnInfoData *)info->cunits[uidx].colData)->pData == NULL) { + continue; + } + void *colData = colDataGetData((SColumnInfoData *)info->cunits[uidx].colData, i); p[i] = ((colData != NULL) && !colDataIsNull((SColumnInfoData *)info->cunits[uidx].colData, 0, i, NULL)); diff --git a/tests/script/sh/checkAsan.sh b/tests/script/sh/checkAsan.sh index 20d1359da8..8d58ff9135 100755 --- a/tests/script/sh/checkAsan.sh +++ b/tests/script/sh/checkAsan.sh @@ -54,7 +54,7 @@ python_error=`cat ${LOG_DIR}/*.info | grep -w "stack" | wc -l` # /home/chr/TDengine/source/libs/scalar/src/filter.c:3149:14: runtime error: applying non-zero offset 18446744073709551615 to null pointer # /home/TDinternal/community/source/libs/scalar/src/sclvector.c:1109:66: runtime error: signed integer overflow: 9223372034707292160 + 1676867897049 cannot be represented in type 'long int' -runtime_error=`cat ${LOG_DIR}/*.asan | grep "runtime error" | grep -v "trees.c:873" | grep -v "sclfunc.c.*outside the range of representable values of type"| grep -v "signed integer overflow" |grep -v "strerror.c"| grep -v "asan_malloc_linux.cc" |grep -v "filter.c:3149:14" |wc -l` +runtime_error=`cat ${LOG_DIR}/*.asan | grep "runtime error" | grep -v "trees.c:873" | grep -v "sclfunc.c.*outside the range of representable values of type"| grep -v "signed integer overflow" |grep -v "strerror.c"| grep -v "asan_malloc_linux.cc" |wc -l` echo -e "\033[44;32;1m"asan error_num: $error_num"\033[0m" echo -e "\033[44;32;1m"asan memory_leak: $memory_leak"\033[0m" From f67547d59fd177de5593b971817df3b3eaeb2e39 Mon Sep 17 00:00:00 2001 From: Xiaoyu Wang Date: Thu, 23 Feb 2023 14:04:07 +0800 Subject: [PATCH 02/12] fix: create index error --- include/common/tmsg.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 77b9d2d681..d389d572f6 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -2850,7 +2850,7 @@ typedef struct { char dbFName[TSDB_DB_FNAME_LEN]; char stbName[TSDB_TABLE_NAME_LEN]; char colName[TSDB_COL_NAME_LEN]; - char idxName[TSDB_COL_NAME_LEN]; + char idxName[TSDB_INDEX_FNAME_LEN]; int8_t idxType; } SCreateTagIndexReq; From 7d75939fae0b3772496b0ca15c1589ef2a4207ed Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Thu, 23 Feb 2023 14:35:05 +0800 Subject: [PATCH 03/12] fix: fix invalid dist count --- source/dnode/vnode/src/tsdb/tsdbRead.c | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index c83fdb2e4f..a2e421555b 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -488,7 +488,7 @@ static int32_t initFilesetIterator(SFilesetIter* pIter, SArray* aDFileSet, STsdb return TSDB_CODE_SUCCESS; } -static int32_t filesetIteratorNext(SFilesetIter* pIter, STsdbReader* pReader, bool *hasNext) { +static int32_t filesetIteratorNext(SFilesetIter* pIter, STsdbReader* pReader, bool* hasNext) { bool asc = ASCENDING_TRAVERSE(pIter->order); int32_t step = asc ? 1 : -1; pIter->index += step; @@ -2802,13 +2802,13 @@ static int32_t moveToNextFile(STsdbReader* pReader, SBlockNumber* pBlockNum) { SArray* pIndexList = taosArrayInit(numOfTables, sizeof(SBlockIdx)); while (1) { - bool hasNext = false; + bool hasNext = false; int32_t code = filesetIteratorNext(&pStatus->fileIter, pReader, &hasNext); if (code) { taosArrayDestroy(pIndexList); return code; } - + if (!hasNext) { // no data files on disk break; } @@ -4688,6 +4688,11 @@ int32_t tsdbGetFileBlocksDistInfo(STsdbReader* pReader, STableBlockDistInfo* pTa pTableBlockInfo->numOfVgroups = 1; // find the start data block in file + + tsdbAcquireReader(pReader); + if (pReader->suspended) { + tsdbReaderResume(pReader); + } SReaderStatus* pStatus = &pReader->status; STsdbCfg* pc = &pReader->pTsdb->pVnode->config.tsdbCfg; @@ -4749,7 +4754,7 @@ int32_t tsdbGetFileBlocksDistInfo(STsdbReader* pReader, STableBlockDistInfo* pTa // tsdbDebug("%p %d blocks found in file for %d table(s), fid:%d, %s", pReader, numOfBlocks, numOfTables, // pReader->pFileGroup->fid, pReader->idStr); } - + tsdbReleaseReader(pReader); return code; } From e8527ee9de7470eb568d53d763da8b1fce56a94d Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Thu, 23 Feb 2023 14:46:34 +0800 Subject: [PATCH 04/12] fix: fix invalid dist count --- include/common/tcommon.h | 12 ++--- source/libs/function/src/builtinsimpl.c | 59 ++++++++++++------------- 2 files changed, 35 insertions(+), 36 deletions(-) diff --git a/include/common/tcommon.h b/include/common/tcommon.h index f4e13509c2..2a40976a8b 100644 --- a/include/common/tcommon.h +++ b/include/common/tcommon.h @@ -205,7 +205,7 @@ typedef struct SDataBlockInfo { STimeWindow calWin; // used for stream, do not serialize TSKEY watermark; // used for stream - char parTbName[TSDB_TABLE_NAME_LEN]; // used for stream partition + char parTbName[TSDB_TABLE_NAME_LEN]; // used for stream partition } SDataBlockInfo; typedef struct SSDataBlock { @@ -291,7 +291,6 @@ typedef struct STableBlockDistInfo { uint16_t numOfFiles; uint32_t numOfTables; uint32_t numOfBlocks; - uint32_t numOfVgroups; uint64_t totalSize; uint64_t totalRows; int32_t maxRows; @@ -301,6 +300,7 @@ typedef struct STableBlockDistInfo { int32_t firstSeekTimeUs; uint32_t numOfInmemRows; uint32_t numOfSmallBlocks; + uint32_t numOfVgroups; int32_t blockRowsHisto[20]; } STableBlockDistInfo; @@ -341,7 +341,7 @@ typedef struct SExprInfo { typedef struct { const char* key; - size_t keyLen; + size_t keyLen; uint8_t type; union { const char* value; @@ -385,9 +385,9 @@ typedef struct STUidTagInfo { #define TABLE_NAME_COLUMN_INDEX 6 // stream create table block column -#define UD_TABLE_NAME_COLUMN_INDEX 0 -#define UD_GROUPID_COLUMN_INDEX 1 -#define UD_TAG_COLUMN_INDEX 2 +#define UD_TABLE_NAME_COLUMN_INDEX 0 +#define UD_GROUPID_COLUMN_INDEX 1 +#define UD_TAG_COLUMN_INDEX 2 #ifdef __cplusplus } diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index 8e52ae5f30..4a23b35b33 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -775,13 +775,13 @@ int32_t maxFunction(SqlFunctionCtx* pCtx) { static int32_t setNullSelectivityValue(SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t rowIndex); static int32_t setSelectivityValue(SqlFunctionCtx* pCtx, SSDataBlock* pBlock, const STuplePos* pTuplePos, - int32_t rowIndex); + int32_t rowIndex); int32_t minmaxFunctionFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { int32_t code = TSDB_CODE_SUCCESS; SResultRowEntryInfo* pEntryInfo = GET_RES_INFO(pCtx); - SMinmaxResInfo* pRes = GET_ROWCELL_INTERBUF(pEntryInfo); + SMinmaxResInfo* pRes = GET_ROWCELL_INTERBUF(pEntryInfo); int32_t slotId = pCtx->pExpr->base.resSchema.slotId; int32_t currentRow = pBlock->info.rows; @@ -795,7 +795,7 @@ int32_t minmaxFunctionFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { case TSDB_DATA_TYPE_UBIGINT: case TSDB_DATA_TYPE_BIGINT: ((int64_t*)pCol->pData)[currentRow] = pRes->v; -// colDataAppendInt64(pCol, currentRow, &pRes->v); + // colDataAppendInt64(pCol, currentRow, &pRes->v); break; case TSDB_DATA_TYPE_UINT: case TSDB_DATA_TYPE_INT: @@ -920,9 +920,7 @@ void appendSelectivityValue(SqlFunctionCtx* pCtx, int32_t rowIndex, int32_t pos) } } -void replaceTupleData(STuplePos* pDestPos, STuplePos* pSourcePos) { - *pDestPos = *pSourcePos; -} +void replaceTupleData(STuplePos* pDestPos, STuplePos* pSourcePos) { *pDestPos = *pSourcePos; } int32_t minMaxCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx, int32_t isMinFunc) { SResultRowEntryInfo* pDResInfo = GET_RES_INFO(pDestCtx); @@ -1686,7 +1684,7 @@ int32_t percentileFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { SPercentileInfo* ppInfo = (SPercentileInfo*)GET_ROWCELL_INTERBUF(pResInfo); int32_t code = 0; - double v = 0; + double v = 0; tMemBucket* pMemBucket = ppInfo->pMemBucket; if (pMemBucket == NULL || pMemBucket->total == 0) { // check for null @@ -1717,7 +1715,7 @@ int32_t percentileFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { } int32_t slotId = pCtx->pExpr->base.resSchema.slotId; - SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, slotId); + SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, slotId); varDataSetLen(buf, len); colDataAppend(pCol, pBlock->info.rows, buf, false); @@ -1742,7 +1740,6 @@ _fin_error: tMemBucketDestroy(pMemBucket); return code; - } bool getApercentileFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) { @@ -2103,7 +2100,7 @@ static void prepareBuf(SqlFunctionCtx* pCtx) { } static int32_t firstlastSaveTupleData(const SSDataBlock* pSrcBlock, int32_t rowIndex, SqlFunctionCtx* pCtx, - SFirstLastRes* pInfo) { + SFirstLastRes* pInfo) { int32_t code = TSDB_CODE_SUCCESS; if (pCtx->subsidiaries.num <= 0) { @@ -2156,7 +2153,8 @@ int32_t firstFunction(SqlFunctionCtx* pCtx) { } // All null data column, return directly. - if (pInput->colDataSMAIsSet && (pInput->pColumnDataAgg[0]->numOfNull == pInput->totalRows) && pInputCol->hasNull == true) { + if (pInput->colDataSMAIsSet && (pInput->pColumnDataAgg[0]->numOfNull == pInput->totalRows) && + pInputCol->hasNull == true) { // save selectivity value for column consisted of all null values int32_t code = firstlastSaveTupleData(pCtx->pSrcBlock, pInput->startRowIndex, pCtx, pInfo); if (code != TSDB_CODE_SUCCESS) { @@ -2272,7 +2270,8 @@ int32_t lastFunction(SqlFunctionCtx* pCtx) { } // All null data column, return directly. - if (pInput->colDataSMAIsSet && (pInput->pColumnDataAgg[0]->numOfNull == pInput->totalRows) && pInputCol->hasNull == true) { + if (pInput->colDataSMAIsSet && (pInput->pColumnDataAgg[0]->numOfNull == pInput->totalRows) && + pInputCol->hasNull == true) { // save selectivity value for column consisted of all null values int32_t code = firstlastSaveTupleData(pCtx->pSrcBlock, pInput->startRowIndex, pCtx, pInfo); if (code != TSDB_CODE_SUCCESS) { @@ -2366,7 +2365,7 @@ int32_t lastFunction(SqlFunctionCtx* pCtx) { } if (pResInfo->numOfRes == 0 || pInfo->ts < cts) { - char* data = colDataGetData(pInputCol, chosen); + char* data = colDataGetData(pInputCol, chosen); int32_t code = doSaveCurrentVal(pCtx, i, cts, type, data); if (code != TSDB_CODE_SUCCESS) { return code; @@ -2377,7 +2376,7 @@ int32_t lastFunction(SqlFunctionCtx* pCtx) { for (int32_t i = pInput->startRowIndex + round * 4; i < pInput->startRowIndex + pInput->numOfRows; ++i) { if (pResInfo->numOfRes == 0 || pInfo->ts < pts[i]) { - char* data = colDataGetData(pInputCol, i); + char* data = colDataGetData(pInputCol, i); int32_t code = doSaveCurrentVal(pCtx, i, pts[i], type, data); if (code != TSDB_CODE_SUCCESS) { return code; @@ -2394,7 +2393,7 @@ int32_t lastFunction(SqlFunctionCtx* pCtx) { numOfElems++; if (pResInfo->numOfRes == 0 || pInfo->ts < pts[i]) { - char* data = colDataGetData(pInputCol, i); + char* data = colDataGetData(pInputCol, i); int32_t code = doSaveCurrentVal(pCtx, i, pts[i], type, data); if (code != TSDB_CODE_SUCCESS) { return code; @@ -2441,7 +2440,7 @@ static int32_t firstLastTransferInfoImpl(SFirstLastRes* pInput, SFirstLastRes* p } static int32_t firstLastTransferInfo(SqlFunctionCtx* pCtx, SFirstLastRes* pInput, SFirstLastRes* pOutput, bool isFirst, - int32_t rowIndex) { + int32_t rowIndex) { if (TSDB_CODE_SUCCESS == firstLastTransferInfoImpl(pInput, pOutput, isFirst)) { int32_t code = firstlastSaveTupleData(pCtx->pSrcBlock, rowIndex, pCtx, pOutput); if (code != TSDB_CODE_SUCCESS) { @@ -2468,7 +2467,7 @@ static int32_t firstLastFunctionMergeImpl(SqlFunctionCtx* pCtx, bool isFirstQuer for (int32_t i = start; i < start + pInput->numOfRows; ++i) { char* data = colDataGetData(pCol, i); SFirstLastRes* pInputInfo = (SFirstLastRes*)varDataVal(data); - int32_t code = firstLastTransferInfo(pCtx, pInputInfo, pInfo, isFirstQuery, i); + int32_t code = firstLastTransferInfo(pCtx, pInputInfo, pInfo, isFirstQuery, i); if (code != TSDB_CODE_SUCCESS) { return code; } @@ -2700,7 +2699,7 @@ static int32_t doSetPrevVal(SDiffInfo* pDiffInfo, int32_t type, const char* pv, } static int32_t doHandleDiff(SDiffInfo* pDiffInfo, int32_t type, const char* pv, SColumnInfoData* pOutput, int32_t pos, - int32_t order, int64_t ts) { + int32_t order, int64_t ts) { int32_t factor = (order == TSDB_ORDER_ASC) ? 1 : -1; pDiffInfo->prevTs = ts; switch (type) { @@ -2908,8 +2907,8 @@ static STopBotRes* getTopBotOutputInfo(SqlFunctionCtx* pCtx) { return pRes; } -static int32_t doAddIntoResult(SqlFunctionCtx* pCtx, void* pData, int32_t rowIndex, SSDataBlock* pSrcBlock, uint16_t type, - uint64_t uid, SResultRowEntryInfo* pEntryInfo, bool isTopQuery); +static int32_t doAddIntoResult(SqlFunctionCtx* pCtx, void* pData, int32_t rowIndex, SSDataBlock* pSrcBlock, + uint16_t type, uint64_t uid, SResultRowEntryInfo* pEntryInfo, bool isTopQuery); static void addResult(SqlFunctionCtx* pCtx, STopBotResItem* pSourceItem, int16_t type, bool isTopQuery); @@ -2930,7 +2929,7 @@ int32_t topFunction(SqlFunctionCtx* pCtx) { } numOfElems++; - char* data = colDataGetData(pCol, i); + char* data = colDataGetData(pCol, i); int32_t code = doAddIntoResult(pCtx, data, i, pCtx->pSrcBlock, pRes->type, pInput->uid, pResInfo, true); if (code != TSDB_CODE_SUCCESS) { return code; @@ -2964,7 +2963,7 @@ int32_t bottomFunction(SqlFunctionCtx* pCtx) { } numOfElems++; - char* data = colDataGetData(pCol, i); + char* data = colDataGetData(pCol, i); int32_t code = doAddIntoResult(pCtx, data, i, pCtx->pSrcBlock, pRes->type, pInput->uid, pResInfo, false); if (code != TSDB_CODE_SUCCESS) { return code; @@ -3016,7 +3015,7 @@ static int32_t topBotResComparFn(const void* p1, const void* p2, const void* par } int32_t doAddIntoResult(SqlFunctionCtx* pCtx, void* pData, int32_t rowIndex, SSDataBlock* pSrcBlock, uint16_t type, - uint64_t uid, SResultRowEntryInfo* pEntryInfo, bool isTopQuery) { + uint64_t uid, SResultRowEntryInfo* pEntryInfo, bool isTopQuery) { STopBotRes* pRes = getTopBotOutputInfo(pCtx); SVariant val = {0}; @@ -3207,7 +3206,7 @@ static char* doLoadTupleData(SSerializeDataHandle* pHandle, const STuplePos* pPo if (pPage == NULL) { return NULL; } - char* p = pPage->data + pPos->offset; + char* p = pPage->data + pPos->offset; releaseBufPage(pHandle->pBuf, pPage); return p; } else { @@ -4668,7 +4667,7 @@ int32_t sampleFunction(SqlFunctionCtx* pCtx) { continue; } - char* data = colDataGetData(pInputCol, i); + char* data = colDataGetData(pInputCol, i); int32_t code = doReservoirSample(pCtx, pInfo, data, i); if (code != TSDB_CODE_SUCCESS) { return code; @@ -4688,7 +4687,7 @@ int32_t sampleFunction(SqlFunctionCtx* pCtx) { } int32_t sampleFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { - int32_t code = TSDB_CODE_SUCCESS; + int32_t code = TSDB_CODE_SUCCESS; SResultRowEntryInfo* pEntryInfo = GET_RES_INFO(pCtx); SSampleInfo* pInfo = getSampleOutputInfo(pCtx); @@ -5022,7 +5021,7 @@ int32_t modeFunction(SqlFunctionCtx* pCtx) { } numOfElems++; - char* data = colDataGetData(pInputCol, i); + char* data = colDataGetData(pInputCol, i); int32_t code = doModeAdd(pInfo, i, pCtx, data); if (code != TSDB_CODE_SUCCESS) { return code; @@ -5428,7 +5427,6 @@ int32_t blockDistFunction(SqlFunctionCtx* pCtx) { pDistInfo->numOfBlocks += p1.numOfBlocks; pDistInfo->numOfTables += p1.numOfTables; pDistInfo->numOfInmemRows += p1.numOfInmemRows; - pDistInfo->numOfVgroups += p1.numOfVgroups; pDistInfo->totalSize += p1.totalSize; pDistInfo->totalRows += p1.totalRows; pDistInfo->numOfFiles += p1.numOfFiles; @@ -5445,6 +5443,7 @@ int32_t blockDistFunction(SqlFunctionCtx* pCtx) { pDistInfo->maxRows = p1.maxRows; } + pDistInfo->numOfVgroups += (p1.numOfTables != 0 ? 1 : 0); for (int32_t i = 0; i < tListLen(pDistInfo->blockRowsHisto); ++i) { pDistInfo->blockRowsHisto[i] += p1.blockRowsHisto[i]; } @@ -5463,7 +5462,6 @@ int32_t tSerializeBlockDistInfo(void* buf, int32_t bufLen, const STableBlockDist if (tEncodeU16(&encoder, pInfo->numOfFiles) < 0) return -1; if (tEncodeU32(&encoder, pInfo->numOfBlocks) < 0) return -1; if (tEncodeU32(&encoder, pInfo->numOfTables) < 0) return -1; - if (tEncodeU32(&encoder, pInfo->numOfVgroups) < 0) return -1; if (tEncodeU64(&encoder, pInfo->totalSize) < 0) return -1; if (tEncodeU64(&encoder, pInfo->totalRows) < 0) return -1; @@ -5473,6 +5471,7 @@ int32_t tSerializeBlockDistInfo(void* buf, int32_t bufLen, const STableBlockDist if (tEncodeI32(&encoder, pInfo->defMinRows) < 0) return -1; if (tEncodeU32(&encoder, pInfo->numOfInmemRows) < 0) return -1; if (tEncodeU32(&encoder, pInfo->numOfSmallBlocks) < 0) return -1; + if (tEncodeU32(&encoder, pInfo->numOfVgroups) < 0) return -1; for (int32_t i = 0; i < tListLen(pInfo->blockRowsHisto); ++i) { if (tEncodeI32(&encoder, pInfo->blockRowsHisto[i]) < 0) return -1; @@ -5495,7 +5494,6 @@ int32_t tDeserializeBlockDistInfo(void* buf, int32_t bufLen, STableBlockDistInfo if (tDecodeU16(&decoder, &pInfo->numOfFiles) < 0) return -1; if (tDecodeU32(&decoder, &pInfo->numOfBlocks) < 0) return -1; if (tDecodeU32(&decoder, &pInfo->numOfTables) < 0) return -1; - if (tDecodeU32(&decoder, &pInfo->numOfVgroups) < 0) return -1; if (tDecodeU64(&decoder, &pInfo->totalSize) < 0) return -1; if (tDecodeU64(&decoder, &pInfo->totalRows) < 0) return -1; @@ -5505,6 +5503,7 @@ int32_t tDeserializeBlockDistInfo(void* buf, int32_t bufLen, STableBlockDistInfo if (tDecodeI32(&decoder, &pInfo->defMinRows) < 0) return -1; if (tDecodeU32(&decoder, &pInfo->numOfInmemRows) < 0) return -1; if (tDecodeU32(&decoder, &pInfo->numOfSmallBlocks) < 0) return -1; + if (tDecodeU32(&decoder, &pInfo->numOfVgroups) < 0) return -1; for (int32_t i = 0; i < tListLen(pInfo->blockRowsHisto); ++i) { if (tDecodeI32(&decoder, &pInfo->blockRowsHisto[i]) < 0) return -1; From 3199ba2357ae6f1f8e294c30077e383b989dd971 Mon Sep 17 00:00:00 2001 From: Ganlin Zhao Date: Wed, 22 Feb 2023 18:46:59 +0800 Subject: [PATCH 05/12] fix: fix asan error --- source/libs/scalar/src/filter.c | 20 +++++++------------- 1 file changed, 7 insertions(+), 13 deletions(-) diff --git a/source/libs/scalar/src/filter.c b/source/libs/scalar/src/filter.c index d4444ead0f..bb71fd7652 100644 --- a/source/libs/scalar/src/filter.c +++ b/source/libs/scalar/src/filter.c @@ -3120,9 +3120,8 @@ static FORCE_INLINE bool filterExecuteImplIsNull(void *pinfo, int32_t numOfRows, for (int32_t i = 0; i < numOfRows; ++i) { uint32_t uidx = info->groups[0].unitIdxs[0]; - void *colData = colDataGetData((SColumnInfoData *)info->cunits[uidx].colData, i); - p[i] = ((colData == NULL) || colDataIsNull((SColumnInfoData *)info->cunits[uidx].colData, 0, i, NULL)); + p[i] = colDataIsNull((SColumnInfoData *)info->cunits[uidx].colData, 0, i, NULL); if (p[i] == 0) { all = false; } else { @@ -3146,13 +3145,8 @@ static FORCE_INLINE bool filterExecuteImplNotNull(void *pinfo, int32_t numOfRows for (int32_t i = 0; i < numOfRows; ++i) { uint32_t uidx = info->groups[0].unitIdxs[0]; - if (((SColumnInfoData *)info->cunits[uidx].colData)->pData == NULL) { - continue; - } - void *colData = colDataGetData((SColumnInfoData *)info->cunits[uidx].colData, i); - - p[i] = ((colData != NULL) && !colDataIsNull((SColumnInfoData *)info->cunits[uidx].colData, 0, i, NULL)); + p[i] = !colDataIsNull((SColumnInfoData *)info->cunits[uidx].colData, 0, i, NULL); if (p[i] == 0) { all = false; } else { @@ -3182,13 +3176,13 @@ bool filterExecuteImplRange(void *pinfo, int32_t numOfRows, SColumnInfoData *pRe for (int32_t i = 0; i < numOfRows; ++i) { SColumnInfoData *pData = info->cunits[0].colData; - void *colData = colDataGetData(pData, i); - if (colData == NULL || colDataIsNull_s(pData, i)) { + if (colDataIsNull_s(pData, i)) { all = false; p[i] = 0; continue; } + void *colData = colDataGetData(pData, i); p[i] = (*rfunc)(colData, colData, valData, valData2, func); if (p[i] == 0) { @@ -3214,13 +3208,13 @@ bool filterExecuteImplMisc(void *pinfo, int32_t numOfRows, SColumnInfoData *pRes for (int32_t i = 0; i < numOfRows; ++i) { uint32_t uidx = info->groups[0].unitIdxs[0]; - void *colData = colDataGetData((SColumnInfoData *)info->cunits[uidx].colData, i); - if (colData == NULL || colDataIsNull_s((SColumnInfoData *)info->cunits[uidx].colData, i)) { + if (colDataIsNull_s((SColumnInfoData *)info->cunits[uidx].colData, i)) { p[i] = 0; all = false; continue; } + void *colData = colDataGetData((SColumnInfoData *)info->cunits[uidx].colData, i); // match/nmatch for nchar type need convert from ucs4 to mbs if (info->cunits[uidx].dataType == TSDB_DATA_TYPE_NCHAR && (info->cunits[uidx].optr == OP_TYPE_MATCH || info->cunits[uidx].optr == OP_TYPE_NMATCH)) { @@ -3278,7 +3272,7 @@ bool filterExecuteImpl(void *pinfo, int32_t numOfRows, SColumnInfoData *pRes, SC if (!isNull) { colData = colDataGetData((SColumnInfoData *)(cunit->colData), i); } - + if (colData == NULL || isNull) { p[i] = optr == OP_TYPE_IS_NULL ? true : false; } else { From 8fa9f16e9bda08bf3aea3cb6ba00d718f0678bf0 Mon Sep 17 00:00:00 2001 From: WANG Xu Date: Thu, 23 Feb 2023 17:55:23 +0800 Subject: [PATCH 06/12] fix: bad sql grammar --- docs/en/07-develop/07-tmq.mdx | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/en/07-develop/07-tmq.mdx b/docs/en/07-develop/07-tmq.mdx index 92db7d4cbf..c85109d3c5 100644 --- a/docs/en/07-develop/07-tmq.mdx +++ b/docs/en/07-develop/07-tmq.mdx @@ -222,7 +222,7 @@ A database including one supertable and two subtables is created as follows: ```sql DROP DATABASE IF EXISTS tmqdb; CREATE DATABASE tmqdb; -CREATE TABLE tmqdb.stb (ts TIMESTAMP, c1 INT, c2 FLOAT, c3 VARCHAR(16) TAGS(t1 INT, t3 VARCHAR(16)); +CREATE TABLE tmqdb.stb (ts TIMESTAMP, c1 INT, c2 FLOAT, c3 VARCHAR(16)) TAGS(t1 INT, t3 VARCHAR(16)); CREATE TABLE tmqdb.ctb0 USING tmqdb.stb TAGS(0, "subtable0"); CREATE TABLE tmqdb.ctb1 USING tmqdb.stb TAGS(1, "subtable1"); INSERT INTO tmqdb.ctb0 VALUES(now, 0, 0, 'a0')(now+1s, 0, 0, 'a00'); From 8514f1c7f0d9890a779857220b7de9ecaf97011b Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Thu, 23 Feb 2023 18:56:32 +0800 Subject: [PATCH 07/12] fix: report error when NULL ts is inserted --- source/libs/executor/src/dataInserter.c | 16 ++++++++++++---- 1 file changed, 12 insertions(+), 4 deletions(-) diff --git a/source/libs/executor/src/dataInserter.c b/source/libs/executor/src/dataInserter.c index 16b43b560c..e9c46843c0 100644 --- a/source/libs/executor/src/dataInserter.c +++ b/source/libs/executor/src/dataInserter.c @@ -230,12 +230,19 @@ int32_t buildSubmitReqFromBlock(SDataInserterHandle* pInserter, SSubmitReq2** pp case TSDB_DATA_TYPE_BLOB: case TSDB_DATA_TYPE_JSON: case TSDB_DATA_TYPE_MEDIUMBLOB: - uError("the column type %" PRIi16 " is defined but not implemented yet", pColInfoData->info.type); - ASSERT(0); + qError("the column type %" PRIi16 " is defined but not implemented yet", pColInfoData->info.type); + terrno = TSDB_CODE_APP_ERROR; + goto _end; break; default: if (pColInfoData->info.type < TSDB_DATA_TYPE_MAX && pColInfoData->info.type > TSDB_DATA_TYPE_NULL) { if (colDataIsNull_s(pColInfoData, j)) { + if (PRIMARYKEY_TIMESTAMP_COL_ID == pCol->colId) { + qError("NULL value for primary key"); + terrno = TSDB_CODE_PAR_INCORRECT_TIMESTAMP_VAL; + goto _end; + } + SColVal cv = COL_VAL_NULL(pCol->colId, pCol->type); // should use pCol->type taosArrayPush(pVals, &cv); } else { @@ -256,7 +263,8 @@ int32_t buildSubmitReqFromBlock(SDataInserterHandle* pInserter, SSubmitReq2** pp } } else { uError("the column type %" PRIi16 " is undefined\n", pColInfoData->info.type); - ASSERT(0); + terrno = TSDB_CODE_APP_ERROR; + goto _end; } break; } @@ -296,7 +304,7 @@ _end: tDestroySSubmitReq2(pReq, TSDB_MSG_FLG_ENCODE); taosMemoryFree(pReq); } - return TSDB_CODE_FAILED; + return terrno; } *ppReq = pReq; return TSDB_CODE_SUCCESS; From 97898f5c2dbdeaffaddd36c3dbeeecd61b1813ad Mon Sep 17 00:00:00 2001 From: Shuduo Sang Date: Fri, 24 Feb 2023 13:40:02 +0800 Subject: [PATCH 08/12] fix: docker refine (#20116) * fix: docker doc refine * fix: refine dockerfile --- packaging/docker/DockerfileCloud | 30 ------------------------------ 1 file changed, 30 deletions(-) delete mode 100644 packaging/docker/DockerfileCloud diff --git a/packaging/docker/DockerfileCloud b/packaging/docker/DockerfileCloud deleted file mode 100644 index fa8fcabf34..0000000000 --- a/packaging/docker/DockerfileCloud +++ /dev/null @@ -1,30 +0,0 @@ -FROM ubuntu:18.04 - -WORKDIR /root - -ARG pkgFile -ARG dirName -ARG cpuType -RUN echo ${pkgFile} && echo ${dirName} - -RUN apt update -RUN apt install -y curl - -COPY ${pkgFile} /root/ -ENV TINI_VERSION v0.19.0 -ENV TAOS_DISABLE_ADAPTER 1 -ADD https://github.com/krallin/tini/releases/download/${TINI_VERSION}/tini-${cpuType} /tini -ENV DEBIAN_FRONTEND=noninteractive -WORKDIR /root/ -RUN tar -zxf ${pkgFile} && cd /root/${dirName}/ && /bin/bash install.sh -e no && cd /root && rm /root/${pkgFile} && rm -rf /root/${dirName} && apt-get update && apt-get install -y locales tzdata netcat && locale-gen en_US.UTF-8 && apt-get clean && rm -rf /var/lib/apt/lists/ && chmod +x /tini - -ENV LD_LIBRARY_PATH="$LD_LIBRARY_PATH:/usr/lib" \ - LC_CTYPE=en_US.UTF-8 \ - LANG=en_US.UTF-8 \ - LC_ALL=en_US.UTF-8 -COPY ./run.sh /usr/bin/ -COPY ./bin/* /usr/bin/ - -ENTRYPOINT ["/tini", "--", "/usr/bin/entrypoint.sh"] -CMD ["bash", "-c", "/usr/bin/run.sh"] -VOLUME [ "/var/lib/taos", "/var/log/taos" ] From e3f21ae98f67f7a8a01c851ec49a5ff685618966 Mon Sep 17 00:00:00 2001 From: xinsheng Ren <285808407@qq.com> Date: Fri, 24 Feb 2023 15:17:14 +0800 Subject: [PATCH 09/12] fix: TD-22674 coverity scan (#20126) * fix: TD-22674 coverity scan * fix: compile error --------- Co-authored-by: facetosea <25808407@qq.com> --- source/os/src/osDir.c | 19 +++++++++---------- source/os/src/osFile.c | 13 +++++++++---- source/os/src/osSocket.c | 6 ++++-- source/os/src/osSysinfo.c | 21 +++++++++------------ source/os/src/osTimezone.c | 2 +- tools/shell/src/shellEngine.c | 2 ++ 6 files changed, 34 insertions(+), 29 deletions(-) diff --git a/source/os/src/osDir.c b/source/os/src/osDir.c index 331d241745..3d63da7ba3 100644 --- a/source/os/src/osDir.c +++ b/source/os/src/osDir.c @@ -89,6 +89,8 @@ typedef struct dirent TdDirEntry; #endif +#define TDDIRMAXLEN 1024 + void taosRemoveDir(const char *dirname) { TdDirPtr pDir = taosOpenDir(dirname); if (pDir == NULL) return; @@ -133,8 +135,8 @@ int32_t taosMkDir(const char *dirname) { } int32_t taosMulMkDir(const char *dirname) { - if (dirname == NULL) return -1; - char temp[1024]; + if (dirname == NULL || strlen(dirname) >= TDDIRMAXLEN) return -1; + char temp[TDDIRMAXLEN]; char *pos = temp; int32_t code = 0; #ifdef WINDOWS @@ -192,8 +194,8 @@ int32_t taosMulMkDir(const char *dirname) { } int32_t taosMulModeMkDir(const char *dirname, int mode) { - if (dirname == NULL) return -1; - char temp[1024]; + if (dirname == NULL || strlen(dirname) >= TDDIRMAXLEN) return -1; + char temp[TDDIRMAXLEN]; char *pos = temp; int32_t code = 0; #ifdef WINDOWS @@ -204,8 +206,7 @@ int32_t taosMulModeMkDir(const char *dirname, int mode) { #endif if (taosDirExist(temp)) { - chmod(temp, mode); - return code; + return chmod(temp, mode); } if (strncmp(temp, TD_DIRSEP, 1) == 0) { @@ -247,12 +248,10 @@ int32_t taosMulModeMkDir(const char *dirname, int mode) { } if (code < 0 && errno == EEXIST) { - chmod(temp, mode); - return 0; + return chmod(temp, mode); } - chmod(temp, mode); - return code; + return chmod(temp, mode); } void taosRemoveOldFiles(const char *dirname, int32_t keepDays) { diff --git a/source/os/src/osFile.c b/source/os/src/osFile.c index 8cc9885adb..a14c8fd4c9 100644 --- a/source/os/src/osFile.c +++ b/source/os/src/osFile.c @@ -132,15 +132,20 @@ int64_t taosCopyFile(const char *from, const char *to) { if (bytes < sizeof(buffer)) break; } - taosFsyncFile(pFileTo); + int code = taosFsyncFile(pFileTo); taosCloseFile(&pFileFrom); taosCloseFile(&pFileTo); + + if (code != 0) { + return -1; + } return size; _err: if (pFileFrom != NULL) taosCloseFile(&pFileFrom); if (pFileTo != NULL) taosCloseFile(&pFileTo); + /* coverity[+retval] */ taosRemoveFile(to); return -1; #endif @@ -506,13 +511,13 @@ int64_t taosPWriteFile(TdFilePtr pFile, const void *buf, int64_t count, int64_t } int64_t taosLSeekFile(TdFilePtr pFile, int64_t offset, int32_t whence) { + if (pFile == NULL || pFile->fd < 0) { + return -1; + } #if FILE_WITH_LOCK taosThreadRwlockRdlock(&(pFile->rwlock)); #endif ASSERT(pFile->fd >= 0); // Please check if you have closed the file. - if (pFile->fd < 0) { - return -1; - } #ifdef WINDOWS int64_t ret = _lseeki64(pFile->fd, offset, whence); #else diff --git a/source/os/src/osSocket.c b/source/os/src/osSocket.c index fac547ca99..7d2c8aa4e5 100644 --- a/source/os/src/osSocket.c +++ b/source/os/src/osSocket.c @@ -745,8 +745,10 @@ bool taosValidIpAndPort(uint32_t ip, uint16_t port) { #endif serverAdd.sin_port = (uint16_t)htons(port); - if ((fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP)) <= 2) { - // printf("failed to open TCP socket: %d (%s)", errno, strerror(errno)); + fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP); + if (fd < 0) { // exception + return false; + } else if (fd <= 2) { // in, out, err taosCloseSocketNoCheck1(fd); return false; } diff --git a/source/os/src/osSysinfo.c b/source/os/src/osSysinfo.c index 6471dad033..52309a7b35 100644 --- a/source/os/src/osSysinfo.c +++ b/source/os/src/osSysinfo.c @@ -439,11 +439,14 @@ int32_t taosGetCpuInfo(char *cpuModel, int32_t maxLen, float *numOfCores) { if (code != 0 && (done & 1) == 0) { TdFilePtr pFile1 = taosOpenFile("/proc/device-tree/model", TD_FILE_READ | TD_FILE_STREAM); - if (pFile1 == NULL) return code; - taosGetsFile(pFile1, maxLen, cpuModel); - taosCloseFile(&pFile1); - code = 0; - done |= 1; + if (pFile1 != NULL) { + ssize_t bytes = taosGetsFile(pFile1, maxLen, cpuModel); + taosCloseFile(&pFile); + if (bytes > 0) { + code = 0; + done |= 1; + } + } } if (code != 0 && (done & 1) == 0) { @@ -498,7 +501,7 @@ void taosGetCpuUsage(double *cpu_system, double *cpu_engine) { curSysTotal = curSysUsed + sysCpu.idle; curProcTotal = procCpu.utime + procCpu.stime + procCpu.cutime + procCpu.cstime; - if (curSysTotal > lastSysTotal && curSysUsed >= lastSysUsed && curProcTotal >= lastProcTotal) { + if (curSysTotal - lastSysTotal > 0 && curSysUsed >= lastSysUsed && curProcTotal >= lastProcTotal) { if (cpu_system != NULL) { *cpu_system = (curSysUsed - lastSysUsed) / (double)(curSysTotal - lastSysTotal) * 100; } @@ -610,12 +613,6 @@ int32_t taosGetProcMemory(int64_t *usedKB) { } } - if (strlen(line) < 0) { - // printf("read file:%s failed", tsProcMemFile); - taosCloseFile(&pFile); - return -1; - } - char tmp[10]; sscanf(line, "%s %" PRId64, tmp, usedKB); diff --git a/source/os/src/osTimezone.c b/source/os/src/osTimezone.c index ab5600744c..ad223bff27 100644 --- a/source/os/src/osTimezone.c +++ b/source/os/src/osTimezone.c @@ -909,7 +909,7 @@ void taosGetSystemTimezone(char *outTimezoneStr, enum TdTimezone *tsTimezone) { char buf[4096] = {0}; char *tz = NULL; { - int n = readlink("/etc/localtime", buf, sizeof(buf)); + int n = readlink("/etc/localtime", buf, sizeof(buf)-1); if (n < 0) { printf("read /etc/localtime error, reason:%s", strerror(errno)); diff --git a/tools/shell/src/shellEngine.c b/tools/shell/src/shellEngine.c index 54d31cdb74..85ed2a7ac7 100644 --- a/tools/shell/src/shellEngine.c +++ b/tools/shell/src/shellEngine.c @@ -845,6 +845,8 @@ void shellReadHistory() { i = (i + SHELL_MAX_HISTORY_SIZE - 1) % SHELL_MAX_HISTORY_SIZE; } taosFprintfFile(pFile, "%s\n", pHistory->hist[endIndex]); + + /* coverity[+retval] */ taosFsyncFile(pFile); taosCloseFile(&pFile); } From e667da16668ad88ef7db16cb70e941fab22abf62 Mon Sep 17 00:00:00 2001 From: Hui Li <52318143+plum-lihui@users.noreply.github.com> Date: Fri, 24 Feb 2023 15:35:47 +0800 Subject: [PATCH 10/12] Update subscribeDb4.py --- tests/system-test/7-tmq/subscribeDb4.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/tests/system-test/7-tmq/subscribeDb4.py b/tests/system-test/7-tmq/subscribeDb4.py index 7f5169361c..c14d3b27b1 100644 --- a/tests/system-test/7-tmq/subscribeDb4.py +++ b/tests/system-test/7-tmq/subscribeDb4.py @@ -31,7 +31,8 @@ class TDTestCase: 'startTs': 1640966400000, # 2022-01-01 00:00:00.000 'pollDelay': 20, 'showMsg': 1, - 'showRow': 1} + 'showRow': 1, + 'snapshot': 1} cdbName = 'cdb' # some parameter to consumer processor @@ -42,7 +43,7 @@ class TDTestCase: ifManualCommit = 1 groupId = 'group.id:cgrp1' autoCommit = 'enable.auto.commit:true' - autoCommitInterval = 'auto.commit.interval.ms:1000' + autoCommitInterval = 'auto.commit.interval.ms:100' autoOffset = 'auto.offset.reset:earliest' pollDelay = 20 @@ -86,7 +87,7 @@ class TDTestCase: tmqCom.insertConsumerInfo(self.consumerId, self.expectrowcnt,topicList,keyList,self.ifcheckdata,self.ifManualCommit) tdLog.info("start consume processor") - tmqCom.startTmqSimProcess(self.pollDelay,self.paraDict["dbName"],self.showMsg, self.showRow,self.cdbName) + tmqCom.startTmqSimProcess(self.pollDelay,self.paraDict["dbName"],self.showMsg, self.showRow,self.cdbName,0,0,self.paraDict["snapshot"]) tdLog.info("After waiting for a commit notify, drop one stable") #time.sleep(3) From fb8ab642cb7b49693fb716e70eed8d69e3f421ff Mon Sep 17 00:00:00 2001 From: Hui Li <52318143+plum-lihui@users.noreply.github.com> Date: Fri, 24 Feb 2023 15:37:21 +0800 Subject: [PATCH 11/12] Update tmqCommon.py --- tests/system-test/7-tmq/tmqCommon.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/system-test/7-tmq/tmqCommon.py b/tests/system-test/7-tmq/tmqCommon.py index 4cda062401..895da95e5d 100644 --- a/tests/system-test/7-tmq/tmqCommon.py +++ b/tests/system-test/7-tmq/tmqCommon.py @@ -151,7 +151,7 @@ class TMQCom: if tdSql.getData(i, 1) == 0: loopFlag = 0 break - time.sleep(0.1) + time.sleep(0.02) return def getStartCommitNotifyFromTmqsim(self,cdbName='cdb',rows=2): @@ -165,7 +165,7 @@ class TMQCom: if tdSql.getData(i, 1) == 1: loopFlag = 0 break - time.sleep(0.1) + time.sleep(0.02) return def create_database(self,tsql, dbName,dropFlag=1,vgroups=4,replica=1): From d7e78005201c15f809e1fe7331b16cb196b04780 Mon Sep 17 00:00:00 2001 From: huolibo Date: Fri, 24 Feb 2023 15:44:12 +0800 Subject: [PATCH 12/12] docs: add jdbc 3.1.0 version description (#20145) * docs: add jdbc 3.1.0 version description * docs: fix jdbc description * docs: add tmq over websocket demo * docs: fix jdbc tmq demo path * docs: docs demo in CI * fix: add jdbc version * docs: use subscription instead of tmq --------- Co-authored-by: Shuduo Sang --- docs/en/07-develop/_sub_java.mdx | 35 ++++--- docs/en/14-reference/03-connector/04-java.mdx | 93 +++++++++++++++++- docs/examples/java/pom.xml | 11 ++- .../taos/example/WebsocketSubscribeDemo.java | 79 +++++++++++++++ .../src/test/java/com/taos/test/TestAll.java | 18 ++-- docs/zh/07-develop/_sub_java.mdx | 17 +++- docs/zh/08-connector/14-java.mdx | 95 ++++++++++++++++++- 7 files changed, 314 insertions(+), 34 deletions(-) create mode 100644 docs/examples/java/src/main/java/com/taos/example/WebsocketSubscribeDemo.java diff --git a/docs/en/07-develop/_sub_java.mdx b/docs/en/07-develop/_sub_java.mdx index d14b5fd609..a928fa8836 100644 --- a/docs/en/07-develop/_sub_java.mdx +++ b/docs/en/07-develop/_sub_java.mdx @@ -1,11 +1,24 @@ -```java -{{#include docs/examples/java/src/main/java/com/taos/example/SubscribeDemo.java}} -{{#include docs/examples/java/src/main/java/com/taos/example/MetersDeserializer.java}} -{{#include docs/examples/java/src/main/java/com/taos/example/Meters.java}} -``` -```java -{{#include docs/examples/java/src/main/java/com/taos/example/MetersDeserializer.java}} -``` -```java -{{#include docs/examples/java/src/main/java/com/taos/example/Meters.java}} -``` \ No newline at end of file + + + ```java + {{#include docs/examples/java/src/main/java/com/taos/example/SubscribeDemo.java}} + ``` + ```java + {{#include docs/examples/java/src/main/java/com/taos/example/MetersDeserializer.java}} + ``` + ```java + {{#include docs/examples/java/src/main/java/com/taos/example/Meters.java}} + ``` + + + ```java + {{#include docs/examples/java/src/main/java/com/taos/example/WebsocketSubscribeDemo.java}} + ``` + ```java + {{#include docs/examples/java/src/main/java/com/taos/example/MetersDeserializer.java}} + ``` + ```java + {{#include docs/examples/java/src/main/java/com/taos/example/Meters.java}} + ``` + + diff --git a/docs/en/14-reference/03-connector/04-java.mdx b/docs/en/14-reference/03-connector/04-java.mdx index 61ce166069..36992da636 100644 --- a/docs/en/14-reference/03-connector/04-java.mdx +++ b/docs/en/14-reference/03-connector/04-java.mdx @@ -696,6 +696,9 @@ TaosConsumer consumer = new TaosConsumer<>(config); - enable.auto.commit: Specifies whether to commit automatically. - group.id: consumer: Specifies the group that the consumer is in. - value.deserializer: To deserialize the results, you can inherit `com.taosdata.jdbc.tmq.ReferenceDeserializer` and specify the result set bean. You can also inherit `com.taosdata.jdbc.tmq.Deserializer` and perform custom deserialization based on the SQL result set. +- td.connect.type: Specifies the type connect with TDengine, `jni` or `WebSocket`. default is `jni` +- httpConnectTimeout:WebSocket connection timeout in milliseconds, the default value is 5000 ms. It only takes effect when using WebSocket type. +- messageWaitTimeout:socket timeout in milliseconds, the default value is 10000 ms. It only takes effect when using WebSocket type. - For more information, see [Consumer Parameters](../../../develop/tmq). #### Subscribe to consume data @@ -724,6 +727,11 @@ For more information, see [Data Subscription](../../../develop/tmq). ### Usage examples + + + +In addition to the native connection, the Java Connector also supports subscribing via websocket. + ```java public abstract class ConsumerLoop { private final TaosConsumer consumer; @@ -795,6 +803,87 @@ public abstract class ConsumerLoop { } ``` + + + +```java +public abstract class ConsumerLoop { + private final TaosConsumer consumer; + private final List topics; + private final AtomicBoolean shutdown; + private final CountDownLatch shutdownLatch; + + public ConsumerLoop() throws SQLException { + Properties config = new Properties(); + config.setProperty("bootstrap.servers", "localhost:6041"); + config.setProperty("td.connect.type", "ws"); + config.setProperty("msg.with.table.name", "true"); + config.setProperty("enable.auto.commit", "true"); + config.setProperty("group.id", "group2"); + config.setProperty("value.deserializer", "com.taosdata.jdbc.tmq.ConsumerTest.ConsumerLoop$ResultDeserializer"); + + this.consumer = new TaosConsumer<>(config); + this.topics = Collections.singletonList("topic_speed"); + this.shutdown = new AtomicBoolean(false); + this.shutdownLatch = new CountDownLatch(1); + } + + public abstract void process(ResultBean result); + + public void pollData() throws SQLException { + try { + consumer.subscribe(topics); + + while (!shutdown.get()) { + ConsumerRecords records = consumer.poll(Duration.ofMillis(100)); + for (ResultBean record : records) { + process(record); + } + } + consumer.unsubscribe(); + } finally { + consumer.close(); + shutdownLatch.countDown(); + } + } + + public void shutdown() throws InterruptedException { + shutdown.set(true); + shutdownLatch.await(); + } + + public static class ResultDeserializer extends ReferenceDeserializer { + + } + + public static class ResultBean { + private Timestamp ts; + private int speed; + + public Timestamp getTs() { + return ts; + } + + public void setTs(Timestamp ts) { + this.ts = ts; + } + + public int getSpeed() { + return speed; + } + + public void setSpeed(int speed) { + this.speed = speed; + } + } +} +``` + + + + +> **Note**: The value of value.deserializer should be adjusted based on the package path of the test environment. + ### Use with connection pool #### HikariCP @@ -878,8 +967,8 @@ The source code of the sample application is under `TDengine/examples/JDBC`: | taos-jdbcdriver version | major changes | | :---------------------: | :--------------------------------------------: | -| 3.0.3 | fix timestamp resolution error for REST connection in jdk17+ version | -| 3.0.1 - 3.0.2 | fix the resultSet data is parsed incorrectly sometimes. 3.0.1 is compiled on JDK 11, you are advised to use 3.0.2 in the JDK 8 environment | +| 3.1.0 | JDBC REST connection supports subscription over WebSocket | +| 3.0.1 - 3.0.4 | fix the resultSet data is parsed incorrectly sometimes. 3.0.1 is compiled on JDK 11, you are advised to use other version in the JDK 8 environment | | 3.0.0 | Support for TDengine 3.0 | | 2.0.42 | fix wasNull interface return value in WebSocket connection | | 2.0.41 | fix decode method of username and password in REST connection | diff --git a/docs/examples/java/pom.xml b/docs/examples/java/pom.xml index 634c3f75a8..01d0ce936c 100644 --- a/docs/examples/java/pom.xml +++ b/docs/examples/java/pom.xml @@ -1,6 +1,7 @@ - 4.0.0 @@ -17,13 +18,13 @@ - + com.taosdata.jdbc taos-jdbcdriver - 3.0.0 + 3.1.0 - + junit junit @@ -32,4 +33,4 @@ - + \ No newline at end of file diff --git a/docs/examples/java/src/main/java/com/taos/example/WebsocketSubscribeDemo.java b/docs/examples/java/src/main/java/com/taos/example/WebsocketSubscribeDemo.java new file mode 100644 index 0000000000..d953a73641 --- /dev/null +++ b/docs/examples/java/src/main/java/com/taos/example/WebsocketSubscribeDemo.java @@ -0,0 +1,79 @@ +package com.taos.example; + +import com.taosdata.jdbc.tmq.ConsumerRecords; +import com.taosdata.jdbc.tmq.TMQConstants; +import com.taosdata.jdbc.tmq.TaosConsumer; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.SQLException; +import java.sql.Statement; +import java.time.Duration; +import java.util.Collections; +import java.util.Properties; +import java.util.Timer; +import java.util.TimerTask; +import java.util.concurrent.atomic.AtomicBoolean; + +public class WebsocketSubscribeDemo { + private static final String TOPIC = "tmq_topic_ws"; + private static final String DB_NAME = "meters_ws"; + private static final AtomicBoolean shutdown = new AtomicBoolean(false); + + public static void main(String[] args) { + Timer timer = new Timer(); + timer.schedule(new TimerTask() { + public void run() { + shutdown.set(true); + } + }, 3_000); + try { + // prepare + Class.forName("com.taosdata.jdbc.rs.RestfulDriver"); + String jdbcUrl = "jdbc:TAOS-RS://127.0.0.1:6041/?user=root&password=taosdata&batchfetch=true"; + try (Connection connection = DriverManager.getConnection(jdbcUrl); + Statement statement = connection.createStatement()) { + statement.executeUpdate("drop topic if exists " + TOPIC); + statement.executeUpdate("drop database if exists " + DB_NAME); + statement.executeUpdate("create database " + DB_NAME); + statement.executeUpdate("use " + DB_NAME); + statement.executeUpdate( + "CREATE TABLE `meters` (`ts` TIMESTAMP, `current` FLOAT, `voltage` INT) TAGS (`groupid` INT, `location` BINARY(24))"); + statement.executeUpdate("CREATE TABLE `d0` USING `meters` TAGS(0, 'California.LosAngles')"); + statement.executeUpdate("INSERT INTO `d0` values(now - 10s, 0.32, 116)"); + statement.executeUpdate("INSERT INTO `d0` values(now - 8s, NULL, NULL)"); + statement.executeUpdate( + "INSERT INTO `d1` USING `meters` TAGS(1, 'California.SanFrancisco') values(now - 9s, 10.1, 119)"); + statement.executeUpdate( + "INSERT INTO `d1` values (now-8s, 10, 120) (now - 6s, 10, 119) (now - 4s, 11.2, 118)"); + // create topic + statement.executeUpdate("create topic " + TOPIC + " as select * from meters"); + } + + // create consumer + Properties properties = new Properties(); + properties.setProperty(TMQConstants.BOOTSTRAP_SERVERS, "127.0.0.1:6041"); + properties.setProperty(TMQConstants.CONNECT_TYPE, "ws"); + properties.setProperty(TMQConstants.MSG_WITH_TABLE_NAME, "true"); + properties.setProperty(TMQConstants.ENABLE_AUTO_COMMIT, "true"); + properties.setProperty(TMQConstants.GROUP_ID, "test"); + properties.setProperty(TMQConstants.VALUE_DESERIALIZER, + "com.taos.example.MetersDeserializer"); + + // poll data + try (TaosConsumer consumer = new TaosConsumer<>(properties)) { + consumer.subscribe(Collections.singletonList(TOPIC)); + while (!shutdown.get()) { + ConsumerRecords meters = consumer.poll(Duration.ofMillis(100)); + for (Meters meter : meters) { + System.out.println(meter); + } + } + consumer.unsubscribe(); + } + } catch (ClassNotFoundException | SQLException e) { + e.printStackTrace(); + } + timer.cancel(); + } +} diff --git a/docs/examples/java/src/test/java/com/taos/test/TestAll.java b/docs/examples/java/src/test/java/com/taos/test/TestAll.java index 8d201da074..f24156d8b1 100644 --- a/docs/examples/java/src/test/java/com/taos/test/TestAll.java +++ b/docs/examples/java/src/test/java/com/taos/test/TestAll.java @@ -64,21 +64,15 @@ public class TestAll { @Test public void testSubscribe() { - - Thread thread = new Thread(() -> { - try { - Thread.sleep(1000); - insertData(); - } catch (SQLException e) { - e.printStackTrace(); - } catch (InterruptedException e) { - e.printStackTrace(); - } - }); - thread.start(); SubscribeDemo.main(args); } + + @Test + public void testSubscribeOverWebsocket() { + WebsocketSubscribeDemo.main(args); + } + @Test public void testSchemaless() throws SQLException { LineProtocolExample.main(args); diff --git a/docs/zh/07-develop/_sub_java.mdx b/docs/zh/07-develop/_sub_java.mdx index e7de158cc8..e5e46b2af0 100644 --- a/docs/zh/07-develop/_sub_java.mdx +++ b/docs/zh/07-develop/_sub_java.mdx @@ -1,3 +1,5 @@ + + ```java {{#include docs/examples/java/src/main/java/com/taos/example/SubscribeDemo.java}} ``` @@ -6,4 +8,17 @@ ``` ```java {{#include docs/examples/java/src/main/java/com/taos/example/Meters.java}} -``` \ No newline at end of file +``` + + +```java +{{#include docs/examples/java/src/main/java/com/taos/example/WebsocketSubscribeDemo.java}} +``` +```java +{{#include docs/examples/java/src/main/java/com/taos/example/MetersDeserializer.java}} +``` +```java +{{#include docs/examples/java/src/main/java/com/taos/example/Meters.java}} +``` + + diff --git a/docs/zh/08-connector/14-java.mdx b/docs/zh/08-connector/14-java.mdx index fc6dc57138..061475f51e 100644 --- a/docs/zh/08-connector/14-java.mdx +++ b/docs/zh/08-connector/14-java.mdx @@ -699,7 +699,10 @@ TaosConsumer consumer = new TaosConsumer<>(config); - enable.auto.commit: 是否允许自动提交。 - group.id: consumer: 所在的 group。 - value.deserializer: 结果集反序列化方法,可以继承 `com.taosdata.jdbc.tmq.ReferenceDeserializer`,并指定结果集 bean,实现反序列化。也可以继承 `com.taosdata.jdbc.tmq.Deserializer`,根据 SQL 的 resultSet 自定义反序列化方式。 -- 其他参数请参考:[Consumer 参数列表](../../../develop/tmq#创建-consumer-以及consumer-group) +- td.connect.type: 连接方式。jni:表示使用动态库连接的方式,ws/WebSocket:表示使用 WebSocket 进行数据通信。默认为 jni 方式。 +- httpConnectTimeout:创建连接超时参数,单位 ms,默认为 5000 ms。仅在 WebSocket 连接下有效。 +- messageWaitTimeout:数据传输超时参数,单位 ms,默认为 10000 ms。仅在 WebSocket 连接下有效。 +其他参数请参考:[Consumer 参数列表](../../../develop/tmq#创建-consumer-以及consumer-group) #### 订阅消费数据 @@ -727,6 +730,9 @@ consumer.close() ### 使用示例如下: + + + ```java public abstract class ConsumerLoop { private final TaosConsumer consumer; @@ -798,6 +804,89 @@ public abstract class ConsumerLoop { } ``` + + + +除了原生的连接方式,Java 连接器还支持通过 WebSocket 订阅数据。 + +```java +public abstract class ConsumerLoop { + private final TaosConsumer consumer; + private final List topics; + private final AtomicBoolean shutdown; + private final CountDownLatch shutdownLatch; + + public ConsumerLoop() throws SQLException { + Properties config = new Properties(); + config.setProperty("bootstrap.servers", "localhost:6041"); + config.setProperty("td.connect.type", "ws"); + config.setProperty("msg.with.table.name", "true"); + config.setProperty("enable.auto.commit", "true"); + config.setProperty("group.id", "group2"); + config.setProperty("value.deserializer", "com.taosdata.jdbc.tmq.ConsumerTest.ConsumerLoop$ResultDeserializer"); + + this.consumer = new TaosConsumer<>(config); + this.topics = Collections.singletonList("topic_speed"); + this.shutdown = new AtomicBoolean(false); + this.shutdownLatch = new CountDownLatch(1); + } + + public abstract void process(ResultBean result); + + public void pollData() throws SQLException { + try { + consumer.subscribe(topics); + + while (!shutdown.get()) { + ConsumerRecords records = consumer.poll(Duration.ofMillis(100)); + for (ResultBean record : records) { + process(record); + } + } + consumer.unsubscribe(); + } finally { + consumer.close(); + shutdownLatch.countDown(); + } + } + + public void shutdown() throws InterruptedException { + shutdown.set(true); + shutdownLatch.await(); + } + + public static class ResultDeserializer extends ReferenceDeserializer { + + } + + public static class ResultBean { + private Timestamp ts; + private int speed; + + public Timestamp getTs() { + return ts; + } + + public void setTs(Timestamp ts) { + this.ts = ts; + } + + public int getSpeed() { + return speed; + } + + public void setSpeed(int speed) { + this.speed = speed; + } + } +} +``` + + + + +> **注意**:这里的 value.deserializer 配置参数值应该根据测试环境的包路径做相应的调整。 + ### 与连接池使用 #### HikariCP @@ -881,8 +970,8 @@ public static void main(String[] args) throws Exception { | taos-jdbcdriver 版本 | 主要变化 | | :------------------: | :----------------------------: | -| 3.0.3 | 修复 REST 连接在 jdk17+ 版本时间戳解析错误问题 | -| 3.0.1 - 3.0.2 | 修复一些情况下结果集数据解析错误的问题。3.0.1 在 JDK 11 环境编译,JDK 8 环境下建议使用 3.0.2 版本 | +| 3.1.0 | WebSocket 连接支持订阅功能 | +| 3.0.1 - 3.0.4 | 修复一些情况下结果集数据解析错误的问题。3.0.1 在 JDK 11 环境编译,JDK 8 环境下建议使用其他版本 | | 3.0.0 | 支持 TDengine 3.0 | | 2.0.42 | 修在 WebSocket 连接中 wasNull 接口返回值 | | 2.0.41 | 修正 REST 连接中用户名和密码转码方式 |