fix(stream): fix invalid read.

This commit is contained in:
Haojun Liao 2023-11-15 11:21:31 +08:00
parent 7a0dfdb0cc
commit d355b78884
2 changed files with 15 additions and 5 deletions

View File

@ -88,7 +88,7 @@ static int32_t createStreamUpdateTrans(SStreamObj *pStream, SVgroupChangeInfo *p
static void removeStreamTasksInBuf(SStreamObj *pStream, SStreamExecInfo *pExecNode);
static void saveStreamTasksInfo(SStreamObj *pStream, SStreamExecInfo *pExecNode);
static int32_t removeExpirednodeEntryAndTask(SArray *pNodeSnapshot);
static int32_t killActiveCheckpointTrans(SMnode *pMnode, const char* pDbName);
static int32_t killActiveCheckpointTrans(SMnode *pMnode, const char* pDbName, size_t len);
static int32_t setNodeEpsetExpiredFlag(const SArray *pNodeList);
static SSdbRaw *mndStreamActionEncode(SStreamObj *pStream);
@ -1297,7 +1297,7 @@ static int32_t mndProcessStreamDoCheckpoint(SRpcMsg *pReq) {
mDebug("start to trigger checkpoint, checkpointId: %" PRId64, checkpointId);
const char *pDb = mndGetStreamDB(pMnode);
mndTransSetDbName(pTrans, pStream->sourceDb, pStream->targetDb);
mndTransSetDbName(pTrans, pDb, pDb);
taosMemoryFree((void *)pDb);
if (mndTransCheckConflict(pMnode, pTrans) != 0) {
@ -2464,7 +2464,10 @@ static void killAllCheckpointTrans(SMnode* pMnode, SVgroupChangeInfo* pChangeInf
void* pIter = NULL;
while((pIter = taosHashIterate(pChangeInfo->pDBMap, pIter)) != NULL) {
char* pDb = (char*) pIter;
killActiveCheckpointTrans(pMnode, pDb);
size_t len = 0;
void* pKey = taosHashGetKey(pDb, &len);
killActiveCheckpointTrans(pMnode, pKey, len);
}
}
@ -2699,9 +2702,9 @@ int32_t createStreamResetStatusTrans(SMnode *pMnode, SStreamObj *pStream) {
return TSDB_CODE_ACTION_IN_PROGRESS;
}
int32_t killActiveCheckpointTrans(SMnode *pMnode, const char* pDBName) {
int32_t killActiveCheckpointTrans(SMnode *pMnode, const char* pDBName, size_t len) {
// data in the hash table will be removed automatically, no need to remove it here.
SStreamTransInfo* pTransInfo = taosHashGet(execInfo.transMgmt.pDBTrans, pDBName, strlen(pDBName));
SStreamTransInfo* pTransInfo = taosHashGet(execInfo.transMgmt.pDBTrans, pDBName, len);
if (pTransInfo == NULL) {
return TSDB_CODE_SUCCESS;
}

View File

@ -1102,6 +1102,13 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i
int32_t vgId = pTask->pMeta->vgId;
int32_t msgId = pTask->execInfo.dispatch;
#if 0
// for test purpose, build the failure case
if (pTask->msgInfo.dispatchMsgType == STREAM_INPUT__CHECKPOINT_TRIGGER) {
pRsp->inputStatus = TASK_INPUT_STATUS__REFUSED;
}
#endif
// follower not handle the dispatch rsp
if ((pTask->pMeta->role == NODE_ROLE_FOLLOWER) || (pTask->status.downstreamReady != 1)) {
stError("s-task:%s vgId:%d is follower or task just re-launched, not handle the dispatch rsp, discard it", id, vgId);