From 980382d4334b6f2b11dc9dde59f7d1d44d3c980c Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 10 Apr 2024 11:18:08 +0800 Subject: [PATCH 01/24] fix(tsdb): fix memory error. --- source/dnode/vnode/src/tsdb/tsdbCacheRead.c | 22 ++++++++++-------- source/dnode/vnode/src/tsdb/tsdbReadUtil.h | 1 + source/libs/executor/src/cachescanoperator.c | 24 ++++++++++---------- 3 files changed, 25 insertions(+), 22 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbCacheRead.c b/source/dnode/vnode/src/tsdb/tsdbCacheRead.c index dd5da28b6b..d5f3624851 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCacheRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbCacheRead.c @@ -57,7 +57,6 @@ static void saveOneRowForLastRaw(SLastCol* pColVal, SCacheRowsReader* pReader, c static int32_t saveOneRow(SArray* pRow, SSDataBlock* pBlock, SCacheRowsReader* pReader, const int32_t* slotIds, const int32_t* dstSlotIds, void** pRes, const char* idStr) { int32_t numOfRows = pBlock->info.rows; - // bool allNullRow = true; if (HASTYPE(pReader->type, CACHESCAN_RETRIEVE_LAST)) { uint64_t ts = TSKEY_MIN; @@ -108,11 +107,12 @@ static int32_t saveOneRow(SArray* pRow, SSDataBlock* pBlock, SCacheRowsReader* p } } - // pColInfoData->info.bytes includes the VARSTR_HEADER_SIZE, need to substruct it + // pColInfoData->info.bytes includes the VARSTR_HEADER_SIZE, need to subtract it p->hasResult = true; varDataSetLen(pRes[i], pColInfoData->info.bytes - VARSTR_HEADER_SIZE); colDataSetVal(pColInfoData, numOfRows, (const char*)pRes[i], false); } + for (int32_t idx = 0; idx < taosArrayGetSize(pBlock->pDataBlock); ++idx) { SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, idx); if (idx < funcTypeBlockArray->size) { @@ -233,6 +233,8 @@ int32_t tsdbCacherowsReaderOpen(void* pVnode, int32_t type, void* pTableIdList, if (IS_VAR_DATA_TYPE(pPkCol->type)) { p->rowKey.pks[0].pData = taosMemoryCalloc(1, pPkCol->bytes); } + + p->pkColumn = *pPkCol; } if (numOfTables == 0) { @@ -366,15 +368,15 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32 goto _end; } - for (int32_t j = 0; j < pr->numOfCols; ++j) { - int32_t bytes; - if (slotIds[j] == -1) { - bytes = 1; - } else { - bytes = pr->pSchema->columns[slotIds[j]].bytes; - } + int32_t pkBufLen = 0; + if (pr->rowKey.numOfPKs > 0) { + pkBufLen = pr->pkColumn.bytes; + } - pRes[j] = taosMemoryCalloc(1, sizeof(SFirstLastRes) + bytes + VARSTR_HEADER_SIZE); + for (int32_t j = 0; j < pr->numOfCols; ++j) { + int32_t bytes = (slotIds[j] == -1) ? 1 : pr->pSchema->columns[slotIds[j]].bytes; + + pRes[j] = taosMemoryCalloc(1, sizeof(SFirstLastRes) + bytes + pkBufLen + VARSTR_HEADER_SIZE); SFirstLastRes* p = (SFirstLastRes*)varDataVal(pRes[j]); p->ts = INT64_MIN; } diff --git a/source/dnode/vnode/src/tsdb/tsdbReadUtil.h b/source/dnode/vnode/src/tsdb/tsdbReadUtil.h index af7d00e019..bece22adad 100644 --- a/source/dnode/vnode/src/tsdb/tsdbReadUtil.h +++ b/source/dnode/vnode/src/tsdb/tsdbReadUtil.h @@ -382,6 +382,7 @@ typedef struct SCacheRowsReader { SArray* pFuncTypeList; __compar_fn_t pkComparFn; SRowKey rowKey; + SColumnInfo pkColumn; } SCacheRowsReader; int32_t tsdbCacheGetBatch(STsdb* pTsdb, tb_uid_t uid, SArray* pLastArray, SCacheRowsReader* pr, int8_t ltype); diff --git a/source/libs/executor/src/cachescanoperator.c b/source/libs/executor/src/cachescanoperator.c index 56052434a4..23e873d335 100644 --- a/source/libs/executor/src/cachescanoperator.c +++ b/source/libs/executor/src/cachescanoperator.c @@ -40,7 +40,7 @@ typedef struct SCacheRowsScanInfo { SExprSupp pseudoExprSup; int32_t retrieveType; int32_t currentGroupIndex; - SSDataBlock* pBufferredRes; + SSDataBlock* pBufferedRes; SArray* pUidList; SArray* pCidList; int32_t indexOfBufferedRes; @@ -160,9 +160,9 @@ SOperatorInfo* createCacherowsScanOperator(SLastRowScanPhysiNode* pScanNode, SRe capacity = TMIN(totalTables, 4096); - pInfo->pBufferredRes = createOneDataBlock(pInfo->pRes, false); - setColIdForCacheReadBlock(pInfo->pBufferredRes, pScanNode); - blockDataEnsureCapacity(pInfo->pBufferredRes, capacity); + pInfo->pBufferedRes = createOneDataBlock(pInfo->pRes, false); + setColIdForCacheReadBlock(pInfo->pBufferedRes, pScanNode); + blockDataEnsureCapacity(pInfo->pBufferedRes, capacity); } else { // by tags pInfo->retrieveType = CACHESCAN_RETRIEVE_TYPE_SINGLE | SCAN_ROW_TYPE(pScanNode->ignoreNull); capacity = 1; // only one row output @@ -219,18 +219,18 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) { T_LONG_JMP(pTaskInfo->env, pTaskInfo->code); } - if (pInfo->indexOfBufferedRes >= pInfo->pBufferredRes->info.rows) { - blockDataCleanup(pInfo->pBufferredRes); + if (pInfo->indexOfBufferedRes >= pInfo->pBufferedRes->info.rows) { + blockDataCleanup(pInfo->pBufferedRes); taosArrayClear(pInfo->pUidList); - int32_t code = pInfo->readHandle.api.cacheFn.retrieveRows(pInfo->pLastrowReader, pInfo->pBufferredRes, pInfo->pSlotIds, + int32_t code = pInfo->readHandle.api.cacheFn.retrieveRows(pInfo->pLastrowReader, pInfo->pBufferedRes, pInfo->pSlotIds, pInfo->pDstSlotIds, pInfo->pUidList); if (code != TSDB_CODE_SUCCESS) { T_LONG_JMP(pTaskInfo->env, code); } // check for tag values - int32_t resultRows = pInfo->pBufferredRes->info.rows; + int32_t resultRows = pInfo->pBufferedRes->info.rows; // the results may be null, if last values are all null ASSERT(resultRows == 0 || resultRows == taosArrayGetSize(pInfo->pUidList)); @@ -239,12 +239,12 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) { SSDataBlock* pRes = pInfo->pRes; - if (pInfo->indexOfBufferedRes < pInfo->pBufferredRes->info.rows) { - for (int32_t i = 0; i < taosArrayGetSize(pInfo->pBufferredRes->pDataBlock); ++i) { + if (pInfo->indexOfBufferedRes < pInfo->pBufferedRes->info.rows) { + for (int32_t i = 0; i < taosArrayGetSize(pInfo->pBufferedRes->pDataBlock); ++i) { SColumnInfoData* pCol = taosArrayGet(pRes->pDataBlock, i); int32_t slotId = pCol->info.slotId; - SColumnInfoData* pSrc = taosArrayGet(pInfo->pBufferredRes->pDataBlock, slotId); + SColumnInfoData* pSrc = taosArrayGet(pInfo->pBufferedRes->pDataBlock, slotId); SColumnInfoData* pDst = taosArrayGet(pRes->pDataBlock, slotId); if (colDataIsNull_s(pSrc, pInfo->indexOfBufferedRes)) { @@ -350,7 +350,7 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) { void destroyCacheScanOperator(void* param) { SCacheRowsScanInfo* pInfo = (SCacheRowsScanInfo*)param; blockDataDestroy(pInfo->pRes); - blockDataDestroy(pInfo->pBufferredRes); + blockDataDestroy(pInfo->pBufferedRes); taosMemoryFree(pInfo->pSlotIds); taosMemoryFree(pInfo->pDstSlotIds); taosArrayDestroy(pInfo->pCidList); From e161556f5184f5a9f50297811e798701132242b3 Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao> Date: Wed, 10 Apr 2024 15:35:25 +0800 Subject: [PATCH 02/24] set ts column index for function --- include/libs/executor/storageapi.h | 2 +- include/libs/function/function.h | 8 +++++--- include/libs/stream/streamState.h | 2 +- source/libs/executor/inc/executorInt.h | 2 +- source/libs/executor/src/scanoperator.c | 2 +- .../libs/executor/src/streamcountwindowoperator.c | 2 +- .../libs/executor/src/streameventwindowoperator.c | 2 +- .../libs/executor/src/streamtimewindowoperator.c | 14 +++++++------- source/libs/function/src/builtinsimpl.c | 5 +---- source/libs/stream/src/streamState.c | 5 ++++- tests/script/tsim/stream/basic5.sim | 15 ++++++++++----- 11 files changed, 33 insertions(+), 26 deletions(-) diff --git a/include/libs/executor/storageapi.h b/include/libs/executor/storageapi.h index b47a162a1a..10697e64b2 100644 --- a/include/libs/executor/storageapi.h +++ b/include/libs/executor/storageapi.h @@ -328,7 +328,7 @@ typedef struct SStateStore { int32_t (*streamStateGetByPos)(SStreamState* pState, void* pos, void** pVal); int32_t (*streamStateDel)(SStreamState* pState, const SWinKey* key); int32_t (*streamStateClear)(SStreamState* pState); - void (*streamStateSetNumber)(SStreamState* pState, int32_t number); + void (*streamStateSetNumber)(SStreamState* pState, int32_t number, int32_t tsIdex); int32_t (*streamStateSaveInfo)(SStreamState* pState, void* pKey, int32_t keyLen, void* pVal, int32_t vLen); int32_t (*streamStateGetInfo)(SStreamState* pState, void* pKey, int32_t keyLen, void** pVal, int32_t* pLen); diff --git a/include/libs/function/function.h b/include/libs/function/function.h index 0fa84c99c6..a1074d6901 100644 --- a/include/libs/function/function.h +++ b/include/libs/function/function.h @@ -29,6 +29,7 @@ struct SResultRowEntryInfo; struct SFunctionNode; typedef struct SScalarParam SScalarParam; +typedef struct SStreamState SStreamState; typedef struct SFuncExecEnv { int32_t calcMemSize; @@ -126,7 +127,7 @@ typedef struct SInputColumnInfoData { typedef struct SSerializeDataHandle { struct SDiskbasedBuf *pBuf; int32_t currentPage; - void *pState; + SStreamState *pState; } SSerializeDataHandle; // incremental state storage @@ -164,7 +165,7 @@ typedef struct STdbState { void *txn; } STdbState; -typedef struct { +struct SStreamState { STdbState *pTdbState; struct SStreamFileState *pFileState; int32_t number; @@ -173,7 +174,8 @@ typedef struct { int64_t streamId; int64_t streamBackendRid; int8_t dump; -} SStreamState; + int32_t tsIndex; +}; typedef struct SFunctionStateStore { int32_t (*streamStateFuncPut)(SStreamState *pState, const SWinKey *key, const void *value, int32_t vLen); diff --git a/include/libs/stream/streamState.h b/include/libs/stream/streamState.h index c603f9f5ac..f1c9d712e8 100644 --- a/include/libs/stream/streamState.h +++ b/include/libs/stream/streamState.h @@ -46,7 +46,7 @@ bool streamStateCheck(SStreamState* pState, const SWinKey* key); int32_t streamStateGetByPos(SStreamState* pState, void* pos, void** pVal); int32_t streamStateDel(SStreamState* pState, const SWinKey* key); int32_t streamStateClear(SStreamState* pState); -void streamStateSetNumber(SStreamState* pState, int32_t number); +void streamStateSetNumber(SStreamState* pState, int32_t number, int32_t tsIdex); int32_t streamStateSaveInfo(SStreamState* pState, void* pKey, int32_t keyLen, void* pVal, int32_t vLen); int32_t streamStateGetInfo(SStreamState* pState, void* pKey, int32_t keyLen, void** pVal, int32_t* pLen); diff --git a/source/libs/executor/inc/executorInt.h b/source/libs/executor/inc/executorInt.h index 5263546765..628311591a 100644 --- a/source/libs/executor/inc/executorInt.h +++ b/source/libs/executor/inc/executorInt.h @@ -897,7 +897,7 @@ int32_t initBasicInfoEx(SOptrBasicInfo* pBasicInfo, SExprSupp* pSup, SExprInfo* int32_t initStreamAggSupporter(SStreamAggSupporter* pSup, SExprSupp* pExpSup, int32_t numOfOutput, int64_t gap, SStreamState* pState, int32_t keySize, int16_t keyType, SStateStore* pStore, SReadHandle* pHandle, STimeWindowAggSupp* pTwAggSup, const char* taskIdStr, - SStorageAPI* pApi); + SStorageAPI* pApi, int32_t tsIndex); void initDownStream(struct SOperatorInfo* downstream, SStreamAggSupporter* pAggSup, uint16_t type, int32_t tsColIndex, STimeWindowAggSupp* pTwSup); void getMaxTsWins(const SArray* pAllWins, SArray* pMaxWins); diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index a6613e589a..dc5ce60f95 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -2903,7 +2903,7 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys pInfo->pUpdateInfo = NULL; pInfo->pTableScanOp = pTableScanOp; if (pInfo->pTableScanOp->pTaskInfo->streamInfo.pState) { - pAPI->stateStore.streamStateSetNumber(pInfo->pTableScanOp->pTaskInfo->streamInfo.pState, -1); + pAPI->stateStore.streamStateSetNumber(pInfo->pTableScanOp->pTaskInfo->streamInfo.pState, -1, pInfo->primaryTsIndex); } pInfo->readHandle = *pHandle; diff --git a/source/libs/executor/src/streamcountwindowoperator.c b/source/libs/executor/src/streamcountwindowoperator.c index 720734431f..db06775a18 100644 --- a/source/libs/executor/src/streamcountwindowoperator.c +++ b/source/libs/executor/src/streamcountwindowoperator.c @@ -659,7 +659,7 @@ SOperatorInfo* createStreamCountAggOperatorInfo(SOperatorInfo* downstream, SPhys code = initStreamAggSupporter(&pInfo->streamAggSup, pExpSup, numOfCols, 0, pTaskInfo->streamInfo.pState, sizeof(COUNT_TYPE), 0, &pTaskInfo->storageAPI.stateStore, pHandle, - &pInfo->twAggSup, GET_TASKID(pTaskInfo), &pTaskInfo->storageAPI); + &pInfo->twAggSup, GET_TASKID(pTaskInfo), &pTaskInfo->storageAPI, pInfo->primaryTsIndex); if (code != TSDB_CODE_SUCCESS) { goto _error; } diff --git a/source/libs/executor/src/streameventwindowoperator.c b/source/libs/executor/src/streameventwindowoperator.c index ef5c2572d9..c948b57534 100644 --- a/source/libs/executor/src/streameventwindowoperator.c +++ b/source/libs/executor/src/streameventwindowoperator.c @@ -711,7 +711,7 @@ SOperatorInfo* createStreamEventAggOperatorInfo(SOperatorInfo* downstream, SPhys code = initStreamAggSupporter(&pInfo->streamAggSup, pExpSup, numOfCols, 0, pTaskInfo->streamInfo.pState, sizeof(bool) + sizeof(bool), 0, &pTaskInfo->storageAPI.stateStore, pHandle, - &pInfo->twAggSup, GET_TASKID(pTaskInfo), &pTaskInfo->storageAPI); + &pInfo->twAggSup, GET_TASKID(pTaskInfo), &pTaskInfo->storageAPI, pInfo->primaryTsIndex); if (code != TSDB_CODE_SUCCESS) { goto _error; } diff --git a/source/libs/executor/src/streamtimewindowoperator.c b/source/libs/executor/src/streamtimewindowoperator.c index 51071e2b4a..86d428cddf 100644 --- a/source/libs/executor/src/streamtimewindowoperator.c +++ b/source/libs/executor/src/streamtimewindowoperator.c @@ -1522,7 +1522,7 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, qInfo("copy state %p to %p", pTaskInfo->streamInfo.pState, pInfo->pState); - pAPI->stateStore.streamStateSetNumber(pInfo->pState, -1); + pAPI->stateStore.streamStateSetNumber(pInfo->pState, -1, pInfo->primaryTsIndex); code = initAggSup(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str, pInfo->pState, &pTaskInfo->storageAPI.functionStore); if (code != TSDB_CODE_SUCCESS) { @@ -1705,7 +1705,7 @@ static TSKEY sesionTs(void* pKey) { int32_t initStreamAggSupporter(SStreamAggSupporter* pSup, SExprSupp* pExpSup, int32_t numOfOutput, int64_t gap, SStreamState* pState, int32_t keySize, int16_t keyType, SStateStore* pStore, SReadHandle* pHandle, STimeWindowAggSupp* pTwAggSup, const char* taskIdStr, - SStorageAPI* pApi) { + SStorageAPI* pApi, int32_t tsIndex) { pSup->resultRowSize = keySize + getResultRowSize(pExpSup->pCtx, numOfOutput); pSup->pScanBlock = createSpecialDataBlock(STREAM_CLEAR); pSup->gap = gap; @@ -1721,7 +1721,7 @@ int32_t initStreamAggSupporter(SStreamAggSupporter* pSup, SExprSupp* pExpSup, in initDummyFunction(pSup->pDummyCtx, pExpSup->pCtx, numOfOutput); pSup->pState = taosMemoryCalloc(1, sizeof(SStreamState)); *(pSup->pState) = *pState; - pSup->stateStore.streamStateSetNumber(pSup->pState, -1); + pSup->stateStore.streamStateSetNumber(pSup->pState, -1, tsIndex); int32_t funResSize = getMaxFunResSize(pExpSup, numOfOutput); pSup->pState->pFileState = pSup->stateStore.streamFileStateInit( tsStreamBufferSize, sizeof(SSessionKey), pSup->resultRowSize, funResSize, sesionTs, pSup->pState, @@ -2950,7 +2950,7 @@ SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPh code = initStreamAggSupporter(&pInfo->streamAggSup, pExpSup, numOfCols, pSessionNode->gap, pTaskInfo->streamInfo.pState, 0, 0, &pTaskInfo->storageAPI.stateStore, pHandle, - &pInfo->twAggSup, GET_TASKID(pTaskInfo), &pTaskInfo->storageAPI); + &pInfo->twAggSup, GET_TASKID(pTaskInfo), &pTaskInfo->storageAPI, pInfo->primaryTsIndex); if (code != TSDB_CODE_SUCCESS) { goto _error; } @@ -3175,7 +3175,7 @@ SOperatorInfo* createStreamFinalSessionAggOperatorInfo(SOperatorInfo* downstream } SStreamSessionAggOperatorInfo* pChInfo = pChildOp->info; pChInfo->twAggSup.calTrigger = STREAM_TRIGGER_AT_ONCE; - pAPI->stateStore.streamStateSetNumber(pChInfo->streamAggSup.pState, i); + pAPI->stateStore.streamStateSetNumber(pChInfo->streamAggSup.pState, i, pInfo->primaryTsIndex); taosArrayPush(pInfo->pChildren, &pChildOp); } } @@ -3845,7 +3845,7 @@ SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhys int16_t type = pColNode->node.resType.type; code = initStreamAggSupporter(&pInfo->streamAggSup, pExpSup, numOfCols, 0, pTaskInfo->streamInfo.pState, keySize, type, &pTaskInfo->storageAPI.stateStore, pHandle, &pInfo->twAggSup, - GET_TASKID(pTaskInfo), &pTaskInfo->storageAPI); + GET_TASKID(pTaskInfo), &pTaskInfo->storageAPI, pInfo->primaryTsIndex); if (code != TSDB_CODE_SUCCESS) { goto _error; } @@ -4082,7 +4082,7 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhys pInfo->pState = taosMemoryCalloc(1, sizeof(SStreamState)); *(pInfo->pState) = *(pTaskInfo->streamInfo.pState); - pAPI->stateStore.streamStateSetNumber(pInfo->pState, -1); + pAPI->stateStore.streamStateSetNumber(pInfo->pState, -1, pInfo->primaryTsIndex); size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES; code = initAggSup(pSup, &pInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str, pInfo->pState, diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index b709abc30b..5a792f6139 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -3203,10 +3203,7 @@ int32_t saveTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock* SWinKey key = {0}; if (pCtx->saveHandle.pBuf == NULL) { - SColumnInfoData* pColInfo = pCtx->input.pPTS; - if (!pColInfo || pColInfo->info.type != TSDB_DATA_TYPE_TIMESTAMP) { - pColInfo = taosArrayGet(pSrcBlock->pDataBlock, 0); - } + SColumnInfoData* pColInfo = taosArrayGet(pSrcBlock->pDataBlock, pCtx->saveHandle.pState->tsIndex); ASSERT(pColInfo->info.type == TSDB_DATA_TYPE_TIMESTAMP); key.groupId = pSrcBlock->info.id.groupId; key.ts = *(int64_t*)colDataGetData(pColInfo, rowIndex);; diff --git a/source/libs/stream/src/streamState.c b/source/libs/stream/src/streamState.c index b53dc9daa6..18bc672c8d 100644 --- a/source/libs/stream/src/streamState.c +++ b/source/libs/stream/src/streamState.c @@ -395,7 +395,10 @@ int32_t streamStateClear(SStreamState* pState) { #endif } -void streamStateSetNumber(SStreamState* pState, int32_t number) { pState->number = number; } +void streamStateSetNumber(SStreamState* pState, int32_t number, int32_t tsIdex) { + pState->number = number; + pState->tsIndex = tsIdex; +} int32_t streamStateSaveInfo(SStreamState* pState, void* pKey, int32_t keyLen, void* pVal, int32_t vLen) { #ifdef USE_ROCKSDB diff --git a/tests/script/tsim/stream/basic5.sim b/tests/script/tsim/stream/basic5.sim index f507ab7d3b..7b5f587feb 100644 --- a/tests/script/tsim/stream/basic5.sim +++ b/tests/script/tsim/stream/basic5.sim @@ -372,17 +372,22 @@ print step4============= sql create database test6 vgroups 4; sql use test6; -sql create stable st(ts timestamp,a int,b int,c int,d double) tags(ta int,tb int,tc int); +sql create stable st(ts timestamp,a int,b int,c int,d int) tags(ta int,tb int,tc int); sql create table t1 using st tags(1,1,1); sql create table t2 using st tags(2,2,2); sql create stream streams6 trigger at_once ignore expired 0 ignore update 0 into streamt6 as select _wstart, b, c,min(c), ta, tb from st interval(1s); +sql create stream streams7 trigger at_once ignore expired 0 ignore update 0 into streamt7 as select ts, max(c) from st interval(1s); +sql create stream streams8 trigger at_once ignore expired 0 ignore update 0 into streamt8 as select ts, b, c, last(c), ta, tb from st session(ts, 1s); +sql create stream streams9 trigger at_once ignore expired 0 ignore update 0 into streamt9 as select ts, b, c, last_row(c), ta, tb from st partition by tbname state_window(a); +sql create stream streams10 trigger at_once ignore expired 0 ignore update 0 into streamt10 as select ts, b, c, last(c), ta, tb from st partition by tbname event_window start with d = 0 end with d = 9; +sql create stream streams11 trigger at_once ignore expired 1 ignore update 0 watermark 100s into streamt11 as select ts, b, c, last(c), ta, tb from st partition by tbname count_window(2); sleep 1000 -sql insert into t1 values(1648791211000,1,2,3,1.0); -sql insert into t1 values(1648791213000,2,3,4,1.1); -sql insert into t2 values(1648791215000,3,4,5,1.1); -sql insert into t2 values(1648791217000,4,5,6,1.1); +sql insert into t1 values(1648791211000,1,2,3,0); +sql insert into t1 values(1648791213000,2,3,4,0); +sql insert into t2 values(1648791215000,3,4,5,0); +sql insert into t2 values(1648791217000,4,5,6,0); $loop_count = 0 From f23a8a37bc3f157b85870066f5f6f2e8ee2299eb Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 10 Apr 2024 16:43:02 +0800 Subject: [PATCH 03/24] refactor: do some internal refactor. --- include/libs/stream/tstream.h | 1 - source/dnode/snode/inc/sndInt.h | 4 ++-- source/dnode/vnode/src/tqCommon/tqCommon.c | 9 +++++++++ source/libs/stream/src/streamMeta.c | 2 -- source/libs/stream/src/streamStart.c | 3 +-- 5 files changed, 12 insertions(+), 7 deletions(-) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 138fad0ddb..c12bb146b4 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -516,7 +516,6 @@ typedef struct SStreamMeta { TdThreadMutex backendMutex; SMetaHbInfo* pHbInfo; STaskUpdateInfo updateInfo; - SHashObj* pUpdateTaskSet; int32_t numOfStreamTasks; // this value should be increased when a new task is added into the meta int32_t numOfPausedTasks; int64_t rid; diff --git a/source/dnode/snode/inc/sndInt.h b/source/dnode/snode/inc/sndInt.h index 68f7f756d5..024c3c6bae 100644 --- a/source/dnode/snode/inc/sndInt.h +++ b/source/dnode/snode/inc/sndInt.h @@ -30,11 +30,11 @@ extern "C" { #endif -typedef struct SSnode { +struct SSnode { char* path; SStreamMeta* pMeta; SMsgCb msgCb; -} SSnode; +}; #if 0 typedef struct { diff --git a/source/dnode/vnode/src/tqCommon/tqCommon.c b/source/dnode/vnode/src/tqCommon/tqCommon.c index 0f7f74f78b..2fa9f9a9ff 100644 --- a/source/dnode/vnode/src/tqCommon/tqCommon.c +++ b/source/dnode/vnode/src/tqCommon/tqCommon.c @@ -807,6 +807,7 @@ int32_t tqStreamTaskProcessRunReq(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLead int32_t tqStartTaskCompleteCallback(SStreamMeta* pMeta) { STaskStartInfo* pStartInfo = &pMeta->startInfo; int32_t vgId = pMeta->vgId; + bool scanWal = false; streamMetaWLock(pMeta); if (pStartInfo->taskStarting == 1) { @@ -831,10 +832,18 @@ int32_t tqStartTaskCompleteCallback(SStreamMeta* pMeta) { pStartInfo->restartCount = 0; tqDebug("vgId:%d all tasks are ready, reset restartCounter 0, not restart tasks", vgId); } + + scanWal = true; } } streamMetaWUnLock(pMeta); + + if (scanWal && (vgId != SNODE_HANDLE)) { + tqDebug("vgId:%d start scan wal for executing tasks", vgId); + tqScanWalAsync(pMeta->ahandle, true); + } + return TSDB_CODE_SUCCESS; } diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index aae3594905..8d5e4f3c87 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -304,7 +304,6 @@ void streamMetaRemoveDB(void* arg, char* key) { SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandFunc, int32_t vgId, int64_t stage, startComplete_fn_t fn) { - int32_t code = -1; SStreamMeta* pMeta = taosMemoryCalloc(1, sizeof(SStreamMeta)); if (pMeta == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; @@ -516,7 +515,6 @@ void streamMetaCloseImpl(void* arg) { taosHashCleanup(pMeta->pTasksMap); taosHashCleanup(pMeta->pTaskDbUnique); - taosHashCleanup(pMeta->pUpdateTaskSet); taosHashCleanup(pMeta->updateInfo.pTasks); taosHashCleanup(pMeta->startInfo.pReadyTaskSet); taosHashCleanup(pMeta->startInfo.pFailedTaskSet); diff --git a/source/libs/stream/src/streamStart.c b/source/libs/stream/src/streamStart.c index 0161f382ba..f2a694a554 100644 --- a/source/libs/stream/src/streamStart.c +++ b/source/libs/stream/src/streamStart.c @@ -398,8 +398,7 @@ void doProcessDownstreamReadyRsp(SStreamTask* pTask) { if (pTask->status.taskStatus == TASK_STATUS__HALT) { ASSERT(HAS_RELATED_FILLHISTORY_TASK(pTask) && (pTask->info.fillHistory == 0)); - // halt it self for count window stream task until the related - // fill history task completd. + // halt it self for count window stream task until the related fill history task completed. stDebug("s-task:%s level:%d initial status is %s from mnode, set it to be halt", pTask->id.idStr, pTask->info.taskLevel, streamTaskGetStatusStr(pTask->status.taskStatus)); streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_HALT); From 8a5532dd8811de75adf9053e995c058d71f7110f Mon Sep 17 00:00:00 2001 From: wangjiaming0909 <604227650@qq.com> Date: Wed, 10 Apr 2024 16:48:02 +0800 Subject: [PATCH 04/24] add cfg value check --- include/util/tunit.h | 4 +- include/util/tutil.h | 2 + source/util/src/tconfig.c | 33 ++++--- source/util/src/tunit.c | 141 ++++++++++++++++------------- source/util/src/tutil.c | 18 ++++ tests/system-test/0-others/show.py | 43 +++++++++ 6 files changed, 163 insertions(+), 78 deletions(-) diff --git a/include/util/tunit.h b/include/util/tunit.h index de37c85929..207431fa7d 100644 --- a/include/util/tunit.h +++ b/include/util/tunit.h @@ -22,10 +22,10 @@ extern "C" { #endif -int64_t taosStrHumanToInt64(const char* str); +int32_t taosStrHumanToInt64(const char* str, int64_t* out); void taosInt64ToHumanStr(int64_t val, char* outStr); -int32_t taosStrHumanToInt32(const char* str); +int32_t taosStrHumanToInt32(const char* str, int32_t* out); void taosInt32ToHumanStr(int32_t val, char* outStr); #ifdef __cplusplus diff --git a/include/util/tutil.h b/include/util/tutil.h index de2cd205f2..54ce6fc849 100644 --- a/include/util/tutil.h +++ b/include/util/tutil.h @@ -56,6 +56,8 @@ void taosIpPort2String(uint32_t ip, uint16_t port, char *str); void *tmemmem(const char *haystack, int hlen, const char *needle, int nlen); +int32_t parseCfgReal(const char* str, double* out); + static FORCE_INLINE void taosEncryptPass(uint8_t *inBuf, size_t inLen, char *target) { T_MD5_CTX context; tMD5Init(&context); diff --git a/source/util/src/tconfig.c b/source/util/src/tconfig.c index 8fdc2654c5..caca123777 100644 --- a/source/util/src/tconfig.c +++ b/source/util/src/tconfig.c @@ -174,7 +174,9 @@ static int32_t cfgSetBool(SConfigItem *pItem, const char *value, ECfgSrcType sty } static int32_t cfgSetInt32(SConfigItem *pItem, const char *value, ECfgSrcType stype) { - int32_t ival = taosStrHumanToInt32(value); + int32_t ival; + int32_t code = taosStrHumanToInt32(value, &ival); + if (code != TSDB_CODE_SUCCESS) return code; if (ival < pItem->imin || ival > pItem->imax) { uError("cfg:%s, type:%s src:%s value:%d out of range[%" PRId64 ", %" PRId64 "]", pItem->name, cfgDtypeStr(pItem->dtype), cfgStypeStr(stype), ival, pItem->imin, pItem->imax); @@ -188,7 +190,9 @@ static int32_t cfgSetInt32(SConfigItem *pItem, const char *value, ECfgSrcType st } static int32_t cfgSetInt64(SConfigItem *pItem, const char *value, ECfgSrcType stype) { - int64_t ival = taosStrHumanToInt64(value); + int64_t ival; + int32_t code = taosStrHumanToInt64(value, &ival); + if (code != TSDB_CODE_SUCCESS) return code; if (ival < pItem->imin || ival > pItem->imax) { uError("cfg:%s, type:%s src:%s value:%" PRId64 " out of range[%" PRId64 ", %" PRId64 "]", pItem->name, cfgDtypeStr(pItem->dtype), cfgStypeStr(stype), ival, pItem->imin, pItem->imax); @@ -202,15 +206,16 @@ static int32_t cfgSetInt64(SConfigItem *pItem, const char *value, ECfgSrcType st } static int32_t cfgSetFloat(SConfigItem *pItem, const char *value, ECfgSrcType stype) { - float fval = (float)atof(value); - if (fval < pItem->fmin || fval > pItem->fmax) { + double dval; + int32_t code = parseCfgReal(value, &dval); + if (dval < pItem->fmin || dval > pItem->fmax) { uError("cfg:%s, type:%s src:%s value:%f out of range[%f, %f]", pItem->name, cfgDtypeStr(pItem->dtype), - cfgStypeStr(stype), fval, pItem->fmin, pItem->fmax); + cfgStypeStr(stype), dval, pItem->fmin, pItem->fmax); terrno = TSDB_CODE_OUT_OF_RANGE; return -1; } - pItem->fval = fval; + pItem->fval = (float)dval; pItem->stype = stype; return 0; } @@ -408,7 +413,9 @@ int32_t cfgCheckRangeForDynUpdate(SConfig *pCfg, const char *name, const char *p } } break; case CFG_DTYPE_INT32: { - int32_t ival = (int32_t)taosStrHumanToInt32(pVal); + int32_t ival; + int32_t code = (int32_t)taosStrHumanToInt32(pVal, &ival); + if (code != TSDB_CODE_SUCCESS) return code; if (ival < pItem->imin || ival > pItem->imax) { uError("cfg:%s, type:%s value:%d out of range[%" PRId64 ", %" PRId64 "]", pItem->name, cfgDtypeStr(pItem->dtype), ival, pItem->imin, pItem->imax); @@ -417,7 +424,9 @@ int32_t cfgCheckRangeForDynUpdate(SConfig *pCfg, const char *name, const char *p } } break; case CFG_DTYPE_INT64: { - int64_t ival = (int64_t)taosStrHumanToInt64(pVal); + int64_t ival; + int32_t code = taosStrHumanToInt64(pVal, &ival); + if (code != TSDB_CODE_SUCCESS) return code; if (ival < pItem->imin || ival > pItem->imax) { uError("cfg:%s, type:%s value:%" PRId64 " out of range[%" PRId64 ", %" PRId64 "]", pItem->name, cfgDtypeStr(pItem->dtype), ival, pItem->imin, pItem->imax); @@ -427,9 +436,11 @@ int32_t cfgCheckRangeForDynUpdate(SConfig *pCfg, const char *name, const char *p } break; case CFG_DTYPE_FLOAT: case CFG_DTYPE_DOUBLE: { - float fval = (float)atof(pVal); - if (fval < pItem->fmin || fval > pItem->fmax) { - uError("cfg:%s, type:%s value:%f out of range[%f, %f]", pItem->name, cfgDtypeStr(pItem->dtype), fval, + double dval; + int32_t code = parseCfgReal(pVal, &dval); + if (code != TSDB_CODE_SUCCESS) return code; + if (dval < pItem->fmin || dval > pItem->fmax) { + uError("cfg:%s, type:%s value:%f out of range[%f, %f]", pItem->name, cfgDtypeStr(pItem->dtype), dval, pItem->fmin, pItem->fmax); terrno = TSDB_CODE_OUT_OF_RANGE; return -1; diff --git a/source/util/src/tunit.c b/source/util/src/tunit.c index 378f23613a..09f59f1e40 100644 --- a/source/util/src/tunit.c +++ b/source/util/src/tunit.c @@ -23,45 +23,74 @@ #define UNIT_ONE_PEBIBYTE (UNIT_ONE_TEBIBYTE * UNIT_SIZE_CONVERT_FACTOR) #define UNIT_ONE_EXBIBYTE (UNIT_ONE_PEBIBYTE * UNIT_SIZE_CONVERT_FACTOR) -int64_t taosStrHumanToInt64(const char* str) { - size_t sLen = strlen(str); - if (sLen < 2) return atoll(str); - - int64_t val = 0; - - char* strNoUnit = NULL; - char unit = str[sLen - 1]; - if ((unit == 'P') || (unit == 'p')) { - strNoUnit = taosMemoryCalloc(sLen, 1); - memcpy(strNoUnit, str, sLen - 1); - - val = atof(strNoUnit) * UNIT_ONE_PEBIBYTE; - } else if ((unit == 'T') || (unit == 't')) { - strNoUnit = taosMemoryCalloc(sLen, 1); - memcpy(strNoUnit, str, sLen - 1); - - val = atof(strNoUnit) * UNIT_ONE_TEBIBYTE; - } else if ((unit == 'G') || (unit == 'g')) { - strNoUnit = taosMemoryCalloc(sLen, 1); - memcpy(strNoUnit, str, sLen - 1); - - val = atof(strNoUnit) * UNIT_ONE_GIBIBYTE; - } else if ((unit == 'M') || (unit == 'm')) { - strNoUnit = taosMemoryCalloc(sLen, 1); - memcpy(strNoUnit, str, sLen - 1); - - val = atof(strNoUnit) * UNIT_ONE_MEBIBYTE; - } else if ((unit == 'K') || (unit == 'k')) { - strNoUnit = taosMemoryCalloc(sLen, 1); - memcpy(strNoUnit, str, sLen - 1); - - val = atof(strNoUnit) * UNIT_ONE_KIBIBYTE; - } else { - val = atoll(str); +static int32_t parseCfgIntWithUnit(const char* str, double *res) { + double val, temp = INT64_MAX; + char* endPtr; + errno = 0; + val = taosStr2Int64(str, &endPtr, 0); + if (*endPtr == '.' || errno == ERANGE) { + errno = 0; + val = taosStr2Double(str, &endPtr); } + if (endPtr == str || errno == ERANGE || isnan(val)) { + terrno = TSDB_CODE_INVALID_CFG_VALUE; + return -1; + } + while (isspace((unsigned char)*endPtr)) endPtr++; + uint64_t factor = 1; + if (*endPtr != '\0') { + switch (*endPtr) { + case 'P': + case 'p': { + temp /= UNIT_ONE_PEBIBYTE; + factor = UNIT_ONE_PEBIBYTE; + } break; + case 'T': + case 't': { + temp /= UNIT_ONE_TEBIBYTE; + factor = UNIT_ONE_TEBIBYTE; + } break; + case 'G': + case 'g': { + temp /= UNIT_ONE_GIBIBYTE; + factor = UNIT_ONE_GIBIBYTE; + } break; + case 'M': + case 'm': { + temp /= UNIT_ONE_MEBIBYTE; + factor = UNIT_ONE_MEBIBYTE; + } break; + case 'K': + case 'k': { + temp /= UNIT_ONE_KIBIBYTE; + factor = UNIT_ONE_KIBIBYTE; + } break; + default: + terrno = TSDB_CODE_INVALID_CFG_VALUE; + return -1; + } + if ((val > 0 && val > temp) || (val < 0 && val < -temp)) { + terrno = TSDB_CODE_OUT_OF_RANGE; + return -1; + } + endPtr++; + val *= factor; + } + while (isspace((unsigned char)*endPtr)) endPtr++; + if (*endPtr) { + terrno = TSDB_CODE_INVALID_CFG_VALUE; + return -1; + } + val = rint(val); + *res = val; + return TSDB_CODE_SUCCESS; +} - taosMemoryFree(strNoUnit); - return val; +int32_t taosStrHumanToInt64(const char* str, int64_t *out) { + double res; + int32_t code = parseCfgIntWithUnit(str, &res); + if (code == TSDB_CODE_SUCCESS) *out = (int64_t)res; + return code; } #ifdef BUILD_NO_CALL @@ -83,35 +112,17 @@ void taosInt64ToHumanStr(int64_t val, char* outStr) { } #endif -int32_t taosStrHumanToInt32(const char* str) { - size_t sLen = strlen(str); - if (sLen < 2) return atoll(str); - - int32_t val = 0; - - char* strNoUnit = NULL; - char unit = str[sLen - 1]; - if ((unit == 'G') || (unit == 'g')) { - strNoUnit = taosMemoryCalloc(sLen, 1); - memcpy(strNoUnit, str, sLen - 1); - - val = atof(strNoUnit) * UNIT_ONE_GIBIBYTE; - } else if ((unit == 'M') || (unit == 'm')) { - strNoUnit = taosMemoryCalloc(sLen, 1); - memcpy(strNoUnit, str, sLen - 1); - - val = atof(strNoUnit) * UNIT_ONE_MEBIBYTE; - } else if ((unit == 'K') || (unit == 'k')) { - strNoUnit = taosMemoryCalloc(sLen, 1); - memcpy(strNoUnit, str, sLen - 1); - - val = atof(strNoUnit) * UNIT_ONE_KIBIBYTE; - } else { - val = atoll(str); +int32_t taosStrHumanToInt32(const char* str, int32_t* out) { + double res; + int32_t code = parseCfgIntWithUnit(str, &res); + if (code == TSDB_CODE_SUCCESS) { + if (res < INT32_MIN || res > INT32_MAX) { + terrno = TSDB_CODE_OUT_OF_RANGE; + return -1; + } + *out = (int32_t)res; } - - taosMemoryFree(strNoUnit); - return val; + return code; } #ifdef BUILD_NO_CALL diff --git a/source/util/src/tutil.c b/source/util/src/tutil.c index 6b6878ec83..f201edcb5e 100644 --- a/source/util/src/tutil.c +++ b/source/util/src/tutil.c @@ -496,3 +496,21 @@ size_t twcsncspn(const TdUcs4 *wcs, size_t size, const TdUcs4 *reject, size_t rs return index; } + +int32_t parseCfgReal(const char* str, double* out) { + double val; + char *endPtr; + errno = 0; + val = taosStr2Double(str, &endPtr); + if (str == endPtr || errno == ERANGE || isnan(val)) { + terrno = TSDB_CODE_INVALID_CFG_VALUE; + return -1; + } + while(isspace((unsigned char)*endPtr)) endPtr++; + if (*endPtr != '\0') { + terrno = TSDB_CODE_INVALID_CFG_VALUE; + return -1; + } + *out = val; + return TSDB_CODE_SUCCESS; +} diff --git a/tests/system-test/0-others/show.py b/tests/system-test/0-others/show.py index 75d7116e03..bc1239fae8 100644 --- a/tests/system-test/0-others/show.py +++ b/tests/system-test/0-others/show.py @@ -240,6 +240,49 @@ class TDTestCase: self.show_create_sysdb_sql() self.show_create_systb_sql() self.show_column_name() + self.test_show_variables() + + def get_variable(self, name: str, local: bool = True): + if local: + sql = 'show local variables' + else: + sql = f'select `value` from information_schema.ins_dnode_variables where name like "{name}"' + tdSql.query(sql, queryTimes=1) + res = tdSql.queryResult + if local: + for row in res: + if row[0] == name: + return row[1] + else: + if len(res) > 0: + return res[0][0] + raise Exception(f"variable {name} not found") + + def test_show_variables(self): + epsion = 0.0000001 + var = 'minimalTmpDirGB' + expect_val: float = 10.11 + sql = f'ALTER LOCAL "{var}" "{expect_val}"' + tdSql.execute(sql) + val: float = float(self.get_variable(var)) + if val != expect_val: + tdLog.exit(f'failed to set local {var} to {expect_val} actually {val}') + + error_vals = ['a', '10a', '', '1.100r', '1.12 r'] + for error_val in error_vals: + tdSql.error(f'ALTER LOCAL "{var}" "{error_val}"') + + var = 'supportVnodes' + expect_val = 1240 ## 1.211111 * 1024 + sql = f'ALTER DNODE 1 "{var}" "1.211111k"' + tdSql.execute(sql, queryTimes=1) + val = int(self.get_variable(var, False)) + if val != expect_val: + tdLog.exit(f'failed to set dnode {var} to {expect_val} actually {val}') + + error_vals = ['a', '10a', '', '1.100r', '1.12 r', '5k'] + for error_val in error_vals: + tdSql.error(f'ALTER DNODE 1 "{var}" "{error_val}"') def stop(self): tdSql.close() From c6d492d3fd3c2507d2a5d13e87bf6f4ddda48309 Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao> Date: Wed, 10 Apr 2024 17:41:59 +0800 Subject: [PATCH 05/24] set ts column index for function --- include/libs/stream/tstreamFileState.h | 9 +++++-- .../executor/src/streamcountwindowoperator.c | 3 +-- .../executor/src/streameventwindowoperator.c | 2 +- .../executor/src/streamtimewindowoperator.c | 24 +++---------------- source/libs/stream/src/streamSessionState.c | 7 ++++++ source/libs/stream/src/streamState.c | 12 +++++----- source/libs/stream/src/tstreamFileState.c | 10 +++++++- 7 files changed, 34 insertions(+), 33 deletions(-) diff --git a/include/libs/stream/tstreamFileState.h b/include/libs/stream/tstreamFileState.h index a9a198d194..68b9c4baa2 100644 --- a/include/libs/stream/tstreamFileState.h +++ b/include/libs/stream/tstreamFileState.h @@ -31,7 +31,6 @@ typedef struct SStreamFileState SStreamFileState; typedef SList SStreamSnapshot; typedef void* (*_state_buff_get_fn)(void* pRowBuff, const void* pKey, size_t keyLen); -typedef int32_t (*_state_buff_put_fn)(void* pRowBuff, const void* pKey, size_t keyLen, const void* data, size_t dataLen); typedef int32_t (*_state_buff_remove_fn)(void* pRowBuff, const void* pKey, size_t keyLen); typedef int32_t (*_state_buff_remove_by_pos_fn)(SStreamFileState* pState, SRowBuffPos* pPos); typedef void (*_state_buff_cleanup_fn)(void* pRowBuff); @@ -41,6 +40,8 @@ typedef int32_t (*_state_file_remove_fn)(SStreamFileState* pFileState, const voi typedef int32_t (*_state_file_get_fn)(SStreamFileState* pFileState, void* pKey, void* data, int32_t* pDataLen); typedef int32_t (*_state_file_clear_fn)(SStreamState* pState); +typedef int32_t (*_state_fun_get_fn) (SStreamFileState* pFileState, void* pKey, int32_t keyLen, void** pVal, int32_t* pVLen); + typedef int32_t (*range_cmpr_fn)(const SSessionKey* pWin1, const SSessionKey* pWin2); SStreamFileState* streamFileStateInit(int64_t memSize, uint32_t keySize, uint32_t rowSize, uint32_t selectRowSize, @@ -64,7 +65,7 @@ int32_t recoverSnapshot(SStreamFileState* pFileState, int64_t ckId); int32_t getSnapshotIdList(SStreamFileState* pFileState, SArray* list); int32_t deleteExpiredCheckPoint(SStreamFileState* pFileState, TSKEY mark); -int32_t streamFileStateGeSelectRowSize(SStreamFileState* pFileState); +int32_t streamFileStateGetSelectRowSize(SStreamFileState* pFileState); void streamFileStateReloadInfo(SStreamFileState* pFileState, TSKEY ts); void* getRowStateBuff(SStreamFileState* pFileState); @@ -105,6 +106,10 @@ int32_t getStateWinResultBuff(SStreamFileState* pFileState, SSessionKey* key, ch int32_t getCountWinResultBuff(SStreamFileState* pFileState, SSessionKey* pKey, COUNT_TYPE winCount, void** pVal, int32_t* pVLen); int32_t createCountWinResultBuff(SStreamFileState* pFileState, SSessionKey* pKey, void** pVal, int32_t* pVLen); +//function +int32_t getSessionRowBuff(SStreamFileState* pFileState, void* pKey, int32_t keyLen, void** pVal, int32_t* pVLen); +int32_t getFunctionRowBuff(SStreamFileState* pFileState, void* pKey, int32_t keyLen, void** pVal, int32_t* pVLen); + #ifdef __cplusplus } #endif diff --git a/source/libs/executor/src/streamcountwindowoperator.c b/source/libs/executor/src/streamcountwindowoperator.c index db06775a18..224808b41a 100644 --- a/source/libs/executor/src/streamcountwindowoperator.c +++ b/source/libs/executor/src/streamcountwindowoperator.c @@ -657,6 +657,7 @@ SOperatorInfo* createStreamCountAggOperatorInfo(SOperatorInfo* downstream, SPhys goto _error; } + pInfo->primaryTsIndex = ((SColumnNode*)pCountNode->window.pTspk)->slotId; code = initStreamAggSupporter(&pInfo->streamAggSup, pExpSup, numOfCols, 0, pTaskInfo->streamInfo.pState, sizeof(COUNT_TYPE), 0, &pTaskInfo->storageAPI.stateStore, pHandle, &pInfo->twAggSup, GET_TASKID(pTaskInfo), &pTaskInfo->storageAPI, pInfo->primaryTsIndex); @@ -675,8 +676,6 @@ SOperatorInfo* createStreamCountAggOperatorInfo(SOperatorInfo* downstream, SPhys initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window); - pInfo->primaryTsIndex = ((SColumnNode*)pCountNode->window.pTspk)->slotId; - pInfo->binfo.pRes = pResBlock; _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); pInfo->pStDeleted = tSimpleHashInit(64, hashFn); diff --git a/source/libs/executor/src/streameventwindowoperator.c b/source/libs/executor/src/streameventwindowoperator.c index c948b57534..eef002ff35 100644 --- a/source/libs/executor/src/streameventwindowoperator.c +++ b/source/libs/executor/src/streameventwindowoperator.c @@ -709,6 +709,7 @@ SOperatorInfo* createStreamEventAggOperatorInfo(SOperatorInfo* downstream, SPhys goto _error; } + pInfo->primaryTsIndex = tsSlotId; code = initStreamAggSupporter(&pInfo->streamAggSup, pExpSup, numOfCols, 0, pTaskInfo->streamInfo.pState, sizeof(bool) + sizeof(bool), 0, &pTaskInfo->storageAPI.stateStore, pHandle, &pInfo->twAggSup, GET_TASKID(pTaskInfo), &pTaskInfo->storageAPI, pInfo->primaryTsIndex); @@ -716,7 +717,6 @@ SOperatorInfo* createStreamEventAggOperatorInfo(SOperatorInfo* downstream, SPhys goto _error; } - pInfo->primaryTsIndex = tsSlotId; _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); pInfo->pSeDeleted = tSimpleHashInit(64, hashFn); pInfo->pDelIterator = NULL; diff --git a/source/libs/executor/src/streamtimewindowoperator.c b/source/libs/executor/src/streamtimewindowoperator.c index 86d428cddf..98b04af525 100644 --- a/source/libs/executor/src/streamtimewindowoperator.c +++ b/source/libs/executor/src/streamtimewindowoperator.c @@ -1730,25 +1730,8 @@ int32_t initStreamAggSupporter(SStreamAggSupporter* pSup, SExprSupp* pExpSup, in _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); pSup->pResultRows = tSimpleHashInit(32, hashFn); - int32_t pageSize = 4096; - while (pageSize < pSup->resultRowSize * 4) { - pageSize <<= 1u; - } - // at least four pages need to be in buffer - int32_t bufSize = 4096 * 256; - if (bufSize <= pageSize) { - bufSize = pageSize * 4; - } - - if (!osTempSpaceAvailable()) { - terrno = TSDB_CODE_NO_DISKSPACE; - qError("Init stream agg supporter failed since %s, tempDir:%s", terrstr(), tsTempDir); - return terrno; - } - - int32_t code = createDiskbasedBuf(&pSup->pResultBuf, pageSize, bufSize, "function", tsTempDir); for (int32_t i = 0; i < numOfOutput; ++i) { - pExpSup->pCtx[i].saveHandle.pBuf = pSup->pResultBuf; + pExpSup->pCtx[i].saveHandle.pState = pSup->pState; } pSup->pSessionAPI = pApi; @@ -2948,6 +2931,7 @@ SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPh .deleteMark = getDeleteMark(&pSessionNode->window, 0), }; + pInfo->primaryTsIndex = ((SColumnNode*)pSessionNode->window.pTspk)->slotId; code = initStreamAggSupporter(&pInfo->streamAggSup, pExpSup, numOfCols, pSessionNode->gap, pTaskInfo->streamInfo.pState, 0, 0, &pTaskInfo->storageAPI.stateStore, pHandle, &pInfo->twAggSup, GET_TASKID(pTaskInfo), &pTaskInfo->storageAPI, pInfo->primaryTsIndex); @@ -2957,7 +2941,6 @@ SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPh initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window); - pInfo->primaryTsIndex = ((SColumnNode*)pSessionNode->window.pTspk)->slotId; if (pSessionNode->window.pTsEnd) { pInfo->endTsIndex = ((SColumnNode*)pSessionNode->window.pTsEnd)->slotId; } @@ -3843,14 +3826,13 @@ SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhys } int32_t keySize = sizeof(SStateKeys) + pColNode->node.resType.bytes; int16_t type = pColNode->node.resType.type; + pInfo->primaryTsIndex = tsSlotId; code = initStreamAggSupporter(&pInfo->streamAggSup, pExpSup, numOfCols, 0, pTaskInfo->streamInfo.pState, keySize, type, &pTaskInfo->storageAPI.stateStore, pHandle, &pInfo->twAggSup, GET_TASKID(pTaskInfo), &pTaskInfo->storageAPI, pInfo->primaryTsIndex); if (code != TSDB_CODE_SUCCESS) { goto _error; } - - pInfo->primaryTsIndex = tsSlotId; _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); pInfo->pSeDeleted = tSimpleHashInit(64, hashFn); pInfo->pDelIterator = NULL; diff --git a/source/libs/stream/src/streamSessionState.c b/source/libs/stream/src/streamSessionState.c index 295132a4f5..687b4bcf12 100644 --- a/source/libs/stream/src/streamSessionState.c +++ b/source/libs/stream/src/streamSessionState.c @@ -202,6 +202,13 @@ _end: return code; } +int32_t getSessionRowBuff(SStreamFileState* pFileState, void* pKey, int32_t keyLen, void** pVal, int32_t* pVLen) { + SWinKey* pTmpkey = pKey; + ASSERT(keyLen == sizeof(SWinKey)); + SSessionKey pWinKey = {.groupId = pTmpkey->groupId, .win.skey = pTmpkey->ts, .win.ekey = pTmpkey->ts}; + return getSessionWinResultBuff(pFileState, &pWinKey, 0, pVal, pVLen); +} + int32_t putSessionWinResultBuff(SStreamFileState* pFileState, SRowBuffPos* pPos) { SSHashObj* pSessionBuff = getRowStateBuff(pFileState); SSessionKey* pKey = pPos->pKey; diff --git a/source/libs/stream/src/streamState.c b/source/libs/stream/src/streamState.c index 18bc672c8d..1f46384448 100644 --- a/source/libs/stream/src/streamState.c +++ b/source/libs/stream/src/streamState.c @@ -277,10 +277,10 @@ int32_t streamStateCommit(SStreamState* pState) { int32_t streamStateFuncPut(SStreamState* pState, const SWinKey* key, const void* value, int32_t vLen) { #ifdef USE_ROCKSDB void* pVal = NULL; - int32_t len = 0; - int32_t code = getRowBuff(pState->pFileState, (void*)key, sizeof(SWinKey), &pVal, &len); + int32_t len = getRowStateRowSize(pState->pFileState); + int32_t code = getFunctionRowBuff(pState->pFileState, (void*)key, sizeof(SWinKey), &pVal, &len); char* buf = ((SRowBuffPos*)pVal)->pRowBuff; - uint32_t rowSize = streamFileStateGeSelectRowSize(pState->pFileState); + uint32_t rowSize = streamFileStateGetSelectRowSize(pState->pFileState); memcpy(buf + len - rowSize, value, vLen); return code; #else @@ -290,10 +290,10 @@ int32_t streamStateFuncPut(SStreamState* pState, const SWinKey* key, const void* int32_t streamStateFuncGet(SStreamState* pState, const SWinKey* key, void** ppVal, int32_t* pVLen) { #ifdef USE_ROCKSDB void* pVal = NULL; - int32_t len = 0; - int32_t code = getRowBuff(pState->pFileState, (void*)key, sizeof(SWinKey), (void**)(&pVal), &len); + int32_t len = getRowStateRowSize(pState->pFileState); + int32_t code = getFunctionRowBuff(pState->pFileState, (void*)key, sizeof(SWinKey), (void**)(&pVal), &len); char* buf = ((SRowBuffPos*)pVal)->pRowBuff; - uint32_t rowSize = streamFileStateGeSelectRowSize(pState->pFileState); + uint32_t rowSize = streamFileStateGetSelectRowSize(pState->pFileState); *ppVal = buf + len - rowSize; return code; #else diff --git a/source/libs/stream/src/tstreamFileState.c b/source/libs/stream/src/tstreamFileState.c index f86ab6b8a3..19f403a6a6 100644 --- a/source/libs/stream/src/tstreamFileState.c +++ b/source/libs/stream/src/tstreamFileState.c @@ -58,6 +58,8 @@ struct SStreamFileState { _state_file_remove_fn stateFileRemoveFn; _state_file_get_fn stateFileGetFn; _state_file_clear_fn stateFileClearFn; + + _state_fun_get_fn stateFunctionGetFn; }; typedef SRowBuffPos SRowBuffInfo; @@ -157,6 +159,7 @@ SStreamFileState* streamFileStateInit(int64_t memSize, uint32_t keySize, uint32_ pFileState->stateFileGetFn = intervalFileGetFn; pFileState->stateFileClearFn = streamStateClear_rocksdb; pFileState->cfName = taosStrdup("state"); + pFileState->stateFunctionGetFn = getRowBuff; } else { pFileState->rowStateBuff = tSimpleHashInit(cap, hashFn); pFileState->stateBuffCleanupFn = sessionWinStateCleanup; @@ -168,6 +171,7 @@ SStreamFileState* streamFileStateInit(int64_t memSize, uint32_t keySize, uint32_ pFileState->stateFileGetFn = sessionFileGetFn; pFileState->stateFileClearFn = streamStateSessionClear_rocksdb; pFileState->cfName = taosStrdup("sess"); + pFileState->stateFunctionGetFn = getSessionRowBuff; } if (!pFileState->usedBuffs || !pFileState->freeBuffs || !pFileState->rowStateBuff) { @@ -736,7 +740,7 @@ int32_t recoverSnapshot(SStreamFileState* pFileState, int64_t ckId) { return TSDB_CODE_SUCCESS; } -int32_t streamFileStateGeSelectRowSize(SStreamFileState* pFileState) { return pFileState->selectivityRowSize; } +int32_t streamFileStateGetSelectRowSize(SStreamFileState* pFileState) { return pFileState->selectivityRowSize; } void streamFileStateReloadInfo(SStreamFileState* pFileState, TSKEY ts) { pFileState->flushMark = TMAX(pFileState->flushMark, ts); @@ -754,3 +758,7 @@ bool isDeteled(SStreamFileState* pFileState, TSKEY ts) { bool isFlushedState(SStreamFileState* pFileState, TSKEY ts, TSKEY gap) { return ts <= (pFileState->flushMark + gap); } int32_t getRowStateRowSize(SStreamFileState* pFileState) { return pFileState->rowSize; } + +int32_t getFunctionRowBuff(SStreamFileState* pFileState, void* pKey, int32_t keyLen, void** pVal, int32_t* pVLen) { + return pFileState->stateFunctionGetFn(pFileState, pKey, keyLen, pVal, pVLen); +} From 9811a9e2164a77e2bd7a84b68f349d9f0a7fee05 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 10 Apr 2024 18:06:58 +0800 Subject: [PATCH 06/24] fix(tsdb): deep copy the pk for varchar type. --- source/dnode/vnode/src/tsdb/tsdbMergeTree.c | 52 ++++++++++++++------- 1 file changed, 36 insertions(+), 16 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbMergeTree.c b/source/dnode/vnode/src/tsdb/tsdbMergeTree.c index e8b1f870c3..a14f866bcc 100644 --- a/source/dnode/vnode/src/tsdb/tsdbMergeTree.c +++ b/source/dnode/vnode/src/tsdb/tsdbMergeTree.c @@ -53,6 +53,13 @@ SSttBlockLoadInfo *tCreateSttBlockLoadInfo(STSchema *pSchema, int16_t *colList, return pLoadInfo; } +static void freeItem(void* pValue) { + SValue* p = (SValue*) pValue; + if (IS_VAR_DATA_TYPE(p->type)) { + taosMemoryFree(p->pData); + } +} + void *destroySttBlockLoadInfo(SSttBlockLoadInfo *pLoadInfo) { if (pLoadInfo == NULL) { return NULL; @@ -72,8 +79,8 @@ void *destroySttBlockLoadInfo(SSttBlockLoadInfo *pLoadInfo) { if (pLoadInfo->info.pCount != NULL) { taosArrayDestroy(pLoadInfo->info.pUid); - taosArrayDestroy(pLoadInfo->info.pFirstKey); - taosArrayDestroy(pLoadInfo->info.pLastKey); + taosArrayDestroyEx(pLoadInfo->info.pFirstKey, freeItem); + taosArrayDestroyEx(pLoadInfo->info.pLastKey, freeItem); taosArrayDestroy(pLoadInfo->info.pCount); taosArrayDestroy(pLoadInfo->info.pFirstTs); taosArrayDestroy(pLoadInfo->info.pLastTs); @@ -319,6 +326,21 @@ static int32_t extractSttBlockInfo(SLDataIter *pIter, const TSttBlkArray *pArray return TSDB_CODE_SUCCESS; } +static int32_t tValueDupPayload(SValue *pVal) { + if (IS_VAR_DATA_TYPE(pVal->type)) { + char *p = (char *)pVal->pData; + char *pBuf = taosMemoryMalloc(pVal->nData); + if (pBuf == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } + + memcpy(pBuf, p, pVal->nData); + pVal->pData = (uint8_t *)pBuf; + } + + return TSDB_CODE_SUCCESS; +} + static int32_t loadSttStatisticsBlockData(SSttFileReader *pSttFileReader, SSttBlockLoadInfo *pBlockLoadInfo, TStatisBlkArray *pStatisBlkArray, uint64_t suid, const char *id) { int32_t numOfBlocks = TARRAY2_SIZE(pStatisBlkArray); @@ -384,25 +406,16 @@ static int32_t loadSttStatisticsBlockData(SSttFileReader *pSttFileReader, SSttBl break; } - if (IS_VAR_DATA_TYPE(vFirst.type)) { - char *p = (char *)vFirst.pData; - char *pBuf = taosMemoryMalloc(vFirst.nData); - memcpy(pBuf, p, vFirst.nData); - vFirst.pData = (uint8_t *)pBuf; - } + tValueDupPayload(&vFirst); taosArrayPush(pBlockLoadInfo->info.pFirstKey, &vFirst); + // todo add api to clone the original data code = tValueColumnGet(&block.lastKeyPKs[0], f, &vLast); if (code) { break; } - if (IS_VAR_DATA_TYPE(vLast.type)) { - char *p = (char *)vLast.pData; - char *pBuf = taosMemoryMalloc(vLast.nData); - memcpy(pBuf, p, vLast.nData); - vLast.pData = (uint8_t *)pBuf; - } + tValueDupPayload(&vLast); taosArrayPush(pBlockLoadInfo->info.pLastKey, &vLast); } @@ -420,8 +433,15 @@ static int32_t loadSttStatisticsBlockData(SSttFileReader *pSttFileReader, SSttBl taosArrayPush(pBlockLoadInfo->info.pFirstTs, &record.firstKey.ts); taosArrayPush(pBlockLoadInfo->info.pLastTs, &record.lastKey.ts); - taosArrayPush(pBlockLoadInfo->info.pFirstKey, &record.firstKey.pks[0]); - taosArrayPush(pBlockLoadInfo->info.pLastKey, &record.lastKey.pks[0]); + SValue s = record.firstKey.pks[0]; + tValueDupPayload(&s); + + taosArrayPush(pBlockLoadInfo->info.pFirstKey, &s); + + s = record.lastKey.pks[0]; + tValueDupPayload(&s); + + taosArrayPush(pBlockLoadInfo->info.pLastKey, &s); i += 1; } } From b9581548c6121adcf89ee101715507e483d7723d Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao> Date: Wed, 10 Apr 2024 18:20:56 +0800 Subject: [PATCH 07/24] adj msg --- source/libs/parser/src/parTranslater.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index 732920a3ce..74c9338fed 100644 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -8221,7 +8221,7 @@ static int32_t adjustOrderOfProjections(STranslateContext* pCxt, SNodeList** ppC if (TSDB_CODE_SUCCESS == code && !hasPrimaryKey && hasPkInTable(pMeta)) { code = generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_COLUMNS_NUM, - "Primary key column of dest table can not be null"); + "Primary key column name must be defined in existed-stable field"); } SNodeList* pNewProjections = NULL; From f0beceb5ebde5528f32998bbd725843b780f0ed7 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 10 Apr 2024 18:32:08 +0800 Subject: [PATCH 08/24] fix(tsdb): check for the duplicated ts in delete-skyline. --- source/dnode/vnode/src/tsdb/tsdbRead2.c | 44 +++++++++++++++++++++---- 1 file changed, 38 insertions(+), 6 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbRead2.c b/source/dnode/vnode/src/tsdb/tsdbRead2.c index a8a4ced517..c08face243 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead2.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead2.c @@ -59,7 +59,7 @@ static int32_t doAppendRowFromFileBlock(SSDataBlock* pResBlock, STsdbReader* pR int32_t rowIndex); static void setComposedBlockFlag(STsdbReader* pReader, bool composed); static bool hasBeenDropped(const SArray* pDelList, int32_t* index, int64_t key, int64_t ver, int32_t order, - SVersionRange* pVerRange); + SVersionRange* pVerRange, bool hasPk); static int32_t doMergeMemTableMultiRows(TSDBROW* pRow, SRowKey* pKey, uint64_t uid, SIterInfo* pIter, SArray* pDelList, TSDBROW* pResRow, STsdbReader* pReader, bool* freeTSRow); @@ -1595,7 +1595,8 @@ static bool nextRowFromSttBlocks(SSttBlockReader* pSttBlockReader, STableBlockSc tColRowGetKeyDeepCopy(pRow->pBlockData, pRow->iRow, pkSrcSlot, pNextProc); if (pScanInfo->delSkyline != NULL && TARRAY_SIZE(pScanInfo->delSkyline) > 0) { - if (!hasBeenDropped(pScanInfo->delSkyline, &pScanInfo->sttBlockDelIndex, key, ver, order, pVerRange)) { + if (!hasBeenDropped(pScanInfo->delSkyline, &pScanInfo->sttBlockDelIndex, key, ver, order, pVerRange, + pSttBlockReader->numOfPks > 0)) { pScanInfo->sttKeyInfo.status = STT_FILE_HAS_DATA; return true; } @@ -2135,7 +2136,7 @@ static bool isValidFileBlockRow(SBlockData* pBlockData, int32_t rowIndex, STable if (pBlockScanInfo->delSkyline != NULL && TARRAY_SIZE(pBlockScanInfo->delSkyline) > 0) { bool dropped = hasBeenDropped(pBlockScanInfo->delSkyline, &pBlockScanInfo->fileDelIndex, ts, ver, pInfo->order, - &pInfo->verRange); + &pInfo->verRange, pReader->suppInfo.numOfPks > 0); if (dropped) { return false; } @@ -3381,8 +3382,35 @@ SVersionRange getQueryVerRange(SVnode* pVnode, SQueryTableDataCond* pCond, int8_ return (SVersionRange){.minVer = startVer, .maxVer = endVer}; } +static int32_t reverseSearchStartPos(const SArray* pDelList, int32_t index, int64_t key, bool asc) { + size_t num = taosArrayGetSize(pDelList); + int32_t start = index; + + if (asc) { + if (start >= num - 1) { + start = num - 1; + } + + TSDBKEY* p = taosArrayGet(pDelList, start); + while (p->ts >= key && start > 0) { + start -= 1; + } + } else { + if (index <= 0) { + start = 0; + } + + TSDBKEY* p = taosArrayGet(pDelList, start); + while (p->ts <= key && start < num - 1) { + start += 1; + } + } + + return start; +} + bool hasBeenDropped(const SArray* pDelList, int32_t* index, int64_t key, int64_t ver, int32_t order, - SVersionRange* pVerRange) { + SVersionRange* pVerRange, bool hasPk) { if (pDelList == NULL || (TARRAY_SIZE(pDelList) == 0)) { return false; } @@ -3391,6 +3419,10 @@ bool hasBeenDropped(const SArray* pDelList, int32_t* index, int64_t key, int64_t bool asc = ASCENDING_TRAVERSE(order); int32_t step = asc ? 1 : -1; + if (hasPk) { // handle the case where duplicated timestamps existed. + *index = reverseSearchStartPos(pDelList, *index, key, asc); + } + if (asc) { if (*index >= num - 1) { TSDBKEY* last = taosArrayGetLast(pDelList); @@ -3503,7 +3535,7 @@ TSDBROW* getValidMemRow(SIterInfo* pIter, const SArray* pDelList, STsdbReader* p if (pDelList == NULL || TARRAY_SIZE(pDelList) == 0) { return pRow; } else { - bool dropped = hasBeenDropped(pDelList, &pIter->index, key.ts, key.version, order, &pReader->info.verRange); + bool dropped = hasBeenDropped(pDelList, &pIter->index, key.ts, key.version, order, &pReader->info.verRange, pReader->suppInfo.numOfPks > 0); if (!dropped) { return pRow; } @@ -3528,7 +3560,7 @@ TSDBROW* getValidMemRow(SIterInfo* pIter, const SArray* pDelList, STsdbReader* p if (pDelList == NULL || TARRAY_SIZE(pDelList) == 0) { return pRow; } else { - bool dropped = hasBeenDropped(pDelList, &pIter->index, key.ts, key.version, order, &pReader->info.verRange); + bool dropped = hasBeenDropped(pDelList, &pIter->index, key.ts, key.version, order, &pReader->info.verRange, pReader->suppInfo.numOfPks > 0); if (!dropped) { return pRow; } From 7f11a3682bf80390e0f012afcf103a627c1af2a9 Mon Sep 17 00:00:00 2001 From: kailixu Date: Wed, 10 Apr 2024 18:41:20 +0800 Subject: [PATCH 09/24] enh: primary key column should not be null --- source/common/src/tdataformat.c | 22 +++++++++++++--------- source/libs/executor/src/dataInserter.c | 11 +---------- source/libs/parser/src/parInsertSql.c | 3 --- source/libs/parser/src/parInsertStmt.c | 10 ---------- 4 files changed, 14 insertions(+), 32 deletions(-) diff --git a/source/common/src/tdataformat.c b/source/common/src/tdataformat.c index f13a0a0825..f8d2da0bd5 100644 --- a/source/common/src/tdataformat.c +++ b/source/common/src/tdataformat.c @@ -101,16 +101,18 @@ typedef struct { int32_t kvRowSize; } SRowBuildScanInfo; -static FORCE_INLINE void tRowBuildScanAddNone(SRowBuildScanInfo *sinfo, const STColumn *pTColumn) { - ASSERT((pTColumn->flags & COL_IS_KEY) == 0); +static FORCE_INLINE int32_t tRowBuildScanAddNone(SRowBuildScanInfo *sinfo, const STColumn *pTColumn) { + if ((pTColumn->flags & COL_IS_KEY)) return TSDB_CODE_PAR_PRIMARY_KEY_IS_NULL; sinfo->numOfNone++; + return 0; } -static FORCE_INLINE void tRowBuildScanAddNull(SRowBuildScanInfo *sinfo, const STColumn *pTColumn) { - ASSERT((pTColumn->flags & COL_IS_KEY) == 0); +static FORCE_INLINE int32_t tRowBuildScanAddNull(SRowBuildScanInfo *sinfo, const STColumn *pTColumn) { + if ((pTColumn->flags & COL_IS_KEY)) return TSDB_CODE_PAR_PRIMARY_KEY_IS_NULL; sinfo->numOfNull++; sinfo->kvMaxOffset = sinfo->kvPayloadSize; sinfo->kvPayloadSize += tPutI16v(NULL, -pTColumn->colId); + return 0; } static FORCE_INLINE void tRowBuildScanAddValue(SRowBuildScanInfo *sinfo, SColVal *colVal, const STColumn *pTColumn) { @@ -142,6 +144,7 @@ static FORCE_INLINE void tRowBuildScanAddValue(SRowBuildScanInfo *sinfo, SColVal } static int32_t tRowBuildScan(SArray *colVals, const STSchema *schema, SRowBuildScanInfo *sinfo) { + int32_t code = 0; int32_t colValIndex = 1; int32_t numOfColVals = TARRAY_SIZE(colVals); SColVal *colValArray = (SColVal *)TARRAY_DATA(colVals); @@ -158,7 +161,7 @@ static int32_t tRowBuildScan(SArray *colVals, const STSchema *schema, SRowBuildS for (int32_t i = 1; i < schema->numOfCols; i++) { for (;;) { if (colValIndex >= numOfColVals) { - tRowBuildScanAddNone(sinfo, schema->columns + i); + if ((code = tRowBuildScanAddNone(sinfo, schema->columns + i))) goto _exit; break; } @@ -168,15 +171,15 @@ static int32_t tRowBuildScan(SArray *colVals, const STSchema *schema, SRowBuildS if (COL_VAL_IS_VALUE(&colValArray[colValIndex])) { tRowBuildScanAddValue(sinfo, &colValArray[colValIndex], schema->columns + i); } else if (COL_VAL_IS_NULL(&colValArray[colValIndex])) { - tRowBuildScanAddNull(sinfo, schema->columns + i); + if ((code = tRowBuildScanAddNull(sinfo, schema->columns + i))) goto _exit; } else if (COL_VAL_IS_NONE(&colValArray[colValIndex])) { - tRowBuildScanAddNone(sinfo, schema->columns + i); + if ((code = tRowBuildScanAddNone(sinfo, schema->columns + i))) goto _exit; } colValIndex++; break; } else if (colValArray[colValIndex].cid > schema->columns[i].colId) { - tRowBuildScanAddNone(sinfo, schema->columns + i); + if ((code = tRowBuildScanAddNone(sinfo, schema->columns + i))) goto _exit; break; } else { // skip useless value colValIndex++; @@ -250,7 +253,8 @@ static int32_t tRowBuildScan(SArray *colVals, const STSchema *schema, SRowBuildS + sinfo->kvIndexSize // index array + sinfo->kvPayloadSize; // payload - return 0; +_exit: + return code; } static int32_t tRowBuildTupleRow(SArray *aColVal, const SRowBuildScanInfo *sinfo, const STSchema *schema, diff --git a/source/libs/executor/src/dataInserter.c b/source/libs/executor/src/dataInserter.c index 06f63f5f04..45d6f55278 100644 --- a/source/libs/executor/src/dataInserter.c +++ b/source/libs/executor/src/dataInserter.c @@ -216,11 +216,7 @@ int32_t buildSubmitReqFromBlock(SDataInserterHandle* pInserter, SSubmitReq2** pp case TSDB_DATA_TYPE_VARCHAR: { // TSDB_DATA_TYPE_BINARY ASSERT(pColInfoData->info.type == pCol->type); if (colDataIsNull_s(pColInfoData, j)) { - if ((pCol->flags & COL_IS_KEY)) { - qError("Primary key column should not be null, colId:%" PRIi16 ", colType:%" PRIi8, pCol->colId, pCol->type); - terrno = TSDB_CODE_PAR_PRIMARY_KEY_IS_NULL; - goto _end; - } + SColVal cv = COL_VAL_NULL(pCol->colId, pCol->type); taosArrayPush(pVals, &cv); } else { @@ -248,11 +244,6 @@ int32_t buildSubmitReqFromBlock(SDataInserterHandle* pInserter, SSubmitReq2** pp terrno = TSDB_CODE_PAR_INCORRECT_TIMESTAMP_VAL; goto _end; } - if ((pCol->flags & COL_IS_KEY)) { - qError("Primary key column should not be null, colId:%" PRIi16 ", colType:%" PRIi8, pCol->colId, pCol->type); - terrno = TSDB_CODE_PAR_PRIMARY_KEY_IS_NULL; - goto _end; - } SColVal cv = COL_VAL_NULL(pCol->colId, pCol->type); // should use pCol->type taosArrayPush(pVals, &cv); diff --git a/source/libs/parser/src/parInsertSql.c b/source/libs/parser/src/parInsertSql.c index a1c257022a..f3192b4956 100644 --- a/source/libs/parser/src/parInsertSql.c +++ b/source/libs/parser/src/parInsertSql.c @@ -1657,9 +1657,6 @@ static int32_t parseValueToken(SInsertParseContext* pCxt, const char** pSql, STo if (TSDB_DATA_TYPE_TIMESTAMP == pSchema->type && PRIMARYKEY_TIMESTAMP_COL_ID == pSchema->colId) { return buildSyntaxErrMsg(&pCxt->msg, "Primary timestamp column should not be null", pToken->z); } - if (pSchema->flags & COL_IS_KEY) { - return buildSyntaxErrMsg(&pCxt->msg, "Primary key column should not be null", pToken->z); - } pVal->flag = CV_FLAG_NULL; return TSDB_CODE_SUCCESS; diff --git a/source/libs/parser/src/parInsertStmt.c b/source/libs/parser/src/parInsertStmt.c index 59c5ce82ad..bdeb548bd7 100644 --- a/source/libs/parser/src/parInsertStmt.c +++ b/source/libs/parser/src/parInsertStmt.c @@ -267,11 +267,6 @@ int32_t qBindStmtColsValue(void* pBlock, TAOS_MULTI_BIND* bind, char* msgBuf, in pBind = bind + c; } - if(pBind->is_null && (pColSchema->flags & COL_IS_KEY)){ - code = buildInvalidOperationMsg(&pBuf, "Primary key column should not be null"); - goto _return; - } - code = tColDataAddValueByBind(pCol, pBind, IS_VAR_DATA_TYPE(pColSchema->type) ? pColSchema->bytes - VARSTR_HEADER_SIZE: -1); if (code) { goto _return; @@ -318,11 +313,6 @@ int32_t qBindStmtSingleColValue(void* pBlock, TAOS_MULTI_BIND* bind, char* msgBu pBind = bind; } - if (pBind->is_null && (pColSchema->flags & COL_IS_KEY)) { - code = buildInvalidOperationMsg(&pBuf, "Primary key column should not be null"); - goto _return; - } - tColDataAddValueByBind(pCol, pBind, IS_VAR_DATA_TYPE(pColSchema->type) ? pColSchema->bytes - VARSTR_HEADER_SIZE : -1); qDebug("stmt col %d bind %d rows data", colIdx, rowNum); From e261023ee60eb5d03a71196f56c25517047e0615 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 10 Apr 2024 18:42:44 +0800 Subject: [PATCH 10/24] fix(stream): add lock, and fix race condition. --- source/dnode/vnode/src/tq/tqUtil.c | 6 ++++++ source/libs/stream/src/streamTask.c | 11 ++++------- 2 files changed, 10 insertions(+), 7 deletions(-) diff --git a/source/dnode/vnode/src/tq/tqUtil.c b/source/dnode/vnode/src/tq/tqUtil.c index 6029575e2c..d8440e996f 100644 --- a/source/dnode/vnode/src/tq/tqUtil.c +++ b/source/dnode/vnode/src/tq/tqUtil.c @@ -501,6 +501,10 @@ int32_t tqGetStreamExecInfo(SVnode* pVnode, int64_t streamId, int64_t* pDelay, b } // extract the required source task for a given stream, identified by streamId + streamMetaRLock(pMeta); + + numOfTasks = taosArrayGetSize(pMeta->pTaskList); + for (int32_t i = 0; i < numOfTasks; ++i) { STaskId* pId = taosArrayGet(pMeta->pTaskList, i); if (pId->streamId != streamId) { @@ -552,5 +556,7 @@ int32_t tqGetStreamExecInfo(SVnode* pVnode, int64_t streamId, int64_t* pDelay, b walCloseReader(pReader); } + streamMetaRUnLock(pMeta); + return TSDB_CODE_SUCCESS; } diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index 44f70f8b19..88c8c85dec 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -733,15 +733,12 @@ bool streamTaskIsAllUpstreamClosed(SStreamTask* pTask) { bool streamTaskSetSchedStatusWait(SStreamTask* pTask) { bool ret = false; - // double check + taosThreadMutexLock(&pTask->lock); if (pTask->status.schedStatus == TASK_SCHED_STATUS__INACTIVE) { - taosThreadMutexLock(&pTask->lock); - if (pTask->status.schedStatus == TASK_SCHED_STATUS__INACTIVE) { - pTask->status.schedStatus = TASK_SCHED_STATUS__WAITING; - ret = true; - } - taosThreadMutexUnlock(&pTask->lock); + pTask->status.schedStatus = TASK_SCHED_STATUS__WAITING; + ret = true; } + taosThreadMutexUnlock(&pTask->lock); return ret; } From 507e40ddb7485e8b01a15c9961a49856dc88301c Mon Sep 17 00:00:00 2001 From: kailixu Date: Wed, 10 Apr 2024 18:56:55 +0800 Subject: [PATCH 11/24] enh: primary key column should not be null --- source/libs/executor/src/dataInserter.c | 1 - 1 file changed, 1 deletion(-) diff --git a/source/libs/executor/src/dataInserter.c b/source/libs/executor/src/dataInserter.c index 45d6f55278..39bbc1bc69 100644 --- a/source/libs/executor/src/dataInserter.c +++ b/source/libs/executor/src/dataInserter.c @@ -216,7 +216,6 @@ int32_t buildSubmitReqFromBlock(SDataInserterHandle* pInserter, SSubmitReq2** pp case TSDB_DATA_TYPE_VARCHAR: { // TSDB_DATA_TYPE_BINARY ASSERT(pColInfoData->info.type == pCol->type); if (colDataIsNull_s(pColInfoData, j)) { - SColVal cv = COL_VAL_NULL(pCol->colId, pCol->type); taosArrayPush(pVals, &cv); } else { From fabb986aa8cff6a5cba7e3685c8269c02eeaf70a Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 10 Apr 2024 19:54:33 +0800 Subject: [PATCH 12/24] fix(stream): fix stack overflow, caused by print epset. --- include/common/tmisce.h | 16 +---------- source/common/src/tmisce.c | 31 +++++++++++++++++++++ source/dnode/mnode/impl/src/mndStream.c | 5 ++-- source/dnode/mnode/impl/src/mndStreamUtil.c | 6 ++-- source/libs/stream/src/streamTask.c | 6 ++-- source/libs/transport/src/transCli.c | 4 +-- 6 files changed, 43 insertions(+), 25 deletions(-) diff --git a/include/common/tmisce.h b/include/common/tmisce.h index afb33c733a..267ca814d4 100644 --- a/include/common/tmisce.h +++ b/include/common/tmisce.h @@ -29,21 +29,7 @@ typedef struct SCorEpSet { #define GET_ACTIVE_EP(_eps) (&((_eps)->eps[(_eps)->inUse])) -#define EPSET_TO_STR(_eps, tbuf) \ - do { \ - int len = snprintf((tbuf), sizeof(tbuf), "epset:{"); \ - for (int _i = 0; _i < (_eps)->numOfEps; _i++) { \ - if (_i == (_eps)->numOfEps - 1) { \ - len += \ - snprintf((tbuf) + len, sizeof(tbuf) - len, "%d. %s:%d", _i, (_eps)->eps[_i].fqdn, (_eps)->eps[_i].port); \ - } else { \ - len += \ - snprintf((tbuf) + len, sizeof(tbuf) - len, "%d. %s:%d, ", _i, (_eps)->eps[_i].fqdn, (_eps)->eps[_i].port); \ - } \ - } \ - len += snprintf((tbuf) + len, sizeof(tbuf) - len, "}, inUse:%d", (_eps)->inUse); \ - } while (0); - +int32_t epsetToStr(const SEpSet* pEpSet, char* pBuf, int32_t len); int32_t taosGetFqdnPortFromEp(const char* ep, SEp* pEp); void addEpIntoEpSet(SEpSet* pEpSet, const char* fqdn, uint16_t port); diff --git a/source/common/src/tmisce.c b/source/common/src/tmisce.c index 1606b45eed..8558ccb447 100644 --- a/source/common/src/tmisce.c +++ b/source/common/src/tmisce.c @@ -70,6 +70,7 @@ void epsetAssign(SEpSet* pDst, const SEpSet* pSrc) { tstrncpy(pDst->eps[i].fqdn, pSrc->eps[i].fqdn, tListLen(pSrc->eps[i].fqdn)); } } + void epAssign(SEp* pDst, SEp* pSrc) { if (pSrc == NULL || pDst == NULL) { return; @@ -78,6 +79,7 @@ void epAssign(SEp* pDst, SEp* pSrc) { tstrncpy(pDst->fqdn, pSrc->fqdn, tListLen(pSrc->fqdn)); pDst->port = pSrc->port; } + void epsetSort(SEpSet* pDst) { if (pDst->numOfEps <= 1) { return; @@ -127,6 +129,35 @@ SEpSet getEpSet_s(SCorEpSet* pEpSet) { return ep; } +int32_t epsetToStr(const SEpSet* pEpSet, char* pBuf, int32_t bufLen) { + int len = snprintf(pBuf, bufLen, "epset:{"); + if (len < 0) { + return -1; + } + + for (int _i = 0; (_i < pEpSet->numOfEps) && (bufLen > len); _i++) { + int32_t ret = 0; + + if (_i == pEpSet->numOfEps - 1) { + ret = snprintf(pBuf + len, bufLen - len, "%d. %s:%d", _i, pEpSet->eps[_i].fqdn, pEpSet->eps[_i].port); + } else { + ret = snprintf(pBuf + len, bufLen - len, "%d. %s:%d, ", _i, pEpSet->eps[_i].fqdn, pEpSet->eps[_i].port); + } + + if (ret < 0) { + return -1; + } + + len += ret; + } + + if (len < bufLen) { + /*len += */snprintf(pBuf + len, bufLen - len, "}, inUse:%d", pEpSet->inUse); + } + + return TSDB_CODE_SUCCESS; +} + int32_t taosGenCrashJsonMsg(int signum, char** pMsg, int64_t clusterId, int64_t startTime) { SJson* pJson = tjsonCreateObject(); if (pJson == NULL) return -1; diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index ff05db417e..8f9afb2adc 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -1747,7 +1747,8 @@ static SVgroupChangeInfo mndFindChangedNodeInfo(SMnode *pMnode, const SArray *pP const SEp *pPrevEp = GET_ACTIVE_EP(&pPrevEntry->epset); char buf[256] = {0}; - EPSET_TO_STR(&pCurrent->epset, buf); + epsetToStr(&pCurrent->epset, buf, tListLen(buf)); + mDebug("nodeId:%d restart/epset changed detected, old:%s:%d -> new:%s, stageUpdate:%d", pCurrent->nodeId, pPrevEp->fqdn, pPrevEp->port, buf, pPrevEntry->stageUpdated); @@ -1898,7 +1899,7 @@ static SArray *extractNodeListFromStream(SMnode *pMnode) { taosArrayPush(plist, pEntry); char buf[256] = {0}; - EPSET_TO_STR(&pEntry->epset, buf); + epsetToStr(&pEntry->epset, buf, tListLen(buf)); mDebug("extract nodeInfo from stream obj, nodeId:%d, %s", pEntry->nodeId, buf); } taosHashCleanup(pHash); diff --git a/source/dnode/mnode/impl/src/mndStreamUtil.c b/source/dnode/mnode/impl/src/mndStreamUtil.c index a124b4052c..d5bc12f9df 100644 --- a/source/dnode/mnode/impl/src/mndStreamUtil.c +++ b/source/dnode/mnode/impl/src/mndStreamUtil.c @@ -114,7 +114,7 @@ SArray *mndTakeVgroupSnapshot(SMnode *pMnode, bool *allReady) { } char buf[256] = {0}; - EPSET_TO_STR(&entry.epset, buf); + epsetToStr(&entry.epset, buf, tListLen(buf)); mDebug("take node snapshot, nodeId:%d %s", entry.nodeId, buf); taosArrayPush(pVgroupListSnapshot, &entry); @@ -133,7 +133,7 @@ SArray *mndTakeVgroupSnapshot(SMnode *pMnode, bool *allReady) { entry.nodeId = SNODE_HANDLE; char buf[256] = {0}; - EPSET_TO_STR(&entry.epset, buf); + epsetToStr(&entry.epset, buf, tListLen(buf)); mDebug("take snode snapshot, nodeId:%d %s", entry.nodeId, buf); taosArrayPush(pVgroupListSnapshot, &entry); sdbRelease(pSdb, pObj); @@ -302,7 +302,7 @@ static int32_t doSetPauseAction(SMnode *pMnode, STrans *pTrans, SStreamTask *pTa } char buf[256] = {0}; - EPSET_TO_STR(&epset, buf); + epsetToStr(&epset, buf, tListLen(buf)); mDebug("pause stream task in node:%d, epset:%s", pTask->info.nodeId, buf); code = setTransAction(pTrans, pReq, sizeof(SVPauseStreamTaskReq), TDMT_STREAM_TASK_PAUSE, &epset, 0); diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index 88c8c85dec..c34e162326 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -35,7 +35,7 @@ static int32_t doUpdateTaskEpset(SStreamTask* pTask, int32_t nodeId, SEpSet* pEp if (pTask->info.nodeId == nodeId) { // execution task should be moved away epsetAssign(&pTask->info.epSet, pEpSet); - EPSET_TO_STR(pEpSet, buf) + epsetToStr(pEpSet, buf, tListLen(buf)); stDebug("s-task:0x%x (vgId:%d) self node epset is updated %s", pTask->id.taskId, nodeId, buf); } @@ -592,7 +592,7 @@ int32_t streamTaskSetUpstreamInfo(SStreamTask* pTask, const SStreamTask* pUpstre void streamTaskUpdateUpstreamInfo(SStreamTask* pTask, int32_t nodeId, const SEpSet* pEpSet) { char buf[512] = {0}; - EPSET_TO_STR(pEpSet, buf); + epsetToStr(pEpSet, buf, tListLen(buf)); int32_t numOfUpstream = taosArrayGetSize(pTask->upstreamInfo.pList); for (int32_t i = 0; i < numOfUpstream; ++i) { @@ -626,7 +626,7 @@ void streamTaskSetFixedDownstreamInfo(SStreamTask* pTask, const SStreamTask* pDo void streamTaskUpdateDownstreamInfo(SStreamTask* pTask, int32_t nodeId, const SEpSet* pEpSet) { char buf[512] = {0}; - EPSET_TO_STR(pEpSet, buf); + epsetToStr(pEpSet, buf, tListLen(buf)); int32_t id = pTask->id.taskId; int8_t type = pTask->outputInfo.type; diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 062609baac..79699a755a 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -2188,7 +2188,7 @@ static void cliSchedMsgToDebug(SCliMsg* pMsg, char* label) { STransConnCtx* pCtx = pMsg->ctx; STraceId* trace = &pMsg->msg.info.traceId; char tbuf[512] = {0}; - EPSET_TO_STR(&pCtx->epSet, tbuf); + epsetToStr(&pCtx->epSet, tbuf, tListLen(tbuf)); tGDebug("%s retry on next node,use:%s, step: %d,timeout:%" PRId64 "", label, tbuf, pCtx->retryStep, pCtx->retryNextInterval); return; @@ -2421,7 +2421,7 @@ int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) { if (hasEpSet) { if (rpcDebugFlag & DEBUG_TRACE) { char tbuf[512] = {0}; - EPSET_TO_STR(&pCtx->epSet, tbuf); + epsetToStr(&pCtx->epSet, tbuf, tListLen(tbuf)); tGTrace("%s conn %p extract epset from msg", CONN_GET_INST_LABEL(pConn), pConn); } } From 917487b1103533b3e86e39ce662163f95a89d8b2 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 10 Apr 2024 19:55:31 +0800 Subject: [PATCH 13/24] refactor: do some internal refactor. --- source/common/src/tmisce.c | 1 - 1 file changed, 1 deletion(-) diff --git a/source/common/src/tmisce.c b/source/common/src/tmisce.c index 8558ccb447..77dd8344b1 100644 --- a/source/common/src/tmisce.c +++ b/source/common/src/tmisce.c @@ -137,7 +137,6 @@ int32_t epsetToStr(const SEpSet* pEpSet, char* pBuf, int32_t bufLen) { for (int _i = 0; (_i < pEpSet->numOfEps) && (bufLen > len); _i++) { int32_t ret = 0; - if (_i == pEpSet->numOfEps - 1) { ret = snprintf(pBuf + len, bufLen - len, "%d. %s:%d", _i, pEpSet->eps[_i].fqdn, pEpSet->eps[_i].port); } else { From bff8226c0f6ef56860ce10c305ad5b5b2c69efa9 Mon Sep 17 00:00:00 2001 From: kailixu Date: Thu, 11 Apr 2024 08:27:17 +0800 Subject: [PATCH 14/24] enh: none column for primary key --- include/util/taoserror.h | 1 + source/common/src/tdataformat.c | 2 +- source/util/src/terror.c | 1 + 3 files changed, 3 insertions(+), 1 deletion(-) diff --git a/include/util/taoserror.h b/include/util/taoserror.h index 2389079fd2..a3cae6a7db 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -767,6 +767,7 @@ int32_t* taosGetErrno(); #define TSDB_CODE_PAR_COL_PK_TYPE TAOS_DEF_ERROR_CODE(0, 0x2673) #define TSDB_CODE_PAR_INVALID_PK_OP TAOS_DEF_ERROR_CODE(0, 0x2674) #define TSDB_CODE_PAR_PRIMARY_KEY_IS_NULL TAOS_DEF_ERROR_CODE(0, 0x2675) +#define TSDB_CODE_PAR_PRIMARY_KEY_IS_NONE TAOS_DEF_ERROR_CODE(0, 0x2676) #define TSDB_CODE_PAR_INTERNAL_ERROR TAOS_DEF_ERROR_CODE(0, 0x26FF) //planner diff --git a/source/common/src/tdataformat.c b/source/common/src/tdataformat.c index f8d2da0bd5..9686059052 100644 --- a/source/common/src/tdataformat.c +++ b/source/common/src/tdataformat.c @@ -102,7 +102,7 @@ typedef struct { } SRowBuildScanInfo; static FORCE_INLINE int32_t tRowBuildScanAddNone(SRowBuildScanInfo *sinfo, const STColumn *pTColumn) { - if ((pTColumn->flags & COL_IS_KEY)) return TSDB_CODE_PAR_PRIMARY_KEY_IS_NULL; + if ((pTColumn->flags & COL_IS_KEY)) return TSDB_CODE_PAR_PRIMARY_KEY_IS_NONE; sinfo->numOfNone++; return 0; } diff --git a/source/util/src/terror.c b/source/util/src/terror.c index 0812181c5c..8d80e3883d 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -629,6 +629,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_PAR_SECOND_COL_PK, "primary key must be TAOS_DEFINE_ERROR(TSDB_CODE_PAR_COL_PK_TYPE, "primary key column must be of type int, uint, bigint, ubigint, and varchar") TAOS_DEFINE_ERROR(TSDB_CODE_PAR_INVALID_PK_OP, "primary key column can not be added, modified, and dropped") TAOS_DEFINE_ERROR(TSDB_CODE_PAR_PRIMARY_KEY_IS_NULL, "Primary key column should not be null") +TAOS_DEFINE_ERROR(TSDB_CODE_PAR_PRIMARY_KEY_IS_NONE, "Primary key column should not be none") TAOS_DEFINE_ERROR(TSDB_CODE_PAR_INTERNAL_ERROR, "Parser internal error") //planner From b2cefa80393aeaa5fb8c6672d147223e25f6a788 Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao> Date: Thu, 11 Apr 2024 09:10:16 +0800 Subject: [PATCH 15/24] release state buff --- source/libs/stream/src/streamState.c | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/source/libs/stream/src/streamState.c b/source/libs/stream/src/streamState.c index 1f46384448..ad6f5d48dc 100644 --- a/source/libs/stream/src/streamState.c +++ b/source/libs/stream/src/streamState.c @@ -295,6 +295,7 @@ int32_t streamStateFuncGet(SStreamState* pState, const SWinKey* key, void** ppVa char* buf = ((SRowBuffPos*)pVal)->pRowBuff; uint32_t rowSize = streamFileStateGetSelectRowSize(pState->pFileState); *ppVal = buf + len - rowSize; + streamStateReleaseBuf(pState, pVal, false); return code; #else return tdbTbGet(pState->pTdbState->pFuncStateDb, key, sizeof(STupleKey), ppVal, pVLen); @@ -332,7 +333,7 @@ bool streamStateCheck(SStreamState* pState, const SWinKey* key) { int32_t streamStateGetByPos(SStreamState* pState, void* pos, void** pVal) { int32_t code = getRowBuffByPos(pState->pFileState, pos, pVal); - streamFileStateReleaseBuff(pState->pFileState, pos, false); + streamStateReleaseBuf(pState, pos, false); return code; } From 79be7eea8c1e8b3c02f1a51b71f734b94d94e674 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 11 Apr 2024 10:06:07 +0800 Subject: [PATCH 16/24] fix(tsdb): fix invalid read, and do some internal refactor. --- source/common/src/tdatablock.c | 5 ++ source/dnode/vnode/src/tsdb/tsdbRead2.c | 85 +++++++++------------- source/dnode/vnode/src/tsdb/tsdbReadUtil.c | 67 +++++++++++------ source/dnode/vnode/src/tsdb/tsdbReadUtil.h | 7 +- 4 files changed, 90 insertions(+), 74 deletions(-) diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c index 69b2a2e6a3..8d9ef6831d 100644 --- a/source/common/src/tdatablock.c +++ b/source/common/src/tdatablock.c @@ -1331,6 +1331,11 @@ void* blockDataDestroy(SSDataBlock* pBlock) { return NULL; } + if (IS_VAR_DATA_TYPE(pBlock->info.pks[0].type)) { + taosMemoryFreeClear(pBlock->info.pks[0].pData); + taosMemoryFreeClear(pBlock->info.pks[1].pData); + } + blockDataFreeRes(pBlock); taosMemoryFreeClear(pBlock); return NULL; diff --git a/source/dnode/vnode/src/tsdb/tsdbRead2.c b/source/dnode/vnode/src/tsdb/tsdbRead2.c index c08face243..be01afb960 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead2.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead2.c @@ -67,8 +67,7 @@ static int32_t doMergeMemIMemRows(TSDBROW* pRow, SRowKey* pRowKey, TSDBROW* piRo STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader, SRow** pTSRow); static int32_t mergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pBlockScanInfo, SRowKey* pKey, STsdbReader* pReader); -static int32_t mergeRowsInSttBlocks(SSttBlockReader* pSttBlockReader, STableBlockScanInfo* pBlockScanInfo, - STsdbReader* pReader); +static int32_t mergeRowsInSttBlocks(SSttBlockReader* pSttBlockReader, STableBlockScanInfo* pScanInfo, STsdbReader* pReader); static int32_t initDelSkylineIterator(STableBlockScanInfo* pBlockScanInfo, int32_t order, SReadCostSummary* pCost); static STsdb* getTsdbByRetentions(SVnode* pVnode, SQueryTableDataCond* pCond, SRetention* retentions, const char* idstr, @@ -392,7 +391,7 @@ _err: return code; } -static void resetDataBlockIterator(SDataBlockIter* pIter, int32_t order) { +void resetDataBlockIterator(SDataBlockIter* pIter, int32_t order) { pIter->order = order; pIter->index = -1; pIter->numOfBlocks = 0; @@ -403,8 +402,6 @@ static void resetDataBlockIterator(SDataBlockIter* pIter, int32_t order) { } } -static void cleanupDataBlockIterator(SDataBlockIter* pIter) { taosArrayDestroy(pIter->blockList); } - static void initReaderStatus(SReaderStatus* pStatus) { pStatus->pTableIter = NULL; pStatus->loadFromFile = true; @@ -657,21 +654,19 @@ _end: return code; } -static int32_t doLoadFileBlock(STsdbReader* pReader, SArray* pIndexList, SBlockNumber* pBlockNum, - SArray* pTableScanInfoList) { - size_t sizeInDisk = 0; - int64_t st = taosGetTimestampUs(); +static int32_t loadFileBlockBrinInfo(STsdbReader* pReader, SArray* pIndexList, SBlockNumber* pBlockNum, + SArray* pTableScanInfoList) { + int32_t k = 0; + size_t sizeInDisk = 0; + int64_t st = taosGetTimestampUs(); + bool asc = ASCENDING_TRAVERSE(pReader->info.order); + STimeWindow w = pReader->info.window; + SBrinRecord* pRecord = NULL; + int32_t numOfTables = tSimpleHashGetSize(pReader->status.pTableMap); + SBrinRecordIter iter = {0}; // clear info for the new file cleanupInfoForNextFileset(pReader->status.pTableMap); - - int32_t k = 0; - int32_t numOfTables = tSimpleHashGetSize(pReader->status.pTableMap); - bool asc = ASCENDING_TRAVERSE(pReader->info.order); - STimeWindow w = pReader->info.window; - SBrinRecord* pRecord = NULL; - - SBrinRecordIter iter = {0}; initBrinRecordIter(&iter, pReader->pFileReader, pIndexList); while (((pRecord = getNextBrinRecord(&iter)) != NULL)) { @@ -743,14 +738,27 @@ static int32_t doLoadFileBlock(STsdbReader* pReader, SArray* pIndexList, SBlockN } if (pScanInfo->pBlockList == NULL) { - pScanInfo->pBlockList = taosArrayInit(4, sizeof(SBrinRecord)); + pScanInfo->pBlockList = taosArrayInit(4, sizeof(SFileDataBlockInfo)); + if (pScanInfo->pBlockList == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } } - void* p1 = taosArrayPush(pScanInfo->pBlockList, pRecord); + if (pScanInfo->pBlockIdxList == NULL) { + pScanInfo->pBlockIdxList = taosArrayInit(4, sizeof(STableDataBlockIdx)); + if (pScanInfo->pBlockIdxList == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } + } + + SFileDataBlockInfo blockInfo = {.tbBlockIdx = TARRAY_SIZE(pScanInfo->pBlockList)}; + recordToBlockInfo(&blockInfo, pRecord); + void* p1 = taosArrayPush(pScanInfo->pBlockList, &blockInfo); if (p1 == NULL) { return TSDB_CODE_OUT_OF_MEMORY; } + // todo: refactor to record the fileset skey/ekey if (pScanInfo->filesetWindow.skey > pRecord->firstKey.key.ts) { pScanInfo->filesetWindow.skey = pRecord->firstKey.key.ts; } @@ -1323,10 +1331,12 @@ static int32_t dataBlockPartiallyRequired(STimeWindow* pWindow, SVersionRange* p } static bool getNeighborBlockOfSameTable(SDataBlockIter* pBlockIter, SFileDataBlockInfo* pBlockInfo, - STableBlockScanInfo* pTableBlockScanInfo, int32_t* nextIndex, int32_t order, + STableBlockScanInfo* pScanInfo, int32_t* nextIndex, int32_t order, SBrinRecord* pRecord) { - bool asc = ASCENDING_TRAVERSE(order); - if (asc && pBlockInfo->tbBlockIdx >= taosArrayGetSize(pTableBlockScanInfo->pBlockIdxList) - 1) { + bool asc = ASCENDING_TRAVERSE(order); + int32_t step = asc ? 1 : -1; + + if (asc && pBlockInfo->tbBlockIdx >= taosArrayGetSize(pScanInfo->pBlockIdxList) - 1) { return false; } @@ -1334,9 +1344,7 @@ static bool getNeighborBlockOfSameTable(SDataBlockIter* pBlockIter, SFileDataBlo return false; } - int32_t step = asc ? 1 : -1; - STableDataBlockIdx* pTableDataBlockIdx = - taosArrayGet(pTableBlockScanInfo->pBlockIdxList, pBlockInfo->tbBlockIdx + step); + STableDataBlockIdx* pTableDataBlockIdx = taosArrayGet(pScanInfo->pBlockIdxList, pBlockInfo->tbBlockIdx + step); SFileDataBlockInfo* p = taosArrayGet(pBlockIter->blockList, pTableDataBlockIdx->globalIndex); blockInfoToRecord(pRecord, p); @@ -1344,22 +1352,6 @@ static bool getNeighborBlockOfSameTable(SDataBlockIter* pBlockIter, SFileDataBlo return true; } -static int32_t findFileBlockInfoIndex(SDataBlockIter* pBlockIter, SFileDataBlockInfo* pFBlockInfo) { - int32_t step = ASCENDING_TRAVERSE(pBlockIter->order) ? 1 : -1; - int32_t index = pBlockIter->index; - - while (index < pBlockIter->numOfBlocks && index >= 0) { - SFileDataBlockInfo* pFBlock = taosArrayGet(pBlockIter->blockList, index); - if (pFBlock->uid == pFBlockInfo->uid && pFBlock->tbBlockIdx == pFBlockInfo->tbBlockIdx) { - return index; - } - - index += step; - } - - return -1; -} - static int32_t setFileBlockActiveInBlockIter(STsdbReader* pReader, SDataBlockIter* pBlockIter, int32_t index, int32_t step) { if (index < 0 || index >= pBlockIter->numOfBlocks) { @@ -2706,7 +2698,7 @@ static int32_t moveToNextFile(STsdbReader* pReader, SBlockNumber* pBlockNum, SAr } if (taosArrayGetSize(pIndexList) > 0 || pReader->status.pCurrentFileset->lvlArr->size > 0) { - code = doLoadFileBlock(pReader, pIndexList, pBlockNum, pTableList); + code = loadFileBlockBrinInfo(pReader, pIndexList, pBlockNum, pTableList); if (code != TSDB_CODE_SUCCESS) { taosArrayDestroy(pIndexList); return code; @@ -3154,23 +3146,14 @@ static void initBlockDumpInfo(STsdbReader* pReader, SDataBlockIter* pBlockIter) SFileBlockDumpInfo* pDumpInfo = &pStatus->fBlockDumpInfo; if (pBlockInfo) { - // todo handle -// STableBlockScanInfo* pScanInfo = tSimpleHashGet(pBlockIter->pTableMap, &pBlockInfo->uid, sizeof(pBlockInfo->uid)); -// if (pScanInfo) { -// tsdbRowKeyAssign(&pDumpInfo->lastKey, &pScanInfo->lastProcKey); -// lastKey = pScanInfo->lastProcKey; -// } - pDumpInfo->totalRows = pBlockInfo->numRow; pDumpInfo->rowIndex = ASCENDING_TRAVERSE(pReader->info.order) ? 0 : pBlockInfo->numRow - 1; } else { pDumpInfo->totalRows = 0; pDumpInfo->rowIndex = 0; -// pDumpInfo->lastKey.key.ts = lastKey; } pDumpInfo->allDumped = false; -// pDumpInfo->lastKey = lastKey; } static int32_t initForFirstBlockInFile(STsdbReader* pReader, SDataBlockIter* pBlockIter) { diff --git a/source/dnode/vnode/src/tsdb/tsdbReadUtil.c b/source/dnode/vnode/src/tsdb/tsdbReadUtil.c index 2a7b0140df..7e81f1df36 100644 --- a/source/dnode/vnode/src/tsdb/tsdbReadUtil.c +++ b/source/dnode/vnode/src/tsdb/tsdbReadUtil.c @@ -167,6 +167,13 @@ int32_t initRowKey(SRowKey* pKey, int64_t ts, int32_t numOfPks, int32_t type, in return TSDB_CODE_SUCCESS; } +void clearRowKey(SRowKey* pKey) { + if (pKey == NULL || pKey->numOfPKs == 0 || (!IS_VAR_DATA_TYPE(pKey->pks[0].type))) { + return; + } + taosMemoryFree(pKey->pks[0].pData); +} + static void initLastProcKey(STableBlockScanInfo *pScanInfo, STsdbReader* pReader) { int32_t numOfPks = pReader->suppInfo.numOfPks; bool asc = ASCENDING_TRAVERSE(pReader->info.order); @@ -293,6 +300,11 @@ void clearBlockScanInfo(STableBlockScanInfo* p) { p->pBlockIdxList = taosArrayDestroy(p->pBlockIdxList); p->pMemDelData = taosArrayDestroy(p->pMemDelData); p->pFileDelData = taosArrayDestroy(p->pFileDelData); + + clearRowKey(&p->lastProcKey); + clearRowKey(&p->sttRange.skey); + clearRowKey(&p->sttRange.ekey); + clearRowKey(&p->sttKeyInfo.nextProcKey); } void destroyAllBlockScanInfo(SSHashObj* pTableMap) { @@ -415,7 +427,7 @@ static int32_t fileDataBlockOrderCompar(const void* pLeft, const void* pRight, v return pLeftBlock->offset > pRightBlock->offset ? 1 : -1; } -static void recordToBlockInfo(SFileDataBlockInfo* pBlockInfo, SBrinRecord* record, STsdbReader* pReader) { +void recordToBlockInfo(SFileDataBlockInfo* pBlockInfo, SBrinRecord* record) { pBlockInfo->uid = record->uid; pBlockInfo->firstKey = record->firstKey.key.ts; pBlockInfo->lastKey = record->lastKey.key.ts; @@ -449,12 +461,36 @@ static void recordToBlockInfo(SFileDataBlockInfo* pBlockInfo, SBrinRecord* recor } } +static void freeItem(void* pItem) { + SFileDataBlockInfo* p = pItem; + if (p->firstPKLen > 0) { + taosMemoryFreeClear(p->firstPk.pData); + } + + if (p->lastPKLen > 0) { + taosMemoryFreeClear(p->lastPk.pData); + } +} + +void clearDataBlockIterator(SDataBlockIter* pIter) { + pIter->index = -1; + pIter->numOfBlocks = 0; + taosArrayClearEx(pIter->blockList, freeItem); +} + +void cleanupDataBlockIterator(SDataBlockIter* pIter) { + pIter->index = -1; + pIter->numOfBlocks = 0; + taosArrayDestroyEx(pIter->blockList, freeItem); +} + int32_t initBlockIterator(STsdbReader* pReader, SDataBlockIter* pBlockIter, int32_t numOfBlocks, SArray* pTableList) { bool asc = ASCENDING_TRAVERSE(pReader->info.order); SBlockOrderSupporter sup = {0}; + clearDataBlockIterator(pBlockIter); + pBlockIter->numOfBlocks = numOfBlocks; - taosArrayClear(pBlockIter->blockList); // access data blocks according to the offset of each block in asc/desc order. int32_t numOfTables = taosArrayGetSize(pTableList); @@ -482,9 +518,9 @@ int32_t initBlockIterator(STsdbReader* pReader, SDataBlockIter* pBlockIter, int3 sup.pDataBlockInfo[sup.numOfTables] = (SBlockOrderWrapper*)buf; for (int32_t k = 0; k < num; ++k) { - SBrinRecord* pRecord = taosArrayGet(pTableScanInfo->pBlockList, k); + SFileDataBlockInfo* pBlockInfo = taosArrayGet(pTableScanInfo->pBlockList, k); sup.pDataBlockInfo[sup.numOfTables][k] = - (SBlockOrderWrapper){.uid = pTableScanInfo->uid, .offset = pRecord->blockOffset, .pInfo = pTableScanInfo}; + (SBlockOrderWrapper){.uid = pTableScanInfo->uid, .offset = pBlockInfo->blockOffset, .pInfo = pTableScanInfo}; cnt++; } @@ -499,20 +535,12 @@ int32_t initBlockIterator(STsdbReader* pReader, SDataBlockIter* pBlockIter, int3 // since there is only one table qualified, blocks are not sorted if (sup.numOfTables == 1) { STableBlockScanInfo* pTableScanInfo = taosArrayGetP(pTableList, 0); - if (pTableScanInfo->pBlockIdxList == NULL) { - pTableScanInfo->pBlockIdxList = taosArrayInit(numOfBlocks, sizeof(STableDataBlockIdx)); - } - for (int32_t i = 0; i < numOfBlocks; ++i) { - SFileDataBlockInfo blockInfo = {.tbBlockIdx = i}; - SBrinRecord* record = (SBrinRecord*)taosArrayGet(sup.pDataBlockInfo[0][i].pInfo->pBlockList, i); - recordToBlockInfo(&blockInfo, record, pReader); - - taosArrayPush(pBlockIter->blockList, &blockInfo); STableDataBlockIdx tableDataBlockIdx = {.globalIndex = i}; taosArrayPush(pTableScanInfo->pBlockIdxList, &tableDataBlockIdx); } + taosArrayAddAll(pBlockIter->blockList, pTableScanInfo->pBlockList); pTableScanInfo->pBlockList = taosArrayDestroy(pTableScanInfo->pBlockList); int64_t et = taosGetTimestampUs(); @@ -540,18 +568,13 @@ int32_t initBlockIterator(STsdbReader* pReader, SDataBlockIter* pBlockIter, int3 int32_t pos = tMergeTreeGetChosenIndex(pTree); int32_t index = sup.indexPerTable[pos]++; - SFileDataBlockInfo blockInfo = {.tbBlockIdx = index}; - SBrinRecord* record = (SBrinRecord*)taosArrayGet(sup.pDataBlockInfo[pos][index].pInfo->pBlockList, index); - recordToBlockInfo(&blockInfo, record, pReader); + SFileDataBlockInfo* pBlockInfo = taosArrayGet(sup.pDataBlockInfo[pos][index].pInfo->pBlockList, index); + taosArrayPush(pBlockIter->blockList, pBlockInfo); - taosArrayPush(pBlockIter->blockList, &blockInfo); STableBlockScanInfo* pTableScanInfo = sup.pDataBlockInfo[pos][index].pInfo; - if (pTableScanInfo->pBlockIdxList == NULL) { - size_t szTableDataBlocks = taosArrayGetSize(pTableScanInfo->pBlockList); - pTableScanInfo->pBlockIdxList = taosArrayInit(szTableDataBlocks, sizeof(STableDataBlockIdx)); - } - STableDataBlockIdx tableDataBlockIdx = {.globalIndex = numOfTotal}; + STableDataBlockIdx tableDataBlockIdx = {.globalIndex = numOfTotal}; taosArrayPush(pTableScanInfo->pBlockIdxList, &tableDataBlockIdx); + // set data block index overflow, in order to disable the offset comparator if (sup.indexPerTable[pos] >= sup.numOfBlocksPerTable[pos]) { sup.indexPerTable[pos] = sup.numOfBlocksPerTable[pos] + 1; diff --git a/source/dnode/vnode/src/tsdb/tsdbReadUtil.h b/source/dnode/vnode/src/tsdb/tsdbReadUtil.h index bece22adad..0e7895c272 100644 --- a/source/dnode/vnode/src/tsdb/tsdbReadUtil.h +++ b/source/dnode/vnode/src/tsdb/tsdbReadUtil.h @@ -237,7 +237,6 @@ typedef struct SDataBlockIter { typedef struct SFileBlockDumpInfo { int32_t totalRows; int32_t rowIndex; -// int64_t lastKey; // STsdbRowKey lastKey; // this key should be removed bool allDumped; } SFileBlockDumpInfo; @@ -338,6 +337,7 @@ int32_t loadSttTombDataForAll(STsdbReader* pReader, SSttFileReader* pSttFileRead int32_t getNumOfRowsInSttBlock(SSttFileReader* pSttFileReader, SSttBlockLoadInfo* pBlockLoadInfo, TStatisBlkArray* pStatisBlkArray, uint64_t suid, const uint64_t* pUidList, int32_t numOfTables); +void recordToBlockInfo(SFileDataBlockInfo* pBlockInfo, SBrinRecord* record); void destroyLDataIter(SLDataIter* pIter); int32_t adjustSttDataIters(SArray* pSttFileBlockIterArray, STFileSet* pFileSet); @@ -347,6 +347,11 @@ bool isCleanSttBlock(SArray* pTimewindowList, STimeWindow* pQueryWindow, STab bool overlapWithDelSkyline(STableBlockScanInfo* pBlockScanInfo, const SBrinRecord* pRecord, int32_t order); int32_t pkCompEx(__compar_fn_t comparFn, SRowKey* p1, SRowKey* p2); int32_t initRowKey(SRowKey* pKey, int64_t ts, int32_t numOfPks, int32_t type, int32_t len, bool asc); +void clearRowKey(SRowKey* pKey); + +void resetDataBlockIterator(SDataBlockIter* pIter, int32_t order); +void clearDataBlockIterator(SDataBlockIter* pIter); +void cleanupDataBlockIterator(SDataBlockIter* pIter); typedef struct { SArray* pTombData; From 9f95360eccac219a8ad9a1f7a1ed05a470627d9b Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 11 Apr 2024 10:14:15 +0800 Subject: [PATCH 17/24] fix(tsdb): fix error in read rowkey with null. --- source/dnode/vnode/src/tsdb/tsdbRead2.c | 17 ++++++++++++----- 1 file changed, 12 insertions(+), 5 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbRead2.c b/source/dnode/vnode/src/tsdb/tsdbRead2.c index be01afb960..8f4c719a35 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead2.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead2.c @@ -173,11 +173,16 @@ static void tRowGetKeyDeepCopy(SRow* pRow, SRowKey* pKey) { for (int32_t i = 0; i < pRow->numOfPKs; i++) { pKey->pks[i].type = indices[i].type; + uint8_t *tdata = data + indices[i].offset; + if (pRow->flag >> 4) { + tdata += tGetI16v(tdata, NULL); + } + if (IS_VAR_DATA_TYPE(indices[i].type)) { tGetU32v(pKey->pks[i].pData, &pKey->pks[i].nData); - pKey->pks[i].pData = memcpy(pKey->pks[i].pData, data + indices[i].offset, pKey->pks[i].nData); + pKey->pks[i].pData = memcpy(pKey->pks[i].pData, tdata, pKey->pks[i].nData); pKey->pks[i].pData += pKey->pks[i].nData; - } else { + } else { // todo pKey->pks[i].val = *(int64_t*) (data + indices[i].offset); } } @@ -4353,9 +4358,11 @@ void tsdbReaderClose2(STsdbReader* pReader) { SReadCostSummary* pCost = &pReader->cost; SFilesetIter* pFilesetIter = &pReader->status.fileIter; if (pFilesetIter->pSttBlockReader != NULL) { - SSttBlockReader* pLReader = pFilesetIter->pSttBlockReader; - tMergeTreeClose(&pLReader->mergeTree); - taosMemoryFree(pLReader); + SSttBlockReader* pSttBlockReader = pFilesetIter->pSttBlockReader; + tMergeTreeClose(&pSttBlockReader->mergeTree); + + clearRowKey(&pSttBlockReader->currentKey); + taosMemoryFree(pSttBlockReader); } destroySttBlockReader(pReader->status.pLDataIterArray, &pCost->sttCost); From 1f85a47cabe964d046f401612d80da502ea9ef67 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 11 Apr 2024 10:47:05 +0800 Subject: [PATCH 18/24] fix(tsdb): fix memory leak. --- source/dnode/vnode/src/tsdb/tsdbRead2.c | 18 ++++++------ source/dnode/vnode/src/tsdb/tsdbReadUtil.c | 34 ++++++++++++---------- source/dnode/vnode/src/tsdb/tsdbReadUtil.h | 6 ++-- 3 files changed, 31 insertions(+), 27 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbRead2.c b/source/dnode/vnode/src/tsdb/tsdbRead2.c index 8f4c719a35..3409559867 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead2.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead2.c @@ -182,8 +182,8 @@ static void tRowGetKeyDeepCopy(SRow* pRow, SRowKey* pKey) { tGetU32v(pKey->pks[i].pData, &pKey->pks[i].nData); pKey->pks[i].pData = memcpy(pKey->pks[i].pData, tdata, pKey->pks[i].nData); pKey->pks[i].pData += pKey->pks[i].nData; - } else { // todo - pKey->pks[i].val = *(int64_t*) (data + indices[i].offset); + } else { + memcpy(&pKey->pks[i].val, data + indices[i].offset, tDataTypes[pKey->pks[i].type].bytes); } } } @@ -396,14 +396,14 @@ _err: return code; } -void resetDataBlockIterator(SDataBlockIter* pIter, int32_t order) { +void resetDataBlockIterator(SDataBlockIter* pIter, int32_t order, bool hasPk) { pIter->order = order; pIter->index = -1; pIter->numOfBlocks = 0; if (pIter->blockList == NULL) { pIter->blockList = taosArrayInit(4, sizeof(SFileDataBlockInfo)); } else { - taosArrayClear(pIter->blockList); + clearDataBlockIterator(pIter, hasPk); } } @@ -3183,7 +3183,7 @@ static int32_t initForFirstBlockInFile(STsdbReader* pReader, SDataBlockIter* pBl code = initBlockIterator(pReader, pBlockIter, num.numOfBlocks, pTableList); } else { // no block data, only last block exists tBlockDataReset(&pReader->status.fileBlockData); - resetDataBlockIterator(pBlockIter, pReader->info.order); + resetDataBlockIterator(pBlockIter, pReader->info.order, pReader->suppInfo.numOfPks > 0); resetTableListIndex(&pReader->status); } @@ -3293,7 +3293,7 @@ static int32_t buildBlockFromFiles(STsdbReader* pReader) { } tBlockDataReset(pBlockData); - resetDataBlockIterator(pBlockIter, pReader->info.order); + resetDataBlockIterator(pBlockIter, pReader->info.order, pReader->suppInfo.numOfPks > 0); resetTableListIndex(&pReader->status); ERetrieveType type = doReadDataFromSttFiles(pReader); @@ -4138,7 +4138,7 @@ static int32_t doOpenReaderImpl(STsdbReader* pReader) { } initFilesetIterator(&pStatus->fileIter, pReader->pReadSnap->pfSetArray, pReader); - resetDataBlockIterator(&pStatus->blockIter, pReader->info.order); + resetDataBlockIterator(&pStatus->blockIter, pReader->info.order, pReader->suppInfo.numOfPks > 0); int32_t code = TSDB_CODE_SUCCESS; if (pStatus->fileIter.numOfFiles == 0) { @@ -4342,7 +4342,7 @@ void tsdbReaderClose2(STsdbReader* pReader) { taosMemoryFree(pSupInfo->colId); tBlockDataDestroy(&pReader->status.fileBlockData); - cleanupDataBlockIterator(&pReader->status.blockIter); + cleanupDataBlockIterator(&pReader->status.blockIter, pReader->suppInfo.numOfPks > 0); size_t numOfTables = tSimpleHashGetSize(pReader->status.pTableMap); if (pReader->status.pTableMap != NULL) { @@ -5018,7 +5018,7 @@ int32_t tsdbReaderReset2(STsdbReader* pReader, SQueryTableDataCond* pCond) { int32_t numOfTables = tSimpleHashGetSize(pStatus->pTableMap); initFilesetIterator(&pStatus->fileIter, pReader->pReadSnap->pfSetArray, pReader); - resetDataBlockIterator(pBlockIter, pReader->info.order); + resetDataBlockIterator(pBlockIter, pReader->info.order, pReader->suppInfo.numOfPks > 0); resetTableListIndex(&pReader->status); bool asc = ASCENDING_TRAVERSE(pReader->info.order); diff --git a/source/dnode/vnode/src/tsdb/tsdbReadUtil.c b/source/dnode/vnode/src/tsdb/tsdbReadUtil.c index 7e81f1df36..d93c8c8f79 100644 --- a/source/dnode/vnode/src/tsdb/tsdbReadUtil.c +++ b/source/dnode/vnode/src/tsdb/tsdbReadUtil.c @@ -461,34 +461,38 @@ void recordToBlockInfo(SFileDataBlockInfo* pBlockInfo, SBrinRecord* record) { } } -static void freeItem(void* pItem) { +static void freePkItem(void* pItem) { SFileDataBlockInfo* p = pItem; - if (p->firstPKLen > 0) { - taosMemoryFreeClear(p->firstPk.pData); - } + taosMemoryFreeClear(p->firstPk.pData); + taosMemoryFreeClear(p->lastPk.pData); +} - if (p->lastPKLen > 0) { - taosMemoryFreeClear(p->lastPk.pData); +void clearDataBlockIterator(SDataBlockIter* pIter, bool hasPk) { + pIter->index = -1; + pIter->numOfBlocks = 0; + + if (hasPk) { + taosArrayClearEx(pIter->blockList, freePkItem); + } else { + taosArrayClear(pIter->blockList); } } -void clearDataBlockIterator(SDataBlockIter* pIter) { +void cleanupDataBlockIterator(SDataBlockIter* pIter, bool hasPk) { pIter->index = -1; pIter->numOfBlocks = 0; - taosArrayClearEx(pIter->blockList, freeItem); -} - -void cleanupDataBlockIterator(SDataBlockIter* pIter) { - pIter->index = -1; - pIter->numOfBlocks = 0; - taosArrayDestroyEx(pIter->blockList, freeItem); + if (hasPk) { + taosArrayDestroyEx(pIter->blockList, freePkItem); + } else { + taosArrayClear(pIter->blockList); + } } int32_t initBlockIterator(STsdbReader* pReader, SDataBlockIter* pBlockIter, int32_t numOfBlocks, SArray* pTableList) { bool asc = ASCENDING_TRAVERSE(pReader->info.order); SBlockOrderSupporter sup = {0}; - clearDataBlockIterator(pBlockIter); + clearDataBlockIterator(pBlockIter, pReader->suppInfo.numOfPks > 0); pBlockIter->numOfBlocks = numOfBlocks; diff --git a/source/dnode/vnode/src/tsdb/tsdbReadUtil.h b/source/dnode/vnode/src/tsdb/tsdbReadUtil.h index 0e7895c272..94909aabf4 100644 --- a/source/dnode/vnode/src/tsdb/tsdbReadUtil.h +++ b/source/dnode/vnode/src/tsdb/tsdbReadUtil.h @@ -349,9 +349,9 @@ int32_t pkCompEx(__compar_fn_t comparFn, SRowKey* p1, SRowKey* p2); int32_t initRowKey(SRowKey* pKey, int64_t ts, int32_t numOfPks, int32_t type, int32_t len, bool asc); void clearRowKey(SRowKey* pKey); -void resetDataBlockIterator(SDataBlockIter* pIter, int32_t order); -void clearDataBlockIterator(SDataBlockIter* pIter); -void cleanupDataBlockIterator(SDataBlockIter* pIter); +void resetDataBlockIterator(SDataBlockIter* pIter, int32_t order, bool hasPk); +void clearDataBlockIterator(SDataBlockIter* pIter, bool hasPk); +void cleanupDataBlockIterator(SDataBlockIter* pIter, bool hasPk); typedef struct { SArray* pTombData; From 563efeb560a9317c927a9798fc0c8d4c9d220127 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 11 Apr 2024 11:09:57 +0800 Subject: [PATCH 19/24] fix(tsdb): fix memory leak. --- source/dnode/vnode/src/tsdb/tsdbReadUtil.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbReadUtil.c b/source/dnode/vnode/src/tsdb/tsdbReadUtil.c index d93c8c8f79..1fba39227c 100644 --- a/source/dnode/vnode/src/tsdb/tsdbReadUtil.c +++ b/source/dnode/vnode/src/tsdb/tsdbReadUtil.c @@ -484,7 +484,7 @@ void cleanupDataBlockIterator(SDataBlockIter* pIter, bool hasPk) { if (hasPk) { taosArrayDestroyEx(pIter->blockList, freePkItem); } else { - taosArrayClear(pIter->blockList); + taosArrayDestroy(pIter->blockList); } } From e0f7b14ffa6b06d5da3c181358d4bcb05a83988e Mon Sep 17 00:00:00 2001 From: kailixu Date: Thu, 11 Apr 2024 11:41:05 +0800 Subject: [PATCH 20/24] fix: eliminate double free --- source/dnode/vnode/src/tsdb/tsdbReadUtil.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbReadUtil.c b/source/dnode/vnode/src/tsdb/tsdbReadUtil.c index 1fba39227c..c82363c921 100644 --- a/source/dnode/vnode/src/tsdb/tsdbReadUtil.c +++ b/source/dnode/vnode/src/tsdb/tsdbReadUtil.c @@ -171,7 +171,7 @@ void clearRowKey(SRowKey* pKey) { if (pKey == NULL || pKey->numOfPKs == 0 || (!IS_VAR_DATA_TYPE(pKey->pks[0].type))) { return; } - taosMemoryFree(pKey->pks[0].pData); + taosMemoryFreeClear(pKey->pks[0].pData); } static void initLastProcKey(STableBlockScanInfo *pScanInfo, STsdbReader* pReader) { From a2a237a4b0ae6c6d13f0609e55535cf8408d7fed Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 11 Apr 2024 14:31:48 +0800 Subject: [PATCH 21/24] fix(query): avoid process data that belongs to the next session window. --- source/libs/executor/src/timewindowoperator.c | 27 ++++++++++--------- source/libs/function/src/builtinsimpl.c | 6 ++--- 2 files changed, 17 insertions(+), 16 deletions(-) diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index 8fb8aaa69d..e62763ebc5 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -1339,23 +1339,24 @@ static void doSessionWindowAggImpl(SOperatorInfo* pOperator, SSessionAggOperator // The gap is less than the threshold, so it belongs to current session window that has been opened already. doKeepTuple(pRowSup, tsList[j], gid); } else { // start a new session window - SResultRow* pResult = NULL; + if (pRowSup->numOfRows > 0) { // handled data that belongs to the previous session window + SResultRow* pResult = NULL; - // keep the time window for the closed time window. - STimeWindow window = pRowSup->win; + // keep the time window for the closed time window. + STimeWindow window = pRowSup->win; + int32_t ret = + setTimeWindowOutputBuf(&pInfo->binfo.resultRowInfo, &window, masterScan, &pResult, gid, pSup->pCtx, + numOfOutput, pSup->rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo); + if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code + T_LONG_JMP(pTaskInfo->env, TSDB_CODE_APP_ERROR); + } - pRowSup->win.ekey = pRowSup->win.skey; - int32_t ret = setTimeWindowOutputBuf(&pInfo->binfo.resultRowInfo, &window, masterScan, &pResult, gid, pSup->pCtx, - numOfOutput, pSup->rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo); - if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code - T_LONG_JMP(pTaskInfo->env, TSDB_CODE_APP_ERROR); + // pInfo->numOfRows data belong to the current session window + updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &window, 0); + applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, pRowSup->startRowIndex, + pRowSup->numOfRows, pBlock->info.rows, numOfOutput); } - // pInfo->numOfRows data belong to the current session window - updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &window, 0); - applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, pRowSup->startRowIndex, - pRowSup->numOfRows, pBlock->info.rows, numOfOutput); - // here we start a new session window doKeepNewWindowStartInfo(pRowSup, tsList, j, gid); doKeepTuple(pRowSup, tsList[j], gid); diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index 6be95b786b..6f3f5f7e5c 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -2657,8 +2657,7 @@ int32_t lastFunction(SqlFunctionCtx* pCtx) { int32_t blockDataOrder = (startKey <= endKey) ? TSDB_ORDER_ASC : TSDB_ORDER_DESC; - // please ref. to the comment in lastRowFunction for the reason why disabling the opt version of last/first - // function. + // please ref. to the comment in lastRowFunction for the reason why disabling the opt version of last/first function. #if 0 if (blockDataOrder == TSDB_ORDER_ASC) { for (int32_t i = pInput->numOfRows + pInput->startRowIndex - 1; i >= pInput->startRowIndex; --i) { @@ -2709,6 +2708,8 @@ int32_t lastFunction(SqlFunctionCtx* pCtx) { } } #else + +// todo refactor if (!pInputCol->hasNull && !pCtx->hasPrimaryKey) { numOfElems = 1; @@ -2790,7 +2791,6 @@ int32_t lastFunction(SqlFunctionCtx* pCtx) { } } - // SET_VAL(pResInfo, numOfElems, 1); return TSDB_CODE_SUCCESS; } From 312f7864c81639f81c2f7173f230ebae7b6da660 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 11 Apr 2024 14:32:17 +0800 Subject: [PATCH 22/24] refactor: do some internal refactor. --- source/dnode/vnode/src/tq/tqSink.c | 2 -- 1 file changed, 2 deletions(-) diff --git a/source/dnode/vnode/src/tq/tqSink.c b/source/dnode/vnode/src/tq/tqSink.c index 2b99c6f6ef..b060de029c 100644 --- a/source/dnode/vnode/src/tq/tqSink.c +++ b/source/dnode/vnode/src/tq/tqSink.c @@ -622,8 +622,6 @@ int32_t doConvertRows(SSubmitTbData* pTableData, const STSchema* pTSchema, SSDat } SRow* pRow = NULL; - tqInfo("result column flag:%d", pTSchema->columns[1].flags); - code = tRowBuild(pVals, (STSchema*)pTSchema, &pRow); if (code != TSDB_CODE_SUCCESS) { tDestroySubmitTbData(pTableData, TSDB_MSG_FLG_ENCODE); From a3bbf3ba0e416f8b93130d7436e125b8285058b0 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 11 Apr 2024 15:16:21 +0800 Subject: [PATCH 23/24] fix(tsdb): check numOfPks before load pk --- source/dnode/vnode/src/tsdb/tsdbMergeTree.c | 54 ++++++++++++--------- 1 file changed, 30 insertions(+), 24 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbMergeTree.c b/source/dnode/vnode/src/tsdb/tsdbMergeTree.c index a14f866bcc..9feae4c57e 100644 --- a/source/dnode/vnode/src/tsdb/tsdbMergeTree.c +++ b/source/dnode/vnode/src/tsdb/tsdbMergeTree.c @@ -399,28 +399,31 @@ static int32_t loadSttStatisticsBlockData(SSttFileReader *pSttFileReader, SSttBl rows - i); taosArrayAddBatch(pBlockLoadInfo->info.pCount, tBufferGetDataAt(&block.counts, i * sizeof(int64_t)), rows - i); - SValue vFirst = {0}, vLast = {0}; - for (int32_t f = i; f < rows; ++f) { - int32_t code = tValueColumnGet(&block.firstKeyPKs[0], f, &vFirst); - if (code) { - break; + if (block.numOfPKs > 0) { + SValue vFirst = {0}, vLast = {0}; + for (int32_t f = i; f < rows; ++f) { + int32_t code = tValueColumnGet(&block.firstKeyPKs[0], f, &vFirst); + if (code) { + break; + } + + tValueDupPayload(&vFirst); + taosArrayPush(pBlockLoadInfo->info.pFirstKey, &vFirst); + + // todo add api to clone the original data + code = tValueColumnGet(&block.lastKeyPKs[0], f, &vLast); + if (code) { + break; + } + + tValueDupPayload(&vLast); + taosArrayPush(pBlockLoadInfo->info.pLastKey, &vLast); } - - tValueDupPayload(&vFirst); - taosArrayPush(pBlockLoadInfo->info.pFirstKey, &vFirst); - - // todo add api to clone the original data - code = tValueColumnGet(&block.lastKeyPKs[0], f, &vLast); - if (code) { - break; - } - - tValueDupPayload(&vLast); - taosArrayPush(pBlockLoadInfo->info.pLastKey, &vLast); } } else { - STbStatisRecord record; + STbStatisRecord record = {0}; + while (i < rows) { tStatisBlockGet(&block, i, &record); if (record.suid != suid) { @@ -433,15 +436,18 @@ static int32_t loadSttStatisticsBlockData(SSttFileReader *pSttFileReader, SSttBl taosArrayPush(pBlockLoadInfo->info.pFirstTs, &record.firstKey.ts); taosArrayPush(pBlockLoadInfo->info.pLastTs, &record.lastKey.ts); - SValue s = record.firstKey.pks[0]; - tValueDupPayload(&s); + if (record.firstKey.numOfPKs > 0) { + SValue s = record.firstKey.pks[0]; + tValueDupPayload(&s); - taosArrayPush(pBlockLoadInfo->info.pFirstKey, &s); + taosArrayPush(pBlockLoadInfo->info.pFirstKey, &s); - s = record.lastKey.pks[0]; - tValueDupPayload(&s); + s = record.lastKey.pks[0]; + tValueDupPayload(&s); + + taosArrayPush(pBlockLoadInfo->info.pLastKey, &s); + } - taosArrayPush(pBlockLoadInfo->info.pLastKey, &s); i += 1; } } From eaf44ec603e87a9d4a2c342855aee929f3105047 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 11 Apr 2024 15:57:04 +0800 Subject: [PATCH 24/24] fix(tsdb):set the initial size of pk in ssdatablock. --- source/libs/executor/src/executil.c | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/source/libs/executor/src/executil.c b/source/libs/executor/src/executil.c index 4f055cb928..a9ab8b783c 100644 --- a/source/libs/executor/src/executil.c +++ b/source/libs/executor/src/executil.c @@ -255,6 +255,7 @@ int32_t prepareDataBlockBuf(SSDataBlock* pDataBlock, SColMatchInfo* pMatchInfo) for (int32_t i = 0; i < taosArrayGetSize(pMatchInfo->pList); ++i) { SColMatchItem* pItem = taosArrayGet(pMatchInfo->pList, i); + if (pItem->isPk) { SColumnInfoData* pInfoData = taosArrayGet(pDataBlock->pDataBlock, pItem->dstSlotId); pBlockInfo->pks[0].type = pInfoData->info.type; @@ -271,6 +272,9 @@ int32_t prepareDataBlockBuf(SSDataBlock* pDataBlock, SColMatchInfo* pMatchInfo) taosMemoryFreeClear(pBlockInfo->pks[0].pData); return TSDB_CODE_OUT_OF_MEMORY; } + + pBlockInfo->pks[0].nData = pInfoData->info.bytes; + pBlockInfo->pks[1].nData = pInfoData->info.bytes; } } }