diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c
index 9b66bd1fb3..c67b9e5e68 100644
--- a/source/common/src/tmsg.c
+++ b/source/common/src/tmsg.c
@@ -8422,7 +8422,7 @@ void tDestroySubmitTbData(SSubmitTbData *pTbData, int32_t flag) {
} else {
tDestroySVCreateTbReq(pTbData->pCreateTbReq, TSDB_MSG_FLG_DECODE);
}
- taosMemoryFree(pTbData->pCreateTbReq);
+ taosMemoryFreeClear(pTbData->pCreateTbReq);
}
if (pTbData->flags & SUBMIT_REQ_COLUMN_DATA_FORMAT) {
diff --git a/source/dnode/vnode/src/sma/smaTimeRange.c b/source/dnode/vnode/src/sma/smaTimeRange.c
index 2395a7cfb9..08ddc4bd7b 100644
--- a/source/dnode/vnode/src/sma/smaTimeRange.c
+++ b/source/dnode/vnode/src/sma/smaTimeRange.c
@@ -162,15 +162,19 @@ int32_t smaBlockToSubmit(SVnode *pVnode, const SArray *pBlocks, const STSchema *
int32_t len = 0;
SSubmitReq2 *pReq = NULL;
SArray *tagArray = NULL;
- SArray *pVals = NULL;
int32_t numOfBlocks = taosArrayGetSize(pBlocks);
tagArray = taosArrayInit(1, sizeof(STagVal));
pReq = taosMemoryCalloc(1, sizeof(SSubmitReq2));
- pReq->aSubmitTbData = taosArrayInit(1, sizeof(SSubmitTbData));
- if (!tagArray || !pReq || !pReq->aSubmitTbData) {
+ if (!tagArray || !pReq) {
+ code = terrno == TSDB_CODE_SUCCESS ? TSDB_CODE_OUT_OF_MEMORY : terrno;
+ TSDB_CHECK_CODE(code, lino, _exit);
+ }
+
+ pReq->aSubmitTbData = taosArrayInit(1, sizeof(SSubmitTbData));
+ if (pReq->aSubmitTbData == NULL) {
code = terrno == TSDB_CODE_SUCCESS ? TSDB_CODE_OUT_OF_MEMORY : terrno;
TSDB_CHECK_CODE(code, lino, _exit);
}
@@ -220,10 +224,10 @@ int32_t smaBlockToSubmit(SVnode *pVnode, const SArray *pBlocks, const STSchema *
}
}
}
-
- taosArrayPush(pReq->aSubmitTbData, &tbData);
}
+ taosHashCleanup(pTableIndexMap);
+
// encode
tEncodeSize(tEncodeSubmitReq, pReq, len, code);
if (TSDB_CODE_SUCCESS == code) {
@@ -248,8 +252,7 @@ int32_t smaBlockToSubmit(SVnode *pVnode, const SArray *pBlocks, const STSchema *
_exit:
taosArrayDestroy(tagArray);
- taosArrayDestroy(pVals);
- if (pReq) {
+ if (pReq != NULL) {
tDestroySubmitReq(pReq, TSDB_MSG_FLG_ENCODE);
taosMemoryFree(pReq);
}
diff --git a/source/dnode/vnode/src/tq/tqSink.c b/source/dnode/vnode/src/tq/tqSink.c
index 97e3376663..742b170a8c 100644
--- a/source/dnode/vnode/src/tq/tqSink.c
+++ b/source/dnode/vnode/src/tq/tqSink.c
@@ -230,6 +230,7 @@ static int32_t doBuildAndSendCreateTableMsg(SVnode* pVnode, char* stbFullName, S
if (taosArrayGetSize(pDataBlock->pDataBlock) > UD_GROUPID_COLUMN_INDEX) {
SColumnInfoData* pGpIdColInfo = taosArrayGet(pDataBlock->pDataBlock, UD_GROUPID_COLUMN_INDEX);
+ // todo remove this
void* pGpIdData = colDataGetData(pGpIdColInfo, rowId);
ASSERT(gid == *(int64_t*)pGpIdData);
}
@@ -417,7 +418,7 @@ SVCreateTbReq* buildAutoCreateTableReq(const char* stbFullName, int64_t suid, in
return NULL;
}
- pCreateTbReq->ctb.tagName = createDefaultTagColName();;
+ pCreateTbReq->ctb.tagName = createDefaultTagColName();
// set table name
setCreateTableMsgTableName(pCreateTbReq, pDataBlock, stbFullName, pDataBlock->info.id.groupId);
diff --git a/source/libs/stream/src/streamTaskSm.c b/source/libs/stream/src/streamTaskSm.c
index c3286407e4..bc832c178c 100644
--- a/source/libs/stream/src/streamTaskSm.c
+++ b/source/libs/stream/src/streamTaskSm.c
@@ -13,6 +13,7 @@
* along with this program. If not, see .
*/
+#include
#include "streamInt.h"
#include "streamsm.h"
#include "tmisce.h"
@@ -243,12 +244,25 @@ static void keepPrevInfo(SStreamTaskSM* pSM) {
pSM->prev.state = pSM->current;
pSM->prev.evt = pTrans->event;
}
+
int32_t streamTaskOnHandleEventSuccess(SStreamTaskSM* pSM) {
- STaskStateTrans* pTrans = pSM->pActiveTrans;
- SStreamTask* pTask = pSM->pTask;
+ SStreamTask* pTask = pSM->pTask;
// do update the task status
taosThreadMutexLock(&pTask->lock);
+ STaskStateTrans* pTrans = pSM->pActiveTrans;
+
+ if (pTrans == NULL) {
+ ETaskStatus s = pSM->current.state;
+ ASSERT(s == TASK_STATUS__DROPPING || s == TASK_STATUS__PAUSE || s == TASK_STATUS__STOP);
+ // the pSM->prev.evt may be 0, so print string is not appropriate.
+ stDebug("status not handled success, current status:%s, trigger event:%d, %s", pSM->current.name, pSM->prev.evt,
+ pTask->id.idStr);
+
+ taosThreadMutexUnlock(&pTask->lock);
+ return TSDB_CODE_INVALID_PARA;
+ }
+
keepPrevInfo(pSM);
pSM->current = pTrans->next;
@@ -275,7 +289,7 @@ int32_t streamTaskOnHandleEventSuccess(SStreamTaskSM* pSM) {
pSM->pActiveTrans = pNextTrans;
pSM->startTs = taosGetTimestampMs();
taosThreadMutexUnlock(&pTask->lock);
-
+
int32_t code = pNextTrans->pAction(pSM->pTask);
if (pNextTrans->autoInvokeEndFn) {
return streamTaskOnHandleEventSuccess(pSM);
@@ -308,9 +322,12 @@ const char* streamTaskGetStatusStr(ETaskStatus status) {
void streamTaskResetStatus(SStreamTask* pTask) {
SStreamTaskSM* pSM = pTask->status.pSM;
+
+ taosThreadMutexLock(&pTask->lock);
pSM->current = StreamTaskStatusList[TASK_STATUS__UNINIT];
pSM->pActiveTrans = NULL;
taosArrayClear(pSM->pWaitingEventList);
+ taosThreadMutexUnlock(&pTask->lock);
// clear the downstream ready status
pTask->status.downstreamReady = 0;
@@ -323,6 +340,8 @@ void streamTaskSetStatusReady(SStreamTask* pTask) {
return;
}
+ taosThreadMutexLock(&pTask->lock);
+
pSM->prev.state = pSM->current;
pSM->prev.evt = 0;
@@ -330,6 +349,8 @@ void streamTaskSetStatusReady(SStreamTask* pTask) {
pSM->startTs = taosGetTimestampMs();
pSM->pActiveTrans = NULL;
taosArrayClear(pSM->pWaitingEventList);
+
+ taosThreadMutexUnlock(&pTask->lock);
}
STaskStateTrans createStateTransform(ETaskStatus current, ETaskStatus next, EStreamTaskEvent event, __state_trans_fn fn,