From f1498f89294539010ec1d56975f21c9a87e2dcd3 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 24 Oct 2023 15:24:34 +0800 Subject: [PATCH] fix(stream): fix the invalid write in sma --- source/common/src/tmsg.c | 2 +- source/dnode/vnode/src/sma/smaTimeRange.c | 17 ++++++++------ source/dnode/vnode/src/tq/tqSink.c | 3 ++- source/libs/stream/src/streamTaskSm.c | 27 ++++++++++++++++++++--- 4 files changed, 37 insertions(+), 12 deletions(-) diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index 9b66bd1fb3..c67b9e5e68 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -8422,7 +8422,7 @@ void tDestroySubmitTbData(SSubmitTbData *pTbData, int32_t flag) { } else { tDestroySVCreateTbReq(pTbData->pCreateTbReq, TSDB_MSG_FLG_DECODE); } - taosMemoryFree(pTbData->pCreateTbReq); + taosMemoryFreeClear(pTbData->pCreateTbReq); } if (pTbData->flags & SUBMIT_REQ_COLUMN_DATA_FORMAT) { diff --git a/source/dnode/vnode/src/sma/smaTimeRange.c b/source/dnode/vnode/src/sma/smaTimeRange.c index 2395a7cfb9..08ddc4bd7b 100644 --- a/source/dnode/vnode/src/sma/smaTimeRange.c +++ b/source/dnode/vnode/src/sma/smaTimeRange.c @@ -162,15 +162,19 @@ int32_t smaBlockToSubmit(SVnode *pVnode, const SArray *pBlocks, const STSchema * int32_t len = 0; SSubmitReq2 *pReq = NULL; SArray *tagArray = NULL; - SArray *pVals = NULL; int32_t numOfBlocks = taosArrayGetSize(pBlocks); tagArray = taosArrayInit(1, sizeof(STagVal)); pReq = taosMemoryCalloc(1, sizeof(SSubmitReq2)); - pReq->aSubmitTbData = taosArrayInit(1, sizeof(SSubmitTbData)); - if (!tagArray || !pReq || !pReq->aSubmitTbData) { + if (!tagArray || !pReq) { + code = terrno == TSDB_CODE_SUCCESS ? TSDB_CODE_OUT_OF_MEMORY : terrno; + TSDB_CHECK_CODE(code, lino, _exit); + } + + pReq->aSubmitTbData = taosArrayInit(1, sizeof(SSubmitTbData)); + if (pReq->aSubmitTbData == NULL) { code = terrno == TSDB_CODE_SUCCESS ? TSDB_CODE_OUT_OF_MEMORY : terrno; TSDB_CHECK_CODE(code, lino, _exit); } @@ -220,10 +224,10 @@ int32_t smaBlockToSubmit(SVnode *pVnode, const SArray *pBlocks, const STSchema * } } } - - taosArrayPush(pReq->aSubmitTbData, &tbData); } + taosHashCleanup(pTableIndexMap); + // encode tEncodeSize(tEncodeSubmitReq, pReq, len, code); if (TSDB_CODE_SUCCESS == code) { @@ -248,8 +252,7 @@ int32_t smaBlockToSubmit(SVnode *pVnode, const SArray *pBlocks, const STSchema * _exit: taosArrayDestroy(tagArray); - taosArrayDestroy(pVals); - if (pReq) { + if (pReq != NULL) { tDestroySubmitReq(pReq, TSDB_MSG_FLG_ENCODE); taosMemoryFree(pReq); } diff --git a/source/dnode/vnode/src/tq/tqSink.c b/source/dnode/vnode/src/tq/tqSink.c index 97e3376663..742b170a8c 100644 --- a/source/dnode/vnode/src/tq/tqSink.c +++ b/source/dnode/vnode/src/tq/tqSink.c @@ -230,6 +230,7 @@ static int32_t doBuildAndSendCreateTableMsg(SVnode* pVnode, char* stbFullName, S if (taosArrayGetSize(pDataBlock->pDataBlock) > UD_GROUPID_COLUMN_INDEX) { SColumnInfoData* pGpIdColInfo = taosArrayGet(pDataBlock->pDataBlock, UD_GROUPID_COLUMN_INDEX); + // todo remove this void* pGpIdData = colDataGetData(pGpIdColInfo, rowId); ASSERT(gid == *(int64_t*)pGpIdData); } @@ -417,7 +418,7 @@ SVCreateTbReq* buildAutoCreateTableReq(const char* stbFullName, int64_t suid, in return NULL; } - pCreateTbReq->ctb.tagName = createDefaultTagColName();; + pCreateTbReq->ctb.tagName = createDefaultTagColName(); // set table name setCreateTableMsgTableName(pCreateTbReq, pDataBlock, stbFullName, pDataBlock->info.id.groupId); diff --git a/source/libs/stream/src/streamTaskSm.c b/source/libs/stream/src/streamTaskSm.c index c3286407e4..bc832c178c 100644 --- a/source/libs/stream/src/streamTaskSm.c +++ b/source/libs/stream/src/streamTaskSm.c @@ -13,6 +13,7 @@ * along with this program. If not, see . */ +#include #include "streamInt.h" #include "streamsm.h" #include "tmisce.h" @@ -243,12 +244,25 @@ static void keepPrevInfo(SStreamTaskSM* pSM) { pSM->prev.state = pSM->current; pSM->prev.evt = pTrans->event; } + int32_t streamTaskOnHandleEventSuccess(SStreamTaskSM* pSM) { - STaskStateTrans* pTrans = pSM->pActiveTrans; - SStreamTask* pTask = pSM->pTask; + SStreamTask* pTask = pSM->pTask; // do update the task status taosThreadMutexLock(&pTask->lock); + STaskStateTrans* pTrans = pSM->pActiveTrans; + + if (pTrans == NULL) { + ETaskStatus s = pSM->current.state; + ASSERT(s == TASK_STATUS__DROPPING || s == TASK_STATUS__PAUSE || s == TASK_STATUS__STOP); + // the pSM->prev.evt may be 0, so print string is not appropriate. + stDebug("status not handled success, current status:%s, trigger event:%d, %s", pSM->current.name, pSM->prev.evt, + pTask->id.idStr); + + taosThreadMutexUnlock(&pTask->lock); + return TSDB_CODE_INVALID_PARA; + } + keepPrevInfo(pSM); pSM->current = pTrans->next; @@ -275,7 +289,7 @@ int32_t streamTaskOnHandleEventSuccess(SStreamTaskSM* pSM) { pSM->pActiveTrans = pNextTrans; pSM->startTs = taosGetTimestampMs(); taosThreadMutexUnlock(&pTask->lock); - + int32_t code = pNextTrans->pAction(pSM->pTask); if (pNextTrans->autoInvokeEndFn) { return streamTaskOnHandleEventSuccess(pSM); @@ -308,9 +322,12 @@ const char* streamTaskGetStatusStr(ETaskStatus status) { void streamTaskResetStatus(SStreamTask* pTask) { SStreamTaskSM* pSM = pTask->status.pSM; + + taosThreadMutexLock(&pTask->lock); pSM->current = StreamTaskStatusList[TASK_STATUS__UNINIT]; pSM->pActiveTrans = NULL; taosArrayClear(pSM->pWaitingEventList); + taosThreadMutexUnlock(&pTask->lock); // clear the downstream ready status pTask->status.downstreamReady = 0; @@ -323,6 +340,8 @@ void streamTaskSetStatusReady(SStreamTask* pTask) { return; } + taosThreadMutexLock(&pTask->lock); + pSM->prev.state = pSM->current; pSM->prev.evt = 0; @@ -330,6 +349,8 @@ void streamTaskSetStatusReady(SStreamTask* pTask) { pSM->startTs = taosGetTimestampMs(); pSM->pActiveTrans = NULL; taosArrayClear(pSM->pWaitingEventList); + + taosThreadMutexUnlock(&pTask->lock); } STaskStateTrans createStateTransform(ETaskStatus current, ETaskStatus next, EStreamTaskEvent event, __state_trans_fn fn,