From 0c7373509e68cf6d5c6af629f27dae4981bee3ec Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 23 Apr 2024 18:07:00 +0800 Subject: [PATCH 1/5] fix(stream): add task update trans conflict level. --- source/dnode/mnode/impl/src/mndStreamTrans.c | 3 ++- source/dnode/vnode/src/tqCommon/tqCommon.c | 9 ++++++--- 2 files changed, 8 insertions(+), 4 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndStreamTrans.c b/source/dnode/mnode/impl/src/mndStreamTrans.c index 16b735fbc4..74ad09c752 100644 --- a/source/dnode/mnode/impl/src/mndStreamTrans.c +++ b/source/dnode/mnode/impl/src/mndStreamTrans.c @@ -98,7 +98,8 @@ bool mndStreamTransConflictCheck(SMnode* pMnode, int64_t streamId, const char* p mDebug("not conflict with checkpoint trans, name:%s, continue create trans", pTransName); } } else if ((strcmp(tInfo.name, MND_STREAM_CREATE_NAME) == 0) || (strcmp(tInfo.name, MND_STREAM_DROP_NAME) == 0) || - (strcmp(tInfo.name, MND_STREAM_TASK_RESET_NAME) == 0)) { + (strcmp(tInfo.name, MND_STREAM_TASK_RESET_NAME) == 0) || + strcmp(tInfo.name, MND_STREAM_TASK_UPDATE_NAME) == 0) { mWarn("conflict with other transId:%d streamUid:0x%" PRIx64 ", trans:%s", tInfo.transId, tInfo.streamId, tInfo.name); terrno = TSDB_CODE_MND_TRANS_CONFLICT; diff --git a/source/dnode/vnode/src/tqCommon/tqCommon.c b/source/dnode/vnode/src/tqCommon/tqCommon.c index 2352d3a555..8b2e9693eb 100644 --- a/source/dnode/vnode/src/tqCommon/tqCommon.c +++ b/source/dnode/vnode/src/tqCommon/tqCommon.c @@ -195,12 +195,15 @@ int32_t tqStreamTaskProcessUpdateReq(SStreamMeta* pMeta, SMsgCb* cb, SRpcMsg* pM const char* idstr = pTask->id.idStr; if (pMeta->updateInfo.transId != req.transId) { - pMeta->updateInfo.transId = req.transId; - tqInfo("s-task:%s receive new trans to update nodeEp msg from mnode, transId:%d", idstr, req.transId); + ASSERT(req.transId > pMeta->updateInfo.transId); + tqInfo("s-task:%s vgId:%d receive new trans to update nodeEp msg from mnode, transId:%d, prev transId:%d", idstr, + vgId, req.transId, pMeta->updateInfo.transId); + // info needs to be kept till the new trans to update the nodeEp arrived. taosHashClear(pMeta->updateInfo.pTasks); + pMeta->updateInfo.transId = req.transId; } else { - tqDebug("s-task:%s recv trans to update nodeEp from mnode, transId:%d", idstr, req.transId); + tqDebug("s-task:%s vgId:%d recv trans to update nodeEp from mnode, transId:%d", idstr, vgId, req.transId); } // duplicate update epset msg received, discard this redundant message From 0f8ff48c6ba3edacd119dc363625c69a719f244a Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 23 Apr 2024 18:18:01 +0800 Subject: [PATCH 2/5] fix(stream): fix double free --- source/common/src/tmsg.c | 2 ++ source/dnode/vnode/src/tq/tqSink.c | 6 +++--- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index 340463e48c..6c2358a1d7 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -9624,6 +9624,8 @@ void tDestroySubmitTbData(SSubmitTbData *pTbData, int32_t flag) { taosArrayDestroy(pTbData->aRowP); } } + + pTbData->aRowP = NULL; } void tDestroySubmitReq(SSubmitReq2 *pReq, int32_t flag) { diff --git a/source/dnode/vnode/src/tq/tqSink.c b/source/dnode/vnode/src/tq/tqSink.c index 1da096224c..f690b9b277 100644 --- a/source/dnode/vnode/src/tq/tqSink.c +++ b/source/dnode/vnode/src/tq/tqSink.c @@ -583,7 +583,7 @@ int32_t doConvertRows(SSubmitTbData* pTableData, const STSchema* pTSchema, SSDat if (IS_SET_NULL(pCol)) { if (pCol->flags & COL_IS_KEY) { - qError("ts:%" PRId64 " Primary key column should not be null, colId:%" PRIi16 ", colType:%" PRIi8, ts, + qError("ts:%" PRId64 " primary key column should not be null, colId:%" PRIi16 ", colType:%" PRIi8, ts, pCol->colId, pCol->type); break; } @@ -593,7 +593,7 @@ int32_t doConvertRows(SSubmitTbData* pTableData, const STSchema* pTSchema, SSDat SColumnInfoData* pColData = taosArrayGet(pDataBlock->pDataBlock, dataIndex); if (colDataIsNull_s(pColData, j)) { if (pCol->flags & COL_IS_KEY) { - qError("ts:%" PRId64 "Primary key column should not be null, colId:%" PRIi16 ", colType:%" PRIi8, + qError("ts:%" PRId64 " primary key column should not be null, colId:%" PRIi16 ", colType:%" PRIi8, ts, pCol->colId, pCol->type); break; } @@ -624,8 +624,8 @@ int32_t doConvertRows(SSubmitTbData* pTableData, const STSchema* pTSchema, SSDat code = tRowBuild(pVals, (STSchema*)pTSchema, &pRow); if (code != TSDB_CODE_SUCCESS) { tDestroySubmitTbData(pTableData, TSDB_MSG_FLG_ENCODE); - pTableData->aRowP = taosArrayDestroy(pTableData->aRowP); taosArrayDestroy(pVals); + tqError("s-task:%s build rows for submit failed, ts:%"PRId64, id, ts); return code; } From ace5e12e65cd060f9bd9133c13f2b845e6723453 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 23 Apr 2024 19:25:51 +0800 Subject: [PATCH 3/5] fix(tsdb): set the correct lflag when creating tsdbCache reader. --- include/libs/executor/storageapi.h | 1 + source/dnode/vnode/src/tsdb/tsdbCache.c | 2 +- source/dnode/vnode/src/tsdb/tsdbCacheRead.c | 6 +++++- source/libs/executor/src/cachescanoperator.c | 3 +++ 4 files changed, 10 insertions(+), 2 deletions(-) diff --git a/include/libs/executor/storageapi.h b/include/libs/executor/storageapi.h index 0cefa0c476..cba32bec04 100644 --- a/include/libs/executor/storageapi.h +++ b/include/libs/executor/storageapi.h @@ -35,6 +35,7 @@ extern "C" { #define CACHESCAN_RETRIEVE_TYPE_SINGLE 0x2 #define CACHESCAN_RETRIEVE_LAST_ROW 0x4 #define CACHESCAN_RETRIEVE_LAST 0x8 +#define CACHESCAN_RETRIEVE_PK 0x10 #define META_READER_LOCK 0x0 #define META_READER_NOLOCK 0x1 diff --git a/source/dnode/vnode/src/tsdb/tsdbCache.c b/source/dnode/vnode/src/tsdb/tsdbCache.c index 76f98b33c1..fd8b73b1f0 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCache.c +++ b/source/dnode/vnode/src/tsdb/tsdbCache.c @@ -130,7 +130,7 @@ static void tsdbClosePgCache(STsdb *pTsdb) { enum { LFLAG_LAST_ROW = 0, LFLAG_LAST = 1, - LFLAG_PRIMARY_KEY = (1 << 4), + LFLAG_PRIMARY_KEY = CACHESCAN_RETRIEVE_PK, }; typedef struct { diff --git a/source/dnode/vnode/src/tsdb/tsdbCacheRead.c b/source/dnode/vnode/src/tsdb/tsdbCacheRead.c index 1d1009c15f..42b8365130 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCacheRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbCacheRead.c @@ -386,7 +386,11 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32 goto _end; } - int8_t ltype = (pr->type & CACHESCAN_RETRIEVE_LAST) >> 3; + int8_t ltype = (pr->type & CACHESCAN_RETRIEVE_LAST) >> 3; + if (pr->rowKey.numOfPKs > 0) { + ltype |= CACHESCAN_RETRIEVE_PK; + } + STableKeyInfo* pTableList = pr->pTableList; // retrieve the only one last row of all tables in the uid list. diff --git a/source/libs/executor/src/cachescanoperator.c b/source/libs/executor/src/cachescanoperator.c index b7159225e1..985cdb9433 100644 --- a/source/libs/executor/src/cachescanoperator.c +++ b/source/libs/executor/src/cachescanoperator.c @@ -160,6 +160,9 @@ SOperatorInfo* createCacherowsScanOperator(SLastRowScanPhysiNode* pScanNode, SRe // partition by tbname if (oneTableForEachGroup(pTableListInfo) || (totalTables == 1)) { pInfo->retrieveType = CACHESCAN_RETRIEVE_TYPE_ALL | SCAN_ROW_TYPE(pScanNode->ignoreNull); + if (pInfo->numOfPks > 0) { + pInfo->retrieveType |= CACHESCAN_RETRIEVE_PK; + } STableKeyInfo* pList = tableListGetInfo(pTableListInfo, 0); From 82cde4661465de71b2071f44599a0be8ac4c56ea Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 23 Apr 2024 19:48:50 +0800 Subject: [PATCH 4/5] fix(stream): fix memory leak. --- source/libs/stream/src/streamCheckpoint.c | 1 + source/libs/stream/src/streamExec.c | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index 67b68f73ad..36886329ac 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -190,6 +190,7 @@ int32_t streamProcessCheckpointBlock(SStreamTask* pTask, SStreamDataBlock* pBloc code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_GEN_CHECKPOINT); if (code != TSDB_CODE_SUCCESS) { stError("s-task:%s handle checkpoint-trigger block failed, code:%s", id, tstrerror(code)); + streamFreeQitem((SStreamQueueItem*)pBlock); return code; } } diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index 93ede2707b..ab69a135f1 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -46,6 +46,7 @@ static int32_t doOutputResultBlockImpl(SStreamTask* pTask, SStreamDataBlock* pBl ASSERT(type == TASK_OUTPUT__FIXED_DISPATCH || type == TASK_OUTPUT__SHUFFLE_DISPATCH); code = streamTaskPutDataIntoOutputQ(pTask, pBlock); if (code != TSDB_CODE_SUCCESS) { + destroyStreamDataBlock(pBlock); return code; } @@ -76,7 +77,6 @@ static int32_t doDumpResult(SStreamTask* pTask, SStreamQueueItem* pItem, SArray* int32_t code = doOutputResultBlockImpl(pTask, pStreamBlocks); if (code != TSDB_CODE_SUCCESS) { // back pressure and record position - destroyStreamDataBlock(pStreamBlocks); return code; } From a37667968fbf3e1d8f99163adf3d1691b4572bcf Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 23 Apr 2024 19:49:14 +0800 Subject: [PATCH 5/5] refactor: do some internal refactor. --- source/libs/stream/src/streamExec.c | 32 +++++++++-------------------- 1 file changed, 10 insertions(+), 22 deletions(-) diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index ab69a135f1..891e0aa142 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -21,6 +21,7 @@ #define STREAM_RESULT_DUMP_SIZE_THRESHOLD (1048576 * 1) // 1MiB result data #define STREAM_SCAN_HISTORY_TIMESLICE 1000 // 1000 ms #define MIN_INVOKE_INTERVAL 50 // 50ms +#define FILL_HISTORY_TASK_EXEC_INTERVAL 5000 // 5 sec static int32_t streamTransferStateDoPrepare(SStreamTask* pTask); @@ -244,6 +245,10 @@ static void streamScanHistoryDataImpl(SStreamTask* pTask, SArray* pRes, int32_t* } } +static SScanhistoryDataInfo buildScanhistoryExecRet(EScanHistoryCode code, int32_t idleTime) { + return (SScanhistoryDataInfo){code, idleTime}; +} + SScanhistoryDataInfo streamScanHistoryData(SStreamTask* pTask, int64_t st) { ASSERT(pTask->info.taskLevel == TASK_LEVEL__SOURCE); @@ -260,7 +265,7 @@ SScanhistoryDataInfo streamScanHistoryData(SStreamTask* pTask, int64_t st) { if (streamTaskShouldPause(pTask)) { stDebug("s-task:%s paused from the scan-history task", id); // quit from step1, not continue to handle the step2 - return (SScanhistoryDataInfo){TASK_SCANHISTORY_QUIT, 0}; + return buildScanhistoryExecRet(TASK_SCANHISTORY_QUIT, 0); } SArray* pRes = taosArrayInit(0, sizeof(SSDataBlock)); @@ -275,7 +280,7 @@ SScanhistoryDataInfo streamScanHistoryData(SStreamTask* pTask, int64_t st) { if(streamTaskShouldStop(pTask)) { taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes); - return (SScanhistoryDataInfo){TASK_SCANHISTORY_QUIT, 0}; + return buildScanhistoryExecRet(TASK_SCANHISTORY_QUIT, 0); } // dispatch the generated results @@ -285,38 +290,21 @@ SScanhistoryDataInfo streamScanHistoryData(SStreamTask* pTask, int64_t st) { // downstream task input queue is full, try in 5sec if (pTask->inputq.status == TASK_INPUT_STATUS__BLOCKED && (pTask->info.fillHistory == 1)) { - return (SScanhistoryDataInfo){TASK_SCANHISTORY_REXEC, 5000}; + return buildScanhistoryExecRet(TASK_SCANHISTORY_REXEC, FILL_HISTORY_TASK_EXEC_INTERVAL); } if (finished) { - return (SScanhistoryDataInfo){TASK_SCANHISTORY_CONT, 0}; + return buildScanhistoryExecRet(TASK_SCANHISTORY_CONT, 0); } if (el >= STREAM_SCAN_HISTORY_TIMESLICE && (pTask->info.fillHistory == 1)) { stDebug("s-task:%s fill-history:%d time slice exhausted, elapsed time:%.2fs, retry in 100ms", id, pTask->info.fillHistory, el / 1000.0); - return (SScanhistoryDataInfo){TASK_SCANHISTORY_REXEC, 100}; + return buildScanhistoryExecRet(TASK_SCANHISTORY_REXEC, 100); } } } -// wait for the stream task to be idle -static void waitForTaskIdle(SStreamTask* pTask, SStreamTask* pStreamTask) { - const char* id = pTask->id.idStr; - - int64_t st = taosGetTimestampMs(); - while (!streamTaskIsIdle(pStreamTask)) { - stDebug("s-task:%s level:%d wait for stream task:%s to be idle, check again in 100ms", id, pTask->info.taskLevel, - pStreamTask->id.idStr); - taosMsleep(100); - } - - double el = (taosGetTimestampMs() - st) / 1000.0; - if (el > 0) { - stDebug("s-task:%s wait for stream task:%s for %.2fs to be idle", id, pStreamTask->id.idStr, el); - } -} - int32_t streamTransferStateDoPrepare(SStreamTask* pTask) { SStreamMeta* pMeta = pTask->pMeta; const char* id = pTask->id.idStr;