diff --git a/source/dnode/mnode/impl/src/mndStreamUtil.c b/source/dnode/mnode/impl/src/mndStreamUtil.c index bb666eb6dd..941e7e53a2 100644 --- a/source/dnode/mnode/impl/src/mndStreamUtil.c +++ b/source/dnode/mnode/impl/src/mndStreamUtil.c @@ -1377,8 +1377,8 @@ int32_t mndFindChangedNodeInfo(SMnode *pMnode, const SArray *pPrevNodeList, cons return TSDB_CODE_INVALID_PARA; } - pInfo->pUpdateNodeList = taosArrayInit(4, sizeof(SNodeUpdateInfo)), - pInfo->pDBMap = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR), true, HASH_NO_LOCK); + pInfo->pUpdateNodeList = taosArrayInit(4, sizeof(SNodeUpdateInfo)); + pInfo->pDBMap = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR), true, HASH_NO_LOCK); if (pInfo->pUpdateNodeList == NULL || pInfo->pDBMap == NULL) { mndDestroyVgroupChangeInfo(pInfo); diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index 968744a0c5..0fc007a1fd 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -1872,6 +1872,11 @@ int32_t streamProcessDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pReq, S } } +#if 0 + // inject errors, and always refuse the upstream dispatch msg and trigger the task nodeEpset update trans. + status = TASK_INPUT_STATUS__REFUSED; +#endif + { // do send response with the input status int32_t code = buildDispatchRsp(pTask, pReq, status, &pRsp->pCont);