fix(stream): fix the invalid write in sma

This commit is contained in:
Haojun Liao 2023-10-24 15:24:34 +08:00
parent 0f9328330b
commit f1498f8929
4 changed files with 37 additions and 12 deletions

View File

@ -8422,7 +8422,7 @@ void tDestroySubmitTbData(SSubmitTbData *pTbData, int32_t flag) {
} else { } else {
tDestroySVCreateTbReq(pTbData->pCreateTbReq, TSDB_MSG_FLG_DECODE); tDestroySVCreateTbReq(pTbData->pCreateTbReq, TSDB_MSG_FLG_DECODE);
} }
taosMemoryFree(pTbData->pCreateTbReq); taosMemoryFreeClear(pTbData->pCreateTbReq);
} }
if (pTbData->flags & SUBMIT_REQ_COLUMN_DATA_FORMAT) { if (pTbData->flags & SUBMIT_REQ_COLUMN_DATA_FORMAT) {

View File

@ -162,15 +162,19 @@ int32_t smaBlockToSubmit(SVnode *pVnode, const SArray *pBlocks, const STSchema *
int32_t len = 0; int32_t len = 0;
SSubmitReq2 *pReq = NULL; SSubmitReq2 *pReq = NULL;
SArray *tagArray = NULL; SArray *tagArray = NULL;
SArray *pVals = NULL;
int32_t numOfBlocks = taosArrayGetSize(pBlocks); int32_t numOfBlocks = taosArrayGetSize(pBlocks);
tagArray = taosArrayInit(1, sizeof(STagVal)); tagArray = taosArrayInit(1, sizeof(STagVal));
pReq = taosMemoryCalloc(1, sizeof(SSubmitReq2)); pReq = taosMemoryCalloc(1, sizeof(SSubmitReq2));
pReq->aSubmitTbData = taosArrayInit(1, sizeof(SSubmitTbData));
if (!tagArray || !pReq || !pReq->aSubmitTbData) { if (!tagArray || !pReq) {
code = terrno == TSDB_CODE_SUCCESS ? TSDB_CODE_OUT_OF_MEMORY : terrno;
TSDB_CHECK_CODE(code, lino, _exit);
}
pReq->aSubmitTbData = taosArrayInit(1, sizeof(SSubmitTbData));
if (pReq->aSubmitTbData == NULL) {
code = terrno == TSDB_CODE_SUCCESS ? TSDB_CODE_OUT_OF_MEMORY : terrno; code = terrno == TSDB_CODE_SUCCESS ? TSDB_CODE_OUT_OF_MEMORY : terrno;
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
} }
@ -220,10 +224,10 @@ int32_t smaBlockToSubmit(SVnode *pVnode, const SArray *pBlocks, const STSchema *
} }
} }
} }
taosArrayPush(pReq->aSubmitTbData, &tbData);
} }
taosHashCleanup(pTableIndexMap);
// encode // encode
tEncodeSize(tEncodeSubmitReq, pReq, len, code); tEncodeSize(tEncodeSubmitReq, pReq, len, code);
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
@ -248,8 +252,7 @@ int32_t smaBlockToSubmit(SVnode *pVnode, const SArray *pBlocks, const STSchema *
_exit: _exit:
taosArrayDestroy(tagArray); taosArrayDestroy(tagArray);
taosArrayDestroy(pVals); if (pReq != NULL) {
if (pReq) {
tDestroySubmitReq(pReq, TSDB_MSG_FLG_ENCODE); tDestroySubmitReq(pReq, TSDB_MSG_FLG_ENCODE);
taosMemoryFree(pReq); taosMemoryFree(pReq);
} }

View File

@ -230,6 +230,7 @@ static int32_t doBuildAndSendCreateTableMsg(SVnode* pVnode, char* stbFullName, S
if (taosArrayGetSize(pDataBlock->pDataBlock) > UD_GROUPID_COLUMN_INDEX) { if (taosArrayGetSize(pDataBlock->pDataBlock) > UD_GROUPID_COLUMN_INDEX) {
SColumnInfoData* pGpIdColInfo = taosArrayGet(pDataBlock->pDataBlock, UD_GROUPID_COLUMN_INDEX); SColumnInfoData* pGpIdColInfo = taosArrayGet(pDataBlock->pDataBlock, UD_GROUPID_COLUMN_INDEX);
// todo remove this
void* pGpIdData = colDataGetData(pGpIdColInfo, rowId); void* pGpIdData = colDataGetData(pGpIdColInfo, rowId);
ASSERT(gid == *(int64_t*)pGpIdData); ASSERT(gid == *(int64_t*)pGpIdData);
} }
@ -417,7 +418,7 @@ SVCreateTbReq* buildAutoCreateTableReq(const char* stbFullName, int64_t suid, in
return NULL; return NULL;
} }
pCreateTbReq->ctb.tagName = createDefaultTagColName();; pCreateTbReq->ctb.tagName = createDefaultTagColName();
// set table name // set table name
setCreateTableMsgTableName(pCreateTbReq, pDataBlock, stbFullName, pDataBlock->info.id.groupId); setCreateTableMsgTableName(pCreateTbReq, pDataBlock, stbFullName, pDataBlock->info.id.groupId);

View File

@ -13,6 +13,7 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#include <streamsm.h>
#include "streamInt.h" #include "streamInt.h"
#include "streamsm.h" #include "streamsm.h"
#include "tmisce.h" #include "tmisce.h"
@ -243,12 +244,25 @@ static void keepPrevInfo(SStreamTaskSM* pSM) {
pSM->prev.state = pSM->current; pSM->prev.state = pSM->current;
pSM->prev.evt = pTrans->event; pSM->prev.evt = pTrans->event;
} }
int32_t streamTaskOnHandleEventSuccess(SStreamTaskSM* pSM) { int32_t streamTaskOnHandleEventSuccess(SStreamTaskSM* pSM) {
STaskStateTrans* pTrans = pSM->pActiveTrans; SStreamTask* pTask = pSM->pTask;
SStreamTask* pTask = pSM->pTask;
// do update the task status // do update the task status
taosThreadMutexLock(&pTask->lock); taosThreadMutexLock(&pTask->lock);
STaskStateTrans* pTrans = pSM->pActiveTrans;
if (pTrans == NULL) {
ETaskStatus s = pSM->current.state;
ASSERT(s == TASK_STATUS__DROPPING || s == TASK_STATUS__PAUSE || s == TASK_STATUS__STOP);
// the pSM->prev.evt may be 0, so print string is not appropriate.
stDebug("status not handled success, current status:%s, trigger event:%d, %s", pSM->current.name, pSM->prev.evt,
pTask->id.idStr);
taosThreadMutexUnlock(&pTask->lock);
return TSDB_CODE_INVALID_PARA;
}
keepPrevInfo(pSM); keepPrevInfo(pSM);
pSM->current = pTrans->next; pSM->current = pTrans->next;
@ -275,7 +289,7 @@ int32_t streamTaskOnHandleEventSuccess(SStreamTaskSM* pSM) {
pSM->pActiveTrans = pNextTrans; pSM->pActiveTrans = pNextTrans;
pSM->startTs = taosGetTimestampMs(); pSM->startTs = taosGetTimestampMs();
taosThreadMutexUnlock(&pTask->lock); taosThreadMutexUnlock(&pTask->lock);
int32_t code = pNextTrans->pAction(pSM->pTask); int32_t code = pNextTrans->pAction(pSM->pTask);
if (pNextTrans->autoInvokeEndFn) { if (pNextTrans->autoInvokeEndFn) {
return streamTaskOnHandleEventSuccess(pSM); return streamTaskOnHandleEventSuccess(pSM);
@ -308,9 +322,12 @@ const char* streamTaskGetStatusStr(ETaskStatus status) {
void streamTaskResetStatus(SStreamTask* pTask) { void streamTaskResetStatus(SStreamTask* pTask) {
SStreamTaskSM* pSM = pTask->status.pSM; SStreamTaskSM* pSM = pTask->status.pSM;
taosThreadMutexLock(&pTask->lock);
pSM->current = StreamTaskStatusList[TASK_STATUS__UNINIT]; pSM->current = StreamTaskStatusList[TASK_STATUS__UNINIT];
pSM->pActiveTrans = NULL; pSM->pActiveTrans = NULL;
taosArrayClear(pSM->pWaitingEventList); taosArrayClear(pSM->pWaitingEventList);
taosThreadMutexUnlock(&pTask->lock);
// clear the downstream ready status // clear the downstream ready status
pTask->status.downstreamReady = 0; pTask->status.downstreamReady = 0;
@ -323,6 +340,8 @@ void streamTaskSetStatusReady(SStreamTask* pTask) {
return; return;
} }
taosThreadMutexLock(&pTask->lock);
pSM->prev.state = pSM->current; pSM->prev.state = pSM->current;
pSM->prev.evt = 0; pSM->prev.evt = 0;
@ -330,6 +349,8 @@ void streamTaskSetStatusReady(SStreamTask* pTask) {
pSM->startTs = taosGetTimestampMs(); pSM->startTs = taosGetTimestampMs();
pSM->pActiveTrans = NULL; pSM->pActiveTrans = NULL;
taosArrayClear(pSM->pWaitingEventList); taosArrayClear(pSM->pWaitingEventList);
taosThreadMutexUnlock(&pTask->lock);
} }
STaskStateTrans createStateTransform(ETaskStatus current, ETaskStatus next, EStreamTaskEvent event, __state_trans_fn fn, STaskStateTrans createStateTransform(ETaskStatus current, ETaskStatus next, EStreamTaskEvent event, __state_trans_fn fn,