Merge pull request #27742 from taosdata/fix/syntax
refactor: remove void.
This commit is contained in:
commit
3540ec9d44
|
@ -636,7 +636,7 @@ typedef struct SCheckpointConsensusInfo {
|
|||
int64_t streamId;
|
||||
} SCheckpointConsensusInfo;
|
||||
|
||||
int32_t streamSetupScheduleTrigger(SStreamTask* pTask);
|
||||
void streamSetupScheduleTrigger(SStreamTask* pTask);
|
||||
|
||||
// dispatch related
|
||||
int32_t streamProcessDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pReq, SRpcMsg* pMsg);
|
||||
|
@ -793,6 +793,7 @@ void streamTaskSetReqConsenChkptId(SStreamTask* pTask, int64_t ts);
|
|||
int32_t streamTimerGetInstance(tmr_h* pTmr);
|
||||
void streamTmrReset(TAOS_TMR_CALLBACK fp, int32_t mseconds, void* param, void* handle, tmr_h* pTmrId, int32_t vgId,
|
||||
const char* pMsg);
|
||||
void streamTmrStop(tmr_h tmrId);
|
||||
|
||||
// checkpoint
|
||||
int32_t streamProcessCheckpointSourceReq(SStreamTask* pTask, SStreamCheckpointSourceReq* pReq);
|
||||
|
|
|
@ -171,8 +171,9 @@ int32_t epsetToStr(const SEpSet* pEpSet, char* pBuf, int32_t cap) {
|
|||
ret = snprintf(pBuf + nwrite, cap, "}, inUse:%d", pEpSet->inUse);
|
||||
if (ret <= 0 || ret >= cap) {
|
||||
return TSDB_CODE_OUT_OF_BUFFER;
|
||||
}
|
||||
} else {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
}
|
||||
|
||||
int32_t taosGenCrashJsonMsg(int signum, char** pMsg, int64_t clusterId, int64_t startTime) {
|
||||
|
|
|
@ -114,7 +114,7 @@ int32_t mndDropStreamByDb(SMnode *pMnode, STrans *pTrans, SDbObj *pDb);
|
|||
int32_t mndPersistStream(STrans *pTrans, SStreamObj *pStream);
|
||||
int32_t mndStreamRegisterTrans(STrans *pTrans, const char *pTransName, int64_t streamId);
|
||||
int32_t mndStreamClearFinishedTrans(SMnode *pMnode, int32_t *pNumOfActiveChkpt);
|
||||
bool mndStreamTransConflictCheck(SMnode *pMnode, int64_t streamId, const char *pTransName, bool lock);
|
||||
int32_t mndStreamTransConflictCheck(SMnode *pMnode, int64_t streamId, const char *pTransName, bool lock);
|
||||
int32_t mndStreamGetRelTrans(SMnode *pMnode, int64_t streamId);
|
||||
|
||||
int32_t mndGetNumOfStreams(SMnode *pMnode, char *dbName, int32_t *pNumOfStreams);
|
||||
|
|
|
@ -63,8 +63,8 @@ static int32_t mndProcessCheckpointReport(SRpcMsg *pReq);
|
|||
static int32_t mndProcessConsensusInTmr(SRpcMsg *pMsg);
|
||||
static void doSendQuickRsp(SRpcHandleInfo *pInfo, int32_t msgSize, int32_t vgId, int32_t code);
|
||||
static int32_t mndProcessDropOrphanTaskReq(SRpcMsg* pReq);
|
||||
|
||||
static SVgroupChangeInfo mndFindChangedNodeInfo(SMnode *pMnode, const SArray *pPrevNodeList, const SArray *pNodeList);
|
||||
static int32_t mndFindChangedNodeInfo(SMnode *pMnode, const SArray *pPrevNodeList, const SArray *pNodeList, SVgroupChangeInfo* pInfo);
|
||||
static void mndDestroyVgroupChangeInfo(SVgroupChangeInfo *pInfo);
|
||||
|
||||
static void addAllStreamTasksIntoBuf(SMnode *pMnode, SStreamExecInfo *pExecInfo);
|
||||
static void removeExpiredNodeInfo(const SArray *pNodeSnapshot);
|
||||
|
@ -220,7 +220,7 @@ STREAM_DECODE_OVER:
|
|||
taosMemoryFreeClear(buf);
|
||||
if (terrno != TSDB_CODE_SUCCESS) {
|
||||
char *p = (pStream == NULL) ? "null" : pStream->name;
|
||||
mError("stream:%s, failed to decode from raw:%p since %s", p, pRaw, terrstr());
|
||||
mError("stream:%s, failed to decode from raw:%p since %s", p, pRaw, tstrerror(terrno));
|
||||
taosMemoryFreeClear(pRow);
|
||||
return NULL;
|
||||
}
|
||||
|
@ -282,9 +282,9 @@ int32_t mndStreamSeqActionUpdate(SSdb *pSdb, SStreamSeq *pOldStream, SStreamSeq
|
|||
static int32_t mndCheckCreateStreamReq(SCMCreateStreamReq *pCreate) {
|
||||
if (pCreate->name[0] == 0 || pCreate->sql == NULL || pCreate->sql[0] == 0 || pCreate->sourceDB[0] == 0 ||
|
||||
pCreate->targetStbFullName[0] == 0) {
|
||||
return terrno = TSDB_CODE_MND_INVALID_STREAM_OPTION;
|
||||
return TSDB_CODE_MND_INVALID_STREAM_OPTION;
|
||||
}
|
||||
return 0;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t createSchemaByFields(const SArray *pFields, SSchemaWrapper *pWrapper) {
|
||||
|
@ -366,8 +366,8 @@ static int32_t mndBuildStreamObjFromCreateReq(SMnode *pMnode, SStreamObj *pObj,
|
|||
memcpy(pObj->sourceDb, pCreate->sourceDB, TSDB_DB_FNAME_LEN);
|
||||
SDbObj *pSourceDb = mndAcquireDb(pMnode, pCreate->sourceDB);
|
||||
if (pSourceDb == NULL) {
|
||||
mInfo("stream:%s failed to create, source db %s not exist since %s", pCreate->name, pObj->sourceDb, terrstr());
|
||||
code = terrno;
|
||||
mInfo("stream:%s failed to create, source db %s not exist since %s", pCreate->name, pObj->sourceDb, tstrerror(code));
|
||||
goto FAIL;
|
||||
}
|
||||
|
||||
|
@ -378,8 +378,8 @@ static int32_t mndBuildStreamObjFromCreateReq(SMnode *pMnode, SStreamObj *pObj,
|
|||
|
||||
SDbObj *pTargetDb = mndAcquireDbByStb(pMnode, pObj->targetSTbName);
|
||||
if (pTargetDb == NULL) {
|
||||
mError("stream:%s failed to create, target db %s not exist since %s", pCreate->name, pObj->targetDb, terrstr());
|
||||
code = terrno;
|
||||
mError("stream:%s failed to create, target db %s not exist since %s", pCreate->name, pObj->targetDb, tstrerror(code));
|
||||
goto FAIL;
|
||||
}
|
||||
|
||||
|
@ -543,8 +543,7 @@ int32_t mndPersistTaskDeployReq(STrans *pTrans, SStreamTask *pTask) {
|
|||
code = tEncodeStreamTask(&encoder, pTask);
|
||||
tEncoderClear(&encoder);
|
||||
|
||||
if (code == -1) {
|
||||
code = TSDB_CODE_INVALID_PARA;
|
||||
if (code != 0) {
|
||||
mError("failed to encode stream task, code:%s", tstrerror(code));
|
||||
taosMemoryFree(buf);
|
||||
return code;
|
||||
|
@ -669,19 +668,19 @@ static int32_t mndCreateStbForStream(SMnode *pMnode, STrans *pTrans, const SStre
|
|||
}
|
||||
}
|
||||
|
||||
if (mndCheckCreateStbReq(&createReq) != 0) {
|
||||
if ((code = mndCheckCreateStbReq(&createReq)) != 0) {
|
||||
goto _OVER;
|
||||
}
|
||||
|
||||
pStb = mndAcquireStb(pMnode, createReq.name);
|
||||
if (pStb != NULL) {
|
||||
terrno = TSDB_CODE_MND_STB_ALREADY_EXIST;
|
||||
code = TSDB_CODE_MND_STB_ALREADY_EXIST;
|
||||
goto _OVER;
|
||||
}
|
||||
|
||||
pDb = mndAcquireDbByStb(pMnode, createReq.name);
|
||||
if (pDb == NULL) {
|
||||
terrno = TSDB_CODE_MND_DB_NOT_SELECTED;
|
||||
code = TSDB_CODE_MND_DB_NOT_SELECTED;
|
||||
goto _OVER;
|
||||
}
|
||||
|
||||
|
@ -691,7 +690,7 @@ static int32_t mndCreateStbForStream(SMnode *pMnode, STrans *pTrans, const SStre
|
|||
}
|
||||
|
||||
if (pDb->cfg.numOfStables == 1 && numOfStbs != 0) {
|
||||
terrno = TSDB_CODE_MND_SINGLE_STB_MODE_DB;
|
||||
code = TSDB_CODE_MND_SINGLE_STB_MODE_DB;
|
||||
goto _OVER;
|
||||
}
|
||||
|
||||
|
@ -720,7 +719,8 @@ _OVER:
|
|||
mndReleaseStb(pMnode, pStb);
|
||||
mndReleaseDb(pMnode, pDb);
|
||||
|
||||
mDebug("stream:%s failed to create dst stable:%s, code:%s", pStream->name, pStream->targetSTbName, tstrerror(terrno));
|
||||
mDebug("stream:%s failed to create dst stable:%s, line:%d code:%s", pStream->name, pStream->targetSTbName, lino,
|
||||
tstrerror(code));
|
||||
return code;
|
||||
}
|
||||
|
||||
|
@ -742,16 +742,14 @@ static int32_t doStreamCheck(SMnode *pMnode, SStreamObj *pStreamObj) {
|
|||
mError("too many streams, no more than %d for each database, failed to create stream:%s", MND_STREAM_MAX_NUM,
|
||||
pStreamObj->name);
|
||||
sdbCancelFetch(pMnode->pSdb, pIter);
|
||||
terrno = TSDB_CODE_MND_TOO_MANY_STREAMS;
|
||||
return terrno;
|
||||
return TSDB_CODE_MND_TOO_MANY_STREAMS;
|
||||
}
|
||||
|
||||
if (pStream->targetStbUid == pStreamObj->targetStbUid) {
|
||||
mError("Cannot write the same stable as other stream:%s, failed to create stream:%s", pStream->name,
|
||||
pStreamObj->name);
|
||||
sdbCancelFetch(pMnode->pSdb, pIter);
|
||||
terrno = TSDB_CODE_MND_INVALID_TARGET_TABLE;
|
||||
return terrno;
|
||||
return TSDB_CODE_MND_INVALID_TARGET_TABLE;
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -767,13 +765,11 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) {
|
|||
const char *pMsg = "create stream tasks on dnodes";
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
int32_t lino = 0;
|
||||
STrans *pTrans = NULL;
|
||||
|
||||
terrno = TSDB_CODE_SUCCESS;
|
||||
SCMCreateStreamReq createReq = {0};
|
||||
if (tDeserializeSCMCreateStreamReq(pReq->pCont, pReq->contLen, &createReq) != 0) {
|
||||
code = TSDB_CODE_INVALID_MSG;
|
||||
goto _OVER;
|
||||
}
|
||||
code = tDeserializeSCMCreateStreamReq(pReq->pCont, pReq->contLen, &createReq);
|
||||
TSDB_CHECK_CODE(code, lino, _OVER);
|
||||
|
||||
#ifdef WINDOWS
|
||||
code = TSDB_CODE_MND_INVALID_PLATFORM;
|
||||
|
@ -782,7 +778,7 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) {
|
|||
|
||||
mInfo("stream:%s, start to create stream, sql:%s", createReq.name, createReq.sql);
|
||||
if ((code = mndCheckCreateStreamReq(&createReq)) != 0) {
|
||||
mError("stream:%s, failed to create since %s", createReq.name, terrstr());
|
||||
mError("stream:%s, failed to create since %s", createReq.name, tstrerror(code));
|
||||
goto _OVER;
|
||||
}
|
||||
|
||||
|
@ -790,7 +786,9 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) {
|
|||
if (pStream != NULL && code == 0) {
|
||||
if (createReq.igExists) {
|
||||
mInfo("stream:%s, already exist, ignore exist is set", createReq.name);
|
||||
goto _OVER;
|
||||
mndReleaseStream(pMnode, pStream);
|
||||
tFreeSCMCreateStreamReq(&createReq);
|
||||
return code;
|
||||
} else {
|
||||
code = TSDB_CODE_MND_STREAM_ALREADY_EXIST;
|
||||
goto _OVER;
|
||||
|
@ -807,21 +805,20 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) {
|
|||
sqlLen = strlen(createReq.sql);
|
||||
sql = taosMemoryMalloc(sqlLen + 1);
|
||||
TSDB_CHECK_NULL(sql, code, lino, _OVER, terrno);
|
||||
|
||||
memset(sql, 0, sqlLen + 1);
|
||||
memcpy(sql, createReq.sql, sqlLen);
|
||||
}
|
||||
|
||||
// build stream obj from request
|
||||
if ((code = mndBuildStreamObjFromCreateReq(pMnode, &streamObj, &createReq)) < 0) {
|
||||
mError("stream:%s, failed to create since %s", createReq.name, terrstr());
|
||||
mError("stream:%s, failed to create since %s", createReq.name, tstrerror(code));
|
||||
goto _OVER;
|
||||
}
|
||||
|
||||
if ((code = doStreamCheck(pMnode, &streamObj)) < 0) {
|
||||
goto _OVER;
|
||||
}
|
||||
code = doStreamCheck(pMnode, &streamObj);
|
||||
TSDB_CHECK_CODE(code, lino, _OVER);
|
||||
|
||||
STrans *pTrans = NULL;
|
||||
code = doCreateTrans(pMnode, &streamObj, pReq, TRN_CONFLICT_DB, MND_STREAM_CREATE_NAME, pMsg, &pTrans);
|
||||
if (pTrans == NULL || code) {
|
||||
goto _OVER;
|
||||
|
@ -830,7 +827,7 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) {
|
|||
// create stb for stream
|
||||
if (createReq.createStb == STREAM_CREATE_STABLE_TRUE) {
|
||||
if ((code = mndCreateStbForStream(pMnode, pTrans, &streamObj, pReq->info.conn.user)) < 0) {
|
||||
mError("trans:%d, failed to create stb for stream %s since %s", pTrans->id, createReq.name, terrstr());
|
||||
mError("trans:%d, failed to create stb for stream %s since %s", pTrans->id, createReq.name, tstrerror(code));
|
||||
mndTransDrop(pTrans);
|
||||
goto _OVER;
|
||||
}
|
||||
|
@ -841,7 +838,7 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) {
|
|||
// schedule stream task for stream obj
|
||||
code = mndScheduleStream(pMnode, &streamObj, createReq.lastTs, createReq.pVgroupVerList);
|
||||
if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
|
||||
mError("stream:%s, failed to schedule since %s", createReq.name, terrstr());
|
||||
mError("stream:%s, failed to schedule since %s", createReq.name, tstrerror(code));
|
||||
mndTransDrop(pTrans);
|
||||
goto _OVER;
|
||||
}
|
||||
|
@ -849,7 +846,7 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) {
|
|||
// add stream to trans
|
||||
code = mndPersistStream(pTrans, &streamObj);
|
||||
if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
|
||||
mError("stream:%s, failed to persist since %s", createReq.name, terrstr());
|
||||
mError("stream:%s, failed to persist since %s", createReq.name, tstrerror(code));
|
||||
mndTransDrop(pTrans);
|
||||
goto _OVER;
|
||||
}
|
||||
|
@ -874,17 +871,13 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) {
|
|||
// execute creation
|
||||
code = mndTransPrepare(pMnode, pTrans);
|
||||
if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
|
||||
mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr());
|
||||
mError("trans:%d, failed to prepare since %s", pTrans->id, tstrerror(code));
|
||||
mndTransDrop(pTrans);
|
||||
goto _OVER;
|
||||
}
|
||||
|
||||
mndTransDrop(pTrans);
|
||||
|
||||
if (code == 0) {
|
||||
code = TSDB_CODE_ACTION_IN_PROGRESS;
|
||||
}
|
||||
|
||||
SName dbname = {0};
|
||||
code = tNameFromString(&dbname, createReq.sourceDB, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
|
||||
if (code) {
|
||||
|
@ -910,9 +903,10 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) {
|
|||
|
||||
_OVER:
|
||||
if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
|
||||
mError("stream:%s, failed to create since %s", createReq.name, terrstr());
|
||||
mError("stream:%s, failed to create at line:%d since %s", createReq.name, lino, terrstr(code));
|
||||
} else {
|
||||
mDebug("stream:%s create stream completed", createReq.name);
|
||||
code = TSDB_CODE_ACTION_IN_PROGRESS;
|
||||
}
|
||||
|
||||
mndReleaseStream(pMnode, pStream);
|
||||
|
@ -1058,23 +1052,24 @@ static int32_t doSetCheckpointAction(SMnode *pMnode, STrans *pTrans, SStreamTask
|
|||
static int32_t mndProcessStreamCheckpointTrans(SMnode *pMnode, SStreamObj *pStream, int64_t checkpointId,
|
||||
int8_t mndTrigger, bool lock) {
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
bool conflict = false;
|
||||
int64_t ts = taosGetTimestampMs();
|
||||
STrans *pTrans = NULL;
|
||||
|
||||
if (mndTrigger == 1 && (ts - pStream->checkpointFreq < tsStreamCheckpointInterval * 1000)) {
|
||||
return code;
|
||||
}
|
||||
|
||||
bool conflict = mndStreamTransConflictCheck(pMnode, pStream->uid, MND_STREAM_CHECKPOINT_NAME, lock);
|
||||
if (conflict) {
|
||||
mWarn("checkpoint conflict with other trans in %s, ignore the checkpoint for stream:%s %" PRIx64, pStream->sourceDb,
|
||||
pStream->name, pStream->uid);
|
||||
return TSDB_CODE_MND_TRANS_CONFLICT;
|
||||
code = mndStreamTransConflictCheck(pMnode, pStream->uid, MND_STREAM_CHECKPOINT_NAME, lock);
|
||||
if (code) {
|
||||
mWarn("checkpoint conflict with other trans in %s, code:%s ignore the checkpoint for stream:%s %" PRIx64,
|
||||
pStream->sourceDb, tstrerror(code), pStream->name, pStream->uid);
|
||||
goto _ERR;
|
||||
}
|
||||
|
||||
STrans *pTrans = NULL;
|
||||
code = doCreateTrans(pMnode, pStream, NULL, TRN_CONFLICT_NOTHING, MND_STREAM_CHECKPOINT_NAME,
|
||||
"gen checkpoint for stream", &pTrans);
|
||||
if (pTrans == NULL || code) {
|
||||
code = TSDB_CODE_MND_TRANS_CONFLICT;
|
||||
if (code) {
|
||||
mError("failed to checkpoint of stream name%s, checkpointId: %" PRId64 ", reason:%s", pStream->name, checkpointId,
|
||||
tstrerror(code));
|
||||
goto _ERR;
|
||||
|
@ -1126,7 +1121,9 @@ static int32_t mndProcessStreamCheckpointTrans(SMnode *pMnode, SStreamObj *pStre
|
|||
|
||||
code = mndTransPrepare(pMnode, pTrans);
|
||||
if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
|
||||
mError("failed to prepare checkpoint trans since %s", terrstr());
|
||||
mError("failed to prepare checkpoint trans since %s", terrstr(code));
|
||||
} else {
|
||||
code = TSDB_CODE_ACTION_IN_PROGRESS;
|
||||
}
|
||||
|
||||
_ERR:
|
||||
|
@ -1147,6 +1144,9 @@ int32_t extractStreamNodeList(SMnode *pMnode) {
|
|||
}
|
||||
|
||||
static bool taskNodeIsUpdated(SMnode *pMnode) {
|
||||
bool allReady = true;
|
||||
SArray *pNodeSnapshot = NULL;
|
||||
|
||||
// check if the node update happens or not
|
||||
streamMutexLock(&execInfo.lock);
|
||||
|
||||
|
@ -1171,13 +1171,11 @@ static bool taskNodeIsUpdated(SMnode *pMnode) {
|
|||
}
|
||||
}
|
||||
|
||||
bool allReady = true;
|
||||
SArray *pNodeSnapshot = NULL;
|
||||
|
||||
int32_t code = mndTakeVgroupSnapshot(pMnode, &allReady, &pNodeSnapshot);
|
||||
if (code) {
|
||||
mError("failed to get the vgroup snapshot, ignore it and continue");
|
||||
}
|
||||
|
||||
if (!allReady) {
|
||||
mWarn("not all vnodes ready, quit from vnodes status check");
|
||||
taosArrayDestroy(pNodeSnapshot);
|
||||
|
@ -1185,12 +1183,16 @@ static bool taskNodeIsUpdated(SMnode *pMnode) {
|
|||
return true;
|
||||
}
|
||||
|
||||
SVgroupChangeInfo changeInfo = mndFindChangedNodeInfo(pMnode, execInfo.pNodeList, pNodeSnapshot);
|
||||
SVgroupChangeInfo changeInfo = {0};
|
||||
code = mndFindChangedNodeInfo(pMnode, execInfo.pNodeList, pNodeSnapshot, &changeInfo);
|
||||
if (code) {
|
||||
streamMutexUnlock(&execInfo.lock);
|
||||
return false;
|
||||
}
|
||||
|
||||
bool nodeUpdated = (taosArrayGetSize(changeInfo.pUpdateNodeList) > 0);
|
||||
|
||||
taosArrayDestroy(changeInfo.pUpdateNodeList);
|
||||
taosHashCleanup(changeInfo.pDBMap);
|
||||
mndDestroyVgroupChangeInfo(&changeInfo);
|
||||
taosArrayDestroy(pNodeSnapshot);
|
||||
|
||||
if (nodeUpdated) {
|
||||
|
@ -1400,7 +1402,7 @@ static int32_t mndProcessStreamCheckpoint(SRpcMsg *pReq) {
|
|||
code = mndProcessStreamCheckpointTrans(pMnode, p, checkpointId, 1, true);
|
||||
sdbRelease(pSdb, p);
|
||||
|
||||
if (code != -1) {
|
||||
if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
|
||||
started += 1;
|
||||
|
||||
if (started >= capacity) {
|
||||
|
@ -1438,10 +1440,9 @@ static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) {
|
|||
tFreeMDropStreamReq(&dropReq);
|
||||
return 0;
|
||||
} else {
|
||||
code = TSDB_CODE_MND_STREAM_NOT_EXIST;
|
||||
mError("stream:%s not exist failed to drop it", dropReq.name);
|
||||
tFreeMDropStreamReq(&dropReq);
|
||||
TAOS_RETURN(code);
|
||||
TAOS_RETURN(TSDB_CODE_MND_STREAM_NOT_EXIST);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1480,17 +1481,17 @@ static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) {
|
|||
}
|
||||
|
||||
// check if it is conflict with other trans in both sourceDb and targetDb.
|
||||
bool conflict = mndStreamTransConflictCheck(pMnode, pStream->uid, MND_STREAM_DROP_NAME, true);
|
||||
if (conflict) {
|
||||
code = mndStreamTransConflictCheck(pMnode, pStream->uid, MND_STREAM_DROP_NAME, true);
|
||||
if (code) {
|
||||
sdbRelease(pMnode->pSdb, pStream);
|
||||
tFreeMDropStreamReq(&dropReq);
|
||||
return terrno;
|
||||
return code;
|
||||
}
|
||||
|
||||
STrans *pTrans = NULL;
|
||||
code = doCreateTrans(pMnode, pStream, pReq, TRN_CONFLICT_NOTHING, MND_STREAM_DROP_NAME, "drop stream", &pTrans);
|
||||
if (pTrans == NULL || code) {
|
||||
mError("stream:%s uid:0x%" PRIx64 " failed to drop since %s", dropReq.name, pStream->uid, terrstr());
|
||||
mError("stream:%s uid:0x%" PRIx64 " failed to drop since %s", dropReq.name, pStream->uid, tstrerror(code));
|
||||
sdbRelease(pMnode->pSdb, pStream);
|
||||
tFreeMDropStreamReq(&dropReq);
|
||||
TAOS_RETURN(code);
|
||||
|
@ -1526,7 +1527,7 @@ static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) {
|
|||
|
||||
code = mndTransPrepare(pMnode, pTrans);
|
||||
if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
|
||||
mError("trans:%d, failed to prepare drop stream trans since %s", pTrans->id, terrstr());
|
||||
mError("trans:%d, failed to prepare drop stream trans since %s", pTrans->id, tstrerror(code));
|
||||
sdbRelease(pMnode->pSdb, pStream);
|
||||
mndTransDrop(pTrans);
|
||||
tFreeMDropStreamReq(&dropReq);
|
||||
|
@ -1756,16 +1757,16 @@ static int32_t mndProcessPauseStreamReq(SRpcMsg *pReq) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
if (mndCheckDbPrivilegeByName(pMnode, pReq->info.conn.user, MND_OPER_WRITE_DB, pStream->targetDb) != 0) {
|
||||
if ((code = mndCheckDbPrivilegeByName(pMnode, pReq->info.conn.user, MND_OPER_WRITE_DB, pStream->targetDb)) != 0) {
|
||||
sdbRelease(pMnode->pSdb, pStream);
|
||||
return -1;
|
||||
return code;
|
||||
}
|
||||
|
||||
// check if it is conflict with other trans in both sourceDb and targetDb.
|
||||
bool conflict = mndStreamTransConflictCheck(pMnode, pStream->uid, MND_STREAM_PAUSE_NAME, true);
|
||||
if (conflict) {
|
||||
code = mndStreamTransConflictCheck(pMnode, pStream->uid, MND_STREAM_PAUSE_NAME, true);
|
||||
if (code) {
|
||||
sdbRelease(pMnode->pSdb, pStream);
|
||||
TAOS_RETURN(TSDB_CODE_MND_TRANS_CONFLICT);
|
||||
TAOS_RETURN(code);
|
||||
}
|
||||
|
||||
bool updated = taskNodeIsUpdated(pMnode);
|
||||
|
@ -1821,7 +1822,7 @@ static int32_t mndProcessPauseStreamReq(SRpcMsg *pReq) {
|
|||
STrans *pTrans = NULL;
|
||||
code = doCreateTrans(pMnode, pStream, pReq, TRN_CONFLICT_NOTHING, MND_STREAM_PAUSE_NAME, "pause the stream", &pTrans);
|
||||
if (pTrans == NULL || code) {
|
||||
mError("stream:%s failed to pause stream since %s", pauseReq.name, terrstr());
|
||||
mError("stream:%s failed to pause stream since %s", pauseReq.name, tstrerror(code));
|
||||
sdbRelease(pMnode->pSdb, pStream);
|
||||
return code;
|
||||
}
|
||||
|
@ -1836,7 +1837,7 @@ static int32_t mndProcessPauseStreamReq(SRpcMsg *pReq) {
|
|||
// if nodeUpdate happened, not send pause trans
|
||||
code = mndStreamSetPauseAction(pMnode, pTrans, pStream);
|
||||
if (code) {
|
||||
mError("stream:%s, failed to pause task since %s", pauseReq.name, terrstr());
|
||||
mError("stream:%s, failed to pause task since %s", pauseReq.name, tstrerror(code));
|
||||
sdbRelease(pMnode->pSdb, pStream);
|
||||
mndTransDrop(pTrans);
|
||||
return code;
|
||||
|
@ -1857,7 +1858,7 @@ static int32_t mndProcessPauseStreamReq(SRpcMsg *pReq) {
|
|||
|
||||
code = mndTransPrepare(pMnode, pTrans);
|
||||
if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
|
||||
mError("trans:%d, failed to prepare pause stream trans since %s", pTrans->id, terrstr());
|
||||
mError("trans:%d, failed to prepare pause stream trans since %s", pTrans->id, tstrerror(code));
|
||||
sdbRelease(pMnode->pSdb, pStream);
|
||||
mndTransDrop(pTrans);
|
||||
return code;
|
||||
|
@ -1874,8 +1875,8 @@ static int32_t mndProcessResumeStreamReq(SRpcMsg *pReq) {
|
|||
SStreamObj *pStream = NULL;
|
||||
int32_t code = 0;
|
||||
|
||||
if ((terrno = grantCheckExpire(TSDB_GRANT_STREAMS)) < 0) {
|
||||
return terrno;
|
||||
if ((code = grantCheckExpire(TSDB_GRANT_STREAMS)) < 0) {
|
||||
return code;
|
||||
}
|
||||
|
||||
SMResumeStreamReq resumeReq = {0};
|
||||
|
@ -1906,17 +1907,17 @@ static int32_t mndProcessResumeStreamReq(SRpcMsg *pReq) {
|
|||
}
|
||||
|
||||
// check if it is conflict with other trans in both sourceDb and targetDb.
|
||||
bool conflict = mndStreamTransConflictCheck(pMnode, pStream->uid, MND_STREAM_RESUME_NAME, true);
|
||||
if (conflict) {
|
||||
code = mndStreamTransConflictCheck(pMnode, pStream->uid, MND_STREAM_RESUME_NAME, true);
|
||||
if (code) {
|
||||
sdbRelease(pMnode->pSdb, pStream);
|
||||
return terrno;
|
||||
return code;
|
||||
}
|
||||
|
||||
STrans *pTrans = NULL;
|
||||
code =
|
||||
doCreateTrans(pMnode, pStream, pReq, TRN_CONFLICT_NOTHING, MND_STREAM_RESUME_NAME, "resume the stream", &pTrans);
|
||||
if (pTrans == NULL || code) {
|
||||
mError("stream:%s, failed to resume stream since %s", resumeReq.name, terrstr());
|
||||
mError("stream:%s, failed to resume stream since %s", resumeReq.name, tstrerror(code));
|
||||
sdbRelease(pMnode->pSdb, pStream);
|
||||
return code;
|
||||
}
|
||||
|
@ -1929,8 +1930,9 @@ static int32_t mndProcessResumeStreamReq(SRpcMsg *pReq) {
|
|||
}
|
||||
|
||||
// set the resume action
|
||||
if (mndStreamSetResumeAction(pTrans, pMnode, pStream, resumeReq.igUntreated) < 0) {
|
||||
mError("stream:%s, failed to drop task since %s", resumeReq.name, terrstr());
|
||||
code = mndStreamSetResumeAction(pTrans, pMnode, pStream, resumeReq.igUntreated);
|
||||
if (code) {
|
||||
mError("stream:%s, failed to drop task since %s", resumeReq.name, tstrerror(code));
|
||||
sdbRelease(pMnode->pSdb, pStream);
|
||||
mndTransDrop(pTrans);
|
||||
return code;
|
||||
|
@ -1950,7 +1952,7 @@ static int32_t mndProcessResumeStreamReq(SRpcMsg *pReq) {
|
|||
taosWUnLockLatch(&pStream->lock);
|
||||
code = mndTransPrepare(pMnode, pTrans);
|
||||
if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
|
||||
mError("trans:%d, failed to prepare pause stream trans since %s", pTrans->id, terrstr());
|
||||
mError("trans:%d, failed to prepare pause stream trans since %s", pTrans->id, tstrerror(code));
|
||||
sdbRelease(pMnode->pSdb, pStream);
|
||||
mndTransDrop(pTrans);
|
||||
return code;
|
||||
|
@ -1977,11 +1979,22 @@ static bool isNodeEpsetChanged(const SEpSet *pPrevEpset, const SEpSet *pCurrent)
|
|||
// tasks on the will be removed replica.
|
||||
// 3. vgroup redistribution is an combination operation of first increase replica and then decrease replica. So we
|
||||
// will handle it as mentioned in 1 & 2 items.
|
||||
static SVgroupChangeInfo mndFindChangedNodeInfo(SMnode *pMnode, const SArray *pPrevNodeList, const SArray *pNodeList) {
|
||||
SVgroupChangeInfo info = {
|
||||
.pUpdateNodeList = taosArrayInit(4, sizeof(SNodeUpdateInfo)),
|
||||
.pDBMap = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR), true, HASH_NO_LOCK),
|
||||
};
|
||||
static int32_t mndFindChangedNodeInfo(SMnode *pMnode, const SArray *pPrevNodeList, const SArray *pNodeList,
|
||||
SVgroupChangeInfo *pInfo) {
|
||||
int32_t code = 0;
|
||||
int32_t lino = 0;
|
||||
|
||||
if (pInfo == NULL) {
|
||||
return TSDB_CODE_INVALID_PARA;
|
||||
}
|
||||
|
||||
pInfo->pUpdateNodeList = taosArrayInit(4, sizeof(SNodeUpdateInfo)),
|
||||
pInfo->pDBMap = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR), true, HASH_NO_LOCK);
|
||||
|
||||
if (pInfo->pUpdateNodeList == NULL || pInfo->pDBMap == NULL) {
|
||||
mndDestroyVgroupChangeInfo(pInfo);
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
int32_t numOfNodes = taosArrayGetSize(pPrevNodeList);
|
||||
for (int32_t i = 0; i < numOfNodes; ++i) {
|
||||
|
@ -2002,7 +2015,11 @@ static SVgroupChangeInfo mndFindChangedNodeInfo(SMnode *pMnode, const SArray *pP
|
|||
const SEp *pPrevEp = GET_ACTIVE_EP(&pPrevEntry->epset);
|
||||
|
||||
char buf[256] = {0};
|
||||
(void) epsetToStr(&pCurrent->epset, buf, tListLen(buf)); // ignore this error
|
||||
code = epsetToStr(&pCurrent->epset, buf, tListLen(buf)); // ignore this error
|
||||
if (code) {
|
||||
mError("failed to convert epset string, code:%s", tstrerror(code));
|
||||
TSDB_CHECK_CODE(code, lino, _err);
|
||||
}
|
||||
|
||||
mDebug("nodeId:%d restart/epset changed detected, old:%s:%d -> new:%s, stageUpdate:%d", pCurrent->nodeId,
|
||||
pPrevEp->fqdn, pPrevEp->port, buf, pPrevEntry->stageUpdated);
|
||||
|
@ -2011,20 +2028,16 @@ static SVgroupChangeInfo mndFindChangedNodeInfo(SMnode *pMnode, const SArray *pP
|
|||
epsetAssign(&updateInfo.prevEp, &pPrevEntry->epset);
|
||||
epsetAssign(&updateInfo.newEp, &pCurrent->epset);
|
||||
|
||||
void* p = taosArrayPush(info.pUpdateNodeList, &updateInfo);
|
||||
if (p == NULL) {
|
||||
mError("failed to put update entry into node list, code:%s", tstrerror(TSDB_CODE_OUT_OF_MEMORY));
|
||||
}
|
||||
void* p = taosArrayPush(pInfo->pUpdateNodeList, &updateInfo);
|
||||
TSDB_CHECK_NULL(p, code, lino, _err, terrno);
|
||||
}
|
||||
|
||||
// todo handle the snode info
|
||||
if (pCurrent->nodeId != SNODE_HANDLE) {
|
||||
SVgObj *pVgroup = mndAcquireVgroup(pMnode, pCurrent->nodeId);
|
||||
int32_t code = taosHashPut(info.pDBMap, pVgroup->dbName, strlen(pVgroup->dbName), NULL, 0);
|
||||
code = taosHashPut(pInfo->pDBMap, pVgroup->dbName, strlen(pVgroup->dbName), NULL, 0);
|
||||
mndReleaseVgroup(pMnode, pVgroup);
|
||||
if (code) {
|
||||
mError("failed to put into dbmap, code:out of memory");
|
||||
}
|
||||
TSDB_CHECK_CODE(code, lino, _err);
|
||||
}
|
||||
|
||||
break;
|
||||
|
@ -2032,7 +2045,18 @@ static SVgroupChangeInfo mndFindChangedNodeInfo(SMnode *pMnode, const SArray *pP
|
|||
}
|
||||
}
|
||||
|
||||
return info;
|
||||
return code;
|
||||
|
||||
_err:
|
||||
mndDestroyVgroupChangeInfo(pInfo);
|
||||
return code;
|
||||
}
|
||||
|
||||
static void mndDestroyVgroupChangeInfo(SVgroupChangeInfo* pInfo) {
|
||||
if (pInfo != NULL) {
|
||||
taosArrayDestroy(pInfo->pUpdateNodeList);
|
||||
taosHashCleanup(pInfo->pDBMap);
|
||||
}
|
||||
}
|
||||
|
||||
static int32_t mndProcessVgroupChange(SMnode *pMnode, SVgroupChangeInfo *pChangeInfo, bool includeAllNodes) {
|
||||
|
@ -2049,13 +2073,13 @@ static int32_t mndProcessVgroupChange(SMnode *pMnode, SVgroupChangeInfo *pChange
|
|||
break;
|
||||
}
|
||||
|
||||
bool conflict = mndStreamTransConflictCheck(pMnode, pStream->uid, MND_STREAM_TASK_UPDATE_NAME, false);
|
||||
code = mndStreamTransConflictCheck(pMnode, pStream->uid, MND_STREAM_TASK_UPDATE_NAME, false);
|
||||
sdbRelease(pSdb, pStream);
|
||||
|
||||
if (conflict) {
|
||||
mError("nodeUpdate conflict with other trans, current nodeUpdate ignored");
|
||||
if (code) {
|
||||
mError("nodeUpdate conflict with other trans, current nodeUpdate ignored, code:%s", tstrerror(code));
|
||||
sdbCancelFetch(pSdb, pIter);
|
||||
return terrno;
|
||||
return code;
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -2119,7 +2143,7 @@ static int32_t mndProcessVgroupChange(SMnode *pMnode, SVgroupChangeInfo *pChange
|
|||
|
||||
code = mndTransPrepare(pMnode, pTrans);
|
||||
if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
|
||||
mError("trans:%d, failed to prepare update stream trans since %s", pTrans->id, terrstr());
|
||||
mError("trans:%d, failed to prepare update stream trans since %s", pTrans->id, tstrerror(code));
|
||||
sdbRelease(pMnode->pSdb, pStream);
|
||||
mndTransDrop(pTrans);
|
||||
return code;
|
||||
|
@ -2276,7 +2300,11 @@ static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg) {
|
|||
goto _end;
|
||||
}
|
||||
|
||||
SVgroupChangeInfo changeInfo = mndFindChangedNodeInfo(pMnode, execInfo.pNodeList, pNodeSnapshot);
|
||||
SVgroupChangeInfo changeInfo = {0};
|
||||
code = mndFindChangedNodeInfo(pMnode, execInfo.pNodeList, pNodeSnapshot, &changeInfo);
|
||||
if (code) {
|
||||
goto _end;
|
||||
}
|
||||
|
||||
{
|
||||
if (execInfo.role == NODE_ROLE_LEADER && execInfo.switchFromFollower) {
|
||||
|
@ -2310,8 +2338,7 @@ static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg) {
|
|||
mDebug("no update found in nodeList");
|
||||
}
|
||||
|
||||
taosArrayDestroy(changeInfo.pUpdateNodeList);
|
||||
taosHashCleanup(changeInfo.pDBMap);
|
||||
mndDestroyVgroupChangeInfo(&changeInfo);
|
||||
|
||||
_end:
|
||||
streamMutexUnlock(&execInfo.lock);
|
||||
|
@ -2640,9 +2667,8 @@ int32_t mndProcessCheckpointReport(SRpcMsg *pReq) {
|
|||
void *p = taosHashGet(execInfo.pTaskMap, &id, sizeof(id));
|
||||
if (p == NULL) {
|
||||
mError("failed to find the stream:0x%" PRIx64 " in buf, not handle the checkpoint-report", req.streamId);
|
||||
terrno = TSDB_CODE_MND_STREAM_NOT_EXIST;
|
||||
streamMutexUnlock(&execInfo.lock);
|
||||
return terrno;
|
||||
return TSDB_CODE_MND_STREAM_NOT_EXIST;
|
||||
} else {
|
||||
mDebug("s-task:0x%" PRIx64 "-0x%x in buf not in mnode/meta, create stream trans may not complete yet",
|
||||
req.streamId, req.taskId);
|
||||
|
@ -2993,7 +3019,7 @@ int32_t mndCreateStreamChkptInfoUpdateTrans(SMnode *pMnode, SStreamObj *pStream,
|
|||
|
||||
code = mndTransPrepare(pMnode, pTrans);
|
||||
if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
|
||||
mError("trans:%d, failed to prepare update checkpoint-info meta trans since %s", pTrans->id, terrstr());
|
||||
mError("trans:%d, failed to prepare update checkpoint-info meta trans since %s", pTrans->id, tstrerror(code));
|
||||
sdbRelease(pMnode->pSdb, pStream);
|
||||
mndTransDrop(pTrans);
|
||||
return code;
|
||||
|
@ -3038,9 +3064,8 @@ static int32_t mndProcessDropOrphanTaskReq(SRpcMsg *pReq) {
|
|||
}
|
||||
|
||||
// check if it is conflict with other trans in both sourceDb and targetDb.
|
||||
bool conflict = mndStreamTransConflictCheck(pMnode, pTask->streamId, MND_STREAM_DROP_NAME, false);
|
||||
if (conflict) {
|
||||
code = TSDB_CODE_MND_TRANS_CONFLICT;
|
||||
code = mndStreamTransConflictCheck(pMnode, pTask->streamId, MND_STREAM_DROP_NAME, false);
|
||||
if (code) {
|
||||
goto _err;
|
||||
}
|
||||
|
||||
|
@ -3048,7 +3073,7 @@ static int32_t mndProcessDropOrphanTaskReq(SRpcMsg *pReq) {
|
|||
|
||||
code = doCreateTrans(pMnode, &dummyObj, NULL, TRN_CONFLICT_NOTHING, MND_STREAM_DROP_NAME, "drop stream", &pTrans);
|
||||
if (pTrans == NULL || code != 0) {
|
||||
mError("failed to create trans to drop orphan tasks since %s", terrstr());
|
||||
mError("failed to create trans to drop orphan tasks since %s", tstrerror(code));
|
||||
goto _err;
|
||||
}
|
||||
|
||||
|
@ -3059,7 +3084,7 @@ static int32_t mndProcessDropOrphanTaskReq(SRpcMsg *pReq) {
|
|||
|
||||
// drop all tasks
|
||||
if ((code = mndStreamSetDropActionFromList(pMnode, pTrans, msg.pList)) < 0) {
|
||||
mError("failed to create trans to drop orphan tasks since %s", terrstr());
|
||||
mError("failed to create trans to drop orphan tasks since %s", tstrerror(code));
|
||||
goto _err;
|
||||
}
|
||||
|
||||
|
@ -3070,7 +3095,7 @@ static int32_t mndProcessDropOrphanTaskReq(SRpcMsg *pReq) {
|
|||
|
||||
code = mndTransPrepare(pMnode, pTrans);
|
||||
if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
|
||||
mError("trans:%d, failed to prepare drop stream trans since %s", pTrans->id, terrstr());
|
||||
mError("trans:%d, failed to prepare drop stream trans since %s", pTrans->id, tstrerror(code));
|
||||
goto _err;
|
||||
}
|
||||
|
||||
|
|
|
@ -100,7 +100,7 @@ int32_t mndCreateStreamResetStatusTrans(SMnode *pMnode, SStreamObj *pStream) {
|
|||
|
||||
code = mndTransPrepare(pMnode, pTrans);
|
||||
if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
|
||||
mError("trans:%d, failed to prepare update stream trans since %s", pTrans->id, terrstr());
|
||||
mError("trans:%d, failed to prepare update stream trans since %s", pTrans->id, tstrerror(code));
|
||||
sdbRelease(pMnode->pSdb, pStream);
|
||||
mndTransDrop(pTrans);
|
||||
return code;
|
||||
|
@ -109,6 +109,9 @@ int32_t mndCreateStreamResetStatusTrans(SMnode *pMnode, SStreamObj *pStream) {
|
|||
sdbRelease(pMnode->pSdb, pStream);
|
||||
mndTransDrop(pTrans);
|
||||
|
||||
if (code == 0) {
|
||||
code = TSDB_CODE_ACTION_IN_PROGRESS;
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
|
@ -221,8 +224,8 @@ int32_t mndProcessResetStatusReq(SRpcMsg *pReq) {
|
|||
code = TSDB_CODE_STREAM_TASK_NOT_EXIST;
|
||||
mError("failed to acquire the streamObj:0x%" PRIx64 " to reset checkpoint, may have been dropped", pStream->uid);
|
||||
} else {
|
||||
bool conflict = mndStreamTransConflictCheck(pMnode, pStream->uid, MND_STREAM_TASK_RESET_NAME, false);
|
||||
if (conflict) {
|
||||
code = mndStreamTransConflictCheck(pMnode, pStream->uid, MND_STREAM_TASK_RESET_NAME, false);
|
||||
if (code) {
|
||||
mError("stream:%s other trans exists in DB:%s, dstTable:%s failed to start reset-status trans", pStream->name,
|
||||
pStream->sourceDb, pStream->targetSTbName);
|
||||
} else {
|
||||
|
|
|
@ -50,7 +50,7 @@ int32_t mndStreamClearFinishedTrans(SMnode *pMnode, int32_t *pNumOfActiveChkpt)
|
|||
pEntry->startTime);
|
||||
void* p = taosArrayPush(pList, &info);
|
||||
if (p == NULL) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
return terrno;
|
||||
}
|
||||
} else {
|
||||
if (strcmp(pEntry->name, MND_STREAM_CHECKPOINT_NAME) == 0) {
|
||||
|
@ -77,7 +77,6 @@ int32_t mndStreamClearFinishedTrans(SMnode *pMnode, int32_t *pNumOfActiveChkpt)
|
|||
mDebug("clear %d finished stream-trans, remained:%d, active checkpoint trans:%d", size,
|
||||
taosHashGetSize(execInfo.transMgmt.pDBTrans), num);
|
||||
|
||||
terrno = TSDB_CODE_SUCCESS;
|
||||
taosArrayDestroy(pList);
|
||||
|
||||
if (pNumOfActiveChkpt != NULL) {
|
||||
|
@ -91,7 +90,7 @@ int32_t mndStreamClearFinishedTrans(SMnode *pMnode, int32_t *pNumOfActiveChkpt)
|
|||
// For a given stream:
|
||||
// 1. checkpoint trans is conflict with any other trans except for the drop and reset trans.
|
||||
// 2. create/drop/reset/update trans are conflict with any other trans.
|
||||
bool mndStreamTransConflictCheck(SMnode *pMnode, int64_t streamId, const char *pTransName, bool lock) {
|
||||
int32_t mndStreamTransConflictCheck(SMnode *pMnode, int64_t streamId, const char *pTransName, bool lock) {
|
||||
if (lock) {
|
||||
streamMutexLock(&execInfo.lock);
|
||||
}
|
||||
|
@ -101,7 +100,7 @@ bool mndStreamTransConflictCheck(SMnode *pMnode, int64_t streamId, const char *p
|
|||
if (lock) {
|
||||
streamMutexUnlock(&execInfo.lock);
|
||||
}
|
||||
return false;
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t code = mndStreamClearFinishedTrans(pMnode, NULL);
|
||||
|
@ -121,8 +120,7 @@ bool mndStreamTransConflictCheck(SMnode *pMnode, int64_t streamId, const char *p
|
|||
if ((strcmp(pTransName, MND_STREAM_DROP_NAME) != 0) && (strcmp(pTransName, MND_STREAM_TASK_RESET_NAME) != 0)) {
|
||||
mWarn("conflict with other transId:%d streamUid:0x%" PRIx64 ", trans:%s", tInfo.transId, tInfo.streamId,
|
||||
tInfo.name);
|
||||
terrno = TSDB_CODE_MND_TRANS_CONFLICT;
|
||||
return true;
|
||||
return TSDB_CODE_MND_TRANS_CONFLICT;
|
||||
} else {
|
||||
mDebug("not conflict with checkpoint trans, name:%s, continue creating trans", pTransName);
|
||||
}
|
||||
|
@ -131,8 +129,7 @@ bool mndStreamTransConflictCheck(SMnode *pMnode, int64_t streamId, const char *p
|
|||
strcmp(tInfo.name, MND_STREAM_TASK_UPDATE_NAME) == 0) {
|
||||
mWarn("conflict with other transId:%d streamUid:0x%" PRIx64 ", trans:%s", tInfo.transId, tInfo.streamId,
|
||||
tInfo.name);
|
||||
terrno = TSDB_CODE_MND_TRANS_CONFLICT;
|
||||
return true;
|
||||
return TSDB_CODE_MND_TRANS_CONFLICT;
|
||||
}
|
||||
} else {
|
||||
mDebug("stream:0x%" PRIx64 " no conflict trans existed, continue create trans", streamId);
|
||||
|
@ -142,7 +139,7 @@ bool mndStreamTransConflictCheck(SMnode *pMnode, int64_t streamId, const char *p
|
|||
streamMutexUnlock(&execInfo.lock);
|
||||
}
|
||||
|
||||
return false;
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t mndStreamGetRelTrans(SMnode *pMnode, int64_t streamId) {
|
||||
|
@ -202,47 +199,48 @@ int32_t doCreateTrans(SMnode *pMnode, SStreamObj *pStream, SRpcMsg *pReq, ETrnCo
|
|||
SSdbRaw *mndStreamActionEncode(SStreamObj *pStream) {
|
||||
int32_t code = 0;
|
||||
int32_t lino = 0;
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
void *buf = NULL;
|
||||
|
||||
SEncoder encoder;
|
||||
tEncoderInit(&encoder, NULL, 0);
|
||||
if (tEncodeSStreamObj(&encoder, pStream) < 0) {
|
||||
if ((code = tEncodeSStreamObj(&encoder, pStream)) < 0) {
|
||||
tEncoderClear(&encoder);
|
||||
goto STREAM_ENCODE_OVER;
|
||||
TSDB_CHECK_CODE(code, lino, _over);
|
||||
}
|
||||
|
||||
int32_t tlen = encoder.pos;
|
||||
tEncoderClear(&encoder);
|
||||
|
||||
int32_t size = sizeof(int32_t) + tlen + MND_STREAM_RESERVE_SIZE;
|
||||
SSdbRaw *pRaw = sdbAllocRaw(SDB_STREAM, MND_STREAM_VER_NUMBER, size);
|
||||
if (pRaw == NULL) goto STREAM_ENCODE_OVER;
|
||||
TSDB_CHECK_NULL(pRaw, code, lino, _over, terrno);
|
||||
|
||||
buf = taosMemoryMalloc(tlen);
|
||||
if (buf == NULL) goto STREAM_ENCODE_OVER;
|
||||
TSDB_CHECK_NULL(buf, code, lino, _over, terrno);
|
||||
|
||||
tEncoderInit(&encoder, buf, tlen);
|
||||
if (tEncodeSStreamObj(&encoder, pStream) < 0) {
|
||||
if ((code = tEncodeSStreamObj(&encoder, pStream)) < 0) {
|
||||
tEncoderClear(&encoder);
|
||||
goto STREAM_ENCODE_OVER;
|
||||
TSDB_CHECK_CODE(code, lino, _over);
|
||||
}
|
||||
|
||||
tEncoderClear(&encoder);
|
||||
|
||||
int32_t dataPos = 0;
|
||||
SDB_SET_INT32(pRaw, dataPos, tlen, STREAM_ENCODE_OVER);
|
||||
SDB_SET_BINARY(pRaw, dataPos, buf, tlen, STREAM_ENCODE_OVER);
|
||||
SDB_SET_DATALEN(pRaw, dataPos, STREAM_ENCODE_OVER);
|
||||
SDB_SET_INT32(pRaw, dataPos, tlen, _over);
|
||||
SDB_SET_BINARY(pRaw, dataPos, buf, tlen, _over);
|
||||
SDB_SET_DATALEN(pRaw, dataPos, _over);
|
||||
|
||||
terrno = TSDB_CODE_SUCCESS;
|
||||
|
||||
STREAM_ENCODE_OVER:
|
||||
_over:
|
||||
taosMemoryFreeClear(buf);
|
||||
if (terrno != TSDB_CODE_SUCCESS) {
|
||||
mError("stream:%s, failed to encode to raw:%p since %s", pStream->name, pRaw, terrstr());
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
mError("stream:%s, failed to encode to raw:%p at line:%d since %s", pStream->name, pRaw, lino, tstrerror(code));
|
||||
sdbFreeRaw(pRaw);
|
||||
terrno = code;
|
||||
return NULL;
|
||||
}
|
||||
|
||||
terrno = 0;
|
||||
mTrace("stream:%s, encode to raw:%p, row:%p, checkpoint:%" PRId64 "", pStream->name, pRaw, pStream,
|
||||
pStream->checkpointId);
|
||||
return pRaw;
|
||||
|
|
|
@ -1105,8 +1105,8 @@ int32_t mndScanCheckpointReportInfo(SRpcMsg *pReq) {
|
|||
mDebug("stream:0x%" PRIx64 " %s all %d tasks send checkpoint-report, start to update checkpoint-info",
|
||||
pStream->uid, pStream->name, total);
|
||||
|
||||
bool conflict = mndStreamTransConflictCheck(pMnode, pStream->uid, MND_STREAM_CHKPT_UPDATE_NAME, false);
|
||||
if (!conflict) {
|
||||
code = mndStreamTransConflictCheck(pMnode, pStream->uid, MND_STREAM_CHKPT_UPDATE_NAME, false);
|
||||
if (code == 0) {
|
||||
code = mndCreateStreamChkptInfoUpdateTrans(pMnode, pStream, px->pTaskList);
|
||||
if (code == TSDB_CODE_SUCCESS || code == TSDB_CODE_ACTION_IN_PROGRESS) { // remove this entry
|
||||
taosArrayClear(px->pTaskList);
|
||||
|
|
|
@ -752,7 +752,7 @@ int32_t tqBuildStreamTask(void* pTqObj, SStreamTask* pTask, int64_t nextProcessV
|
|||
}
|
||||
|
||||
streamTaskResetUpstreamStageInfo(pTask);
|
||||
(void)streamSetupScheduleTrigger(pTask);
|
||||
streamSetupScheduleTrigger(pTask);
|
||||
|
||||
SCheckpointInfo* pChkInfo = &pTask->chkInfo;
|
||||
tqSetRestoreVersionInfo(pTask);
|
||||
|
@ -802,6 +802,7 @@ static void doStartFillhistoryStep2(SStreamTask* pTask, SStreamTask* pStreamTask
|
|||
const char* id = pTask->id.idStr;
|
||||
int64_t nextProcessedVer = pStreamTask->hTaskInfo.haltVer;
|
||||
SVersionRange* pStep2Range = &pTask->step2Range;
|
||||
int32_t vgId = pTask->pMeta->vgId;
|
||||
|
||||
// if it's an source task, extract the last version in wal.
|
||||
bool done = streamHistoryTaskSetVerRangeStep2(pTask, nextProcessedVer);
|
||||
|
@ -837,12 +838,15 @@ static void doStartFillhistoryStep2(SStreamTask* pTask, SStreamTask* pStreamTask
|
|||
tqDebug("s-task:%s wal reader start scan WAL verRange:%" PRId64 "-%" PRId64 ", set sched-status:%d", id, dstVer,
|
||||
pStep2Range->maxVer, TASK_SCHED_STATUS__INACTIVE);
|
||||
|
||||
(void)streamTaskSetSchedStatusInactive(pTask);
|
||||
int8_t status = streamTaskSetSchedStatusInactive(pTask);
|
||||
|
||||
// now the fill-history task starts to scan data from wal files.
|
||||
code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_SCANHIST_DONE);
|
||||
if (code == TSDB_CODE_SUCCESS) {
|
||||
(void)tqScanWalAsync(pTq, false);
|
||||
code = tqScanWalAsync(pTq, false);
|
||||
if (code) {
|
||||
tqError("vgId:%d failed to start scan wal file, code:%s", vgId, tstrerror(code));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -1001,7 +1005,10 @@ int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) {
|
|||
|
||||
// let's continue scan data in the wal files
|
||||
if (code == 0 && (pReq->reqType >= 0 || pReq->reqType == STREAM_EXEC_T_RESUME_TASK)) {
|
||||
(void)tqScanWalAsync(pTq, false); // it's ok to failed
|
||||
code = tqScanWalAsync(pTq, false); // it's ok to failed
|
||||
if (code) {
|
||||
tqError("vgId:%d failed to start scan wal file, code:%s", pTq->pStreamMeta->vgId, tstrerror(code));
|
||||
}
|
||||
}
|
||||
|
||||
return code;
|
||||
|
@ -1103,7 +1110,11 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp)
|
|||
tqError("vgId:%d failed to decode checkpoint-source msg, code:%s", vgId, tstrerror(code));
|
||||
|
||||
SRpcMsg rsp = {0};
|
||||
(void)streamTaskBuildCheckpointSourceRsp(&req, &pMsg->info, &rsp, TSDB_CODE_SUCCESS);
|
||||
int32_t ret = streamTaskBuildCheckpointSourceRsp(&req, &pMsg->info, &rsp, TSDB_CODE_SUCCESS);
|
||||
if (ret) { // suppress the error in build checkpointsource rsp
|
||||
tqError("s-task:0x%x failed to build checkpoint-source rsp, code:%s", req.taskId, tstrerror(code));
|
||||
}
|
||||
|
||||
tmsgSendRsp(&rsp); // error occurs
|
||||
return TSDB_CODE_SUCCESS; // always return success to mnode, todo: handle failure of build and send msg to mnode
|
||||
}
|
||||
|
@ -1112,7 +1123,11 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp)
|
|||
if (!vnodeIsRoleLeader(pTq->pVnode)) {
|
||||
tqDebug("vgId:%d not leader, ignore checkpoint-source msg, s-task:0x%x", vgId, req.taskId);
|
||||
SRpcMsg rsp = {0};
|
||||
(void)streamTaskBuildCheckpointSourceRsp(&req, &pMsg->info, &rsp, TSDB_CODE_SUCCESS);
|
||||
int32_t ret = streamTaskBuildCheckpointSourceRsp(&req, &pMsg->info, &rsp, TSDB_CODE_SUCCESS);
|
||||
if (ret) { // suppress the error in build checkpointsource rsp
|
||||
tqError("s-task:0x%x failed to build checkpoint-source rsp, code:%s", req.taskId, tstrerror(code));
|
||||
}
|
||||
|
||||
tmsgSendRsp(&rsp); // error occurs
|
||||
return TSDB_CODE_SUCCESS; // always return success to mnode, todo: handle failure of build and send msg to mnode
|
||||
}
|
||||
|
@ -1122,7 +1137,11 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp)
|
|||
", transId:%d s-task:0x%x ignore it",
|
||||
vgId, req.checkpointId, req.transId, req.taskId);
|
||||
SRpcMsg rsp = {0};
|
||||
(void)streamTaskBuildCheckpointSourceRsp(&req, &pMsg->info, &rsp, TSDB_CODE_SUCCESS);
|
||||
int32_t ret = streamTaskBuildCheckpointSourceRsp(&req, &pMsg->info, &rsp, TSDB_CODE_SUCCESS);
|
||||
if (ret) { // suppress the error in build checkpointsource rsp
|
||||
tqError("s-task:0x%x failed to build checkpoint-source rsp, code:%s", req.taskId, tstrerror(code));
|
||||
}
|
||||
|
||||
tmsgSendRsp(&rsp); // error occurs
|
||||
return TSDB_CODE_SUCCESS; // always return success to mnode, , todo: handle failure of build and send msg to mnode
|
||||
}
|
||||
|
@ -1134,7 +1153,10 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp)
|
|||
" transId:%d it may have been destroyed",
|
||||
vgId, req.taskId, req.checkpointId, req.transId);
|
||||
SRpcMsg rsp = {0};
|
||||
(void)streamTaskBuildCheckpointSourceRsp(&req, &pMsg->info, &rsp, TSDB_CODE_SUCCESS);
|
||||
int32_t ret = streamTaskBuildCheckpointSourceRsp(&req, &pMsg->info, &rsp, TSDB_CODE_SUCCESS);
|
||||
if (ret) { // suppress the error in build checkpointsource rsp
|
||||
tqError("s-task:%s failed to build checkpoint-source rsp, code:%s", pTask->id.idStr, tstrerror(code));
|
||||
}
|
||||
tmsgSendRsp(&rsp); // error occurs
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
@ -1147,7 +1169,11 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp)
|
|||
streamMetaReleaseTask(pMeta, pTask);
|
||||
|
||||
SRpcMsg rsp = {0};
|
||||
(void)streamTaskBuildCheckpointSourceRsp(&req, &pMsg->info, &rsp, TSDB_CODE_SUCCESS);
|
||||
int32_t ret = streamTaskBuildCheckpointSourceRsp(&req, &pMsg->info, &rsp, TSDB_CODE_SUCCESS);
|
||||
if (ret) { // suppress the error in build checkpointsource rsp
|
||||
tqError("s-task:%s failed to build checkpoint-source rsp, code:%s", pTask->id.idStr, tstrerror(code));
|
||||
}
|
||||
|
||||
tmsgSendRsp(&rsp); // error occurs
|
||||
return TSDB_CODE_SUCCESS; // todo retry handle error
|
||||
}
|
||||
|
@ -1165,7 +1191,11 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp)
|
|||
streamMetaReleaseTask(pMeta, pTask);
|
||||
|
||||
SRpcMsg rsp = {0};
|
||||
(void)streamTaskBuildCheckpointSourceRsp(&req, &pMsg->info, &rsp, TSDB_CODE_SUCCESS);
|
||||
int32_t ret = streamTaskBuildCheckpointSourceRsp(&req, &pMsg->info, &rsp, TSDB_CODE_SUCCESS);
|
||||
if (ret) { // suppress the error in build checkpointsource rsp
|
||||
tqError("s-task:%s failed to build checkpoint-source rsp, code:%s", pTask->id.idStr, tstrerror(code));
|
||||
}
|
||||
|
||||
tmsgSendRsp(&rsp); // error occurs
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
@ -1199,7 +1229,11 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp)
|
|||
streamMetaReleaseTask(pMeta, pTask);
|
||||
|
||||
SRpcMsg rsp = {0};
|
||||
(void)streamTaskBuildCheckpointSourceRsp(&req, &pMsg->info, &rsp, TSDB_CODE_SUCCESS);
|
||||
int32_t ret = streamTaskBuildCheckpointSourceRsp(&req, &pMsg->info, &rsp, TSDB_CODE_SUCCESS);
|
||||
if (ret) { // suppress the error in build checkpointsource rsp
|
||||
tqError("s-task:%s failed to build checkpoint-source rsp, code:%s", pTask->id.idStr, tstrerror(code));
|
||||
}
|
||||
|
||||
tmsgSendRsp(&rsp); // error occurs
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
|
@ -1228,7 +1262,10 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp)
|
|||
code = streamAddCheckpointSourceRspMsg(&req, &pMsg->info, pTask);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
SRpcMsg rsp = {0};
|
||||
(void)streamTaskBuildCheckpointSourceRsp(&req, &pMsg->info, &rsp, TSDB_CODE_SUCCESS);
|
||||
int32_t ret = streamTaskBuildCheckpointSourceRsp(&req, &pMsg->info, &rsp, TSDB_CODE_SUCCESS);
|
||||
if (ret) { // suppress the error in build checkpointsource rsp
|
||||
tqError("s-task:%s failed to build checkpoint-source rsp, code:%s", pTask->id.idStr, tstrerror(code));
|
||||
}
|
||||
tmsgSendRsp(&rsp); // error occurs
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
|
|
@ -389,7 +389,10 @@ static int32_t tqMetaTransformInfo(TDB* pMetaDB, TTB* pOld, TTB* pNew) {
|
|||
END:
|
||||
tdbFree(pKey);
|
||||
tdbFree(pVal);
|
||||
(void)tdbTbcClose(pCur);
|
||||
int32_t ret = tdbTbcClose(pCur);
|
||||
if (code == 0 && ret != 0) {
|
||||
code = ret;
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
|
@ -461,7 +464,12 @@ static int32_t tqMetaRestoreCheckInfo(STQ* pTq) {
|
|||
END:
|
||||
tdbFree(pKey);
|
||||
tdbFree(pVal);
|
||||
(void)tdbTbcClose(pCur);
|
||||
|
||||
int32_t ret = tdbTbcClose(pCur);
|
||||
if (code == 0) {
|
||||
code = ret;
|
||||
}
|
||||
|
||||
tDeleteSTqCheckInfo(&info);
|
||||
return code;
|
||||
}
|
||||
|
@ -476,13 +484,13 @@ int32_t tqMetaOpen(STQ* pTq) {
|
|||
TQ_ERR_GO_TO_END(tqMetaOpenTdb(pTq));
|
||||
} else {
|
||||
TQ_ERR_GO_TO_END(tqMetaTransform(pTq));
|
||||
(void)taosRemoveFile(maindb);
|
||||
TQ_ERR_GO_TO_END(taosRemoveFile(maindb));
|
||||
}
|
||||
|
||||
TQ_ERR_GO_TO_END(tqBuildFName(&offsetNew, pTq->path, TQ_OFFSET_NAME));
|
||||
if(taosCheckExistFile(offsetNew)){
|
||||
TQ_ERR_GO_TO_END(tqOffsetRestoreFromFile(pTq, offsetNew));
|
||||
(void)taosRemoveFile(offsetNew);
|
||||
TQ_ERR_GO_TO_END(taosRemoveFile(offsetNew));
|
||||
}
|
||||
|
||||
TQ_ERR_GO_TO_END(tqMetaRestoreCheckInfo(pTq));
|
||||
|
@ -518,7 +526,7 @@ int32_t tqMetaTransform(STQ* pTq) {
|
|||
if (taosCopyFile(offset, offsetNew) < 0) {
|
||||
tqError("copy offset file error");
|
||||
} else {
|
||||
(void)taosRemoveFile(offset);
|
||||
TQ_ERR_GO_TO_END(taosRemoveFile(offset));
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -527,22 +535,47 @@ END:
|
|||
taosMemoryFree(offsetNew);
|
||||
|
||||
// return 0 always, so ignore
|
||||
(void)tdbTbClose(pExecStore);
|
||||
(void)tdbTbClose(pCheckStore);
|
||||
(void)tdbClose(pMetaDB);
|
||||
int32_t ret = tdbTbClose(pExecStore);
|
||||
if (ret != 0) {
|
||||
tqError("vgId:%d failed to close stream exec store, code:%s", pTq->pStreamMeta->vgId, tstrerror(ret));
|
||||
}
|
||||
|
||||
ret = tdbTbClose(pCheckStore);
|
||||
if (ret != 0) {
|
||||
tqError("vgId:%d failed to close stream check store, code:%s", pTq->pStreamMeta->vgId, tstrerror(ret));
|
||||
}
|
||||
|
||||
ret = tdbClose(pMetaDB);
|
||||
if (ret != 0) {
|
||||
tqError("vgId:%d failed to close stream meta db store, code:%s", pTq->pStreamMeta->vgId, tstrerror(ret));
|
||||
}
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
void tqMetaClose(STQ* pTq) {
|
||||
int32_t code = 0;
|
||||
if (pTq->pExecStore) {
|
||||
(void)tdbTbClose(pTq->pExecStore);
|
||||
code = tdbTbClose(pTq->pExecStore);
|
||||
if (code) {
|
||||
tqError("vgId:%d failed to close tq exec store, code:%s", pTq->pStreamMeta->vgId, tstrerror(code));
|
||||
}
|
||||
}
|
||||
if (pTq->pCheckStore) {
|
||||
(void)tdbTbClose(pTq->pCheckStore);
|
||||
code = tdbTbClose(pTq->pCheckStore);
|
||||
if (code) {
|
||||
tqError("vgId:%d failed to close tq check store, code:%s", pTq->pStreamMeta->vgId, tstrerror(code));
|
||||
}
|
||||
}
|
||||
if (pTq->pOffsetStore) {
|
||||
(void)tdbTbClose(pTq->pOffsetStore);
|
||||
code = tdbTbClose(pTq->pOffsetStore);
|
||||
if (code) {
|
||||
tqError("vgId:%d failed to close tq offset store, code:%s", pTq->pStreamMeta->vgId, tstrerror(code));
|
||||
}
|
||||
}
|
||||
|
||||
code = tdbClose(pTq->pMetaDB);
|
||||
if (code) {
|
||||
tqError("vgId:%d failed to close tq meta db store, code:%s", pTq->pStreamMeta->vgId, tstrerror(code));
|
||||
}
|
||||
(void)tdbClose(pTq->pMetaDB);
|
||||
}
|
||||
|
|
|
@ -83,18 +83,27 @@ int32_t streamTaskSnapReaderOpen(STQ* pTq, int64_t sver, int64_t ever, SStreamTa
|
|||
|
||||
_err:
|
||||
tqError("vgId:%d, vnode stream-task snapshot reader open failed since %s", TD_VID(pTq->pVnode), tstrerror(code));
|
||||
(void)streamTaskSnapReaderClose(pReader);
|
||||
int32_t ret = streamTaskSnapReaderClose(pReader);
|
||||
*ppReader = NULL;
|
||||
return code;
|
||||
}
|
||||
|
||||
int32_t streamTaskSnapReaderClose(SStreamTaskReader* pReader) {
|
||||
if (pReader == NULL) return 0;
|
||||
if (pReader == NULL) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t code = 0;
|
||||
tqInfo("vgId:%d, vnode stream-task snapshot reader closed", TD_VID(pReader->pTq->pVnode));
|
||||
int32_t vgId = TD_VID(pReader->pTq->pVnode);
|
||||
|
||||
taosArrayDestroy(pReader->tdbTbList);
|
||||
(void)tdbTbcClose(pReader->pCur);
|
||||
code = tdbTbcClose(pReader->pCur);
|
||||
if (code) {
|
||||
tqError("vgId:%d failed to close stream meta reader, code:%s", vgId, tstrerror(code));
|
||||
} else {
|
||||
tqInfo("vgId:%d, vnode stream-task snapshot reader closed", vgId);
|
||||
}
|
||||
|
||||
taosMemoryFree(pReader);
|
||||
return code;
|
||||
}
|
||||
|
@ -113,6 +122,7 @@ int32_t streamTaskSnapRead(SStreamTaskReader* pReader, uint8_t** ppData) {
|
|||
tqDebug("vgId:%d, vnode stream-task snapshot start read data", TD_VID(pReader->pTq->pVnode));
|
||||
|
||||
STablePair* pPair = taosArrayGet(pReader->tdbTbList, pReader->pos);
|
||||
|
||||
NextTbl:
|
||||
except = 0;
|
||||
for (;;) {
|
||||
|
@ -127,6 +137,7 @@ NextTbl:
|
|||
code = terrno;
|
||||
goto _err;
|
||||
}
|
||||
|
||||
memcpy(pVal, tVal, tLen);
|
||||
vLen = tLen;
|
||||
}
|
||||
|
@ -163,8 +174,8 @@ NextTbl:
|
|||
taosMemoryFree(pVal);
|
||||
|
||||
tqDebug("vgId:%d, vnode stream-task snapshot read data vLen:%d", TD_VID(pReader->pTq->pVnode), vLen);
|
||||
|
||||
return code;
|
||||
|
||||
_err:
|
||||
tqError("vgId:%d, vnode stream-task snapshot read data failed since %s", TD_VID(pReader->pTq->pVnode),
|
||||
tstrerror(code));
|
||||
|
|
|
@ -207,7 +207,10 @@ int32_t tqStreamTaskProcessUpdateReq(SStreamMeta* pMeta, SMsgCb* cb, SRpcMsg* pM
|
|||
updated = streamTaskUpdateEpsetInfo(pTask, req.pNodeList);
|
||||
|
||||
// send the checkpoint-source-rsp for source task to end the checkpoint trans in mnode
|
||||
(void)streamTaskSendCheckpointsourceRsp(pTask);
|
||||
code = streamTaskSendCheckpointsourceRsp(pTask);
|
||||
if (code) {
|
||||
tqError("%s failed to send checkpoint-source rsp, code:%s", pTask->id.idStr, tstrerror(code));
|
||||
}
|
||||
streamTaskResetStatus(pTask);
|
||||
|
||||
streamTaskStopMonitorCheckRsp(&pTask->taskCheckInfo, pTask->id.idStr);
|
||||
|
@ -806,25 +809,26 @@ int32_t tqStreamTaskProcessRunReq(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLead
|
|||
|
||||
int32_t type = pReq->reqType;
|
||||
int32_t vgId = pMeta->vgId;
|
||||
int32_t code = 0;
|
||||
|
||||
if (type == STREAM_EXEC_T_START_ONE_TASK) {
|
||||
(void)streamMetaStartOneTask(pMeta, pReq->streamId, pReq->taskId);
|
||||
code = streamMetaStartOneTask(pMeta, pReq->streamId, pReq->taskId);
|
||||
return 0;
|
||||
} else if (type == STREAM_EXEC_T_START_ALL_TASKS) {
|
||||
(void)streamMetaStartAllTasks(pMeta);
|
||||
code = streamMetaStartAllTasks(pMeta);
|
||||
return 0;
|
||||
} else if (type == STREAM_EXEC_T_RESTART_ALL_TASKS) {
|
||||
(void)restartStreamTasks(pMeta, isLeader);
|
||||
code = restartStreamTasks(pMeta, isLeader);
|
||||
return 0;
|
||||
} else if (type == STREAM_EXEC_T_STOP_ALL_TASKS) {
|
||||
(void)streamMetaStopAllTasks(pMeta);
|
||||
code = streamMetaStopAllTasks(pMeta);
|
||||
return 0;
|
||||
} else if (type == STREAM_EXEC_T_ADD_FAILED_TASK) {
|
||||
int32_t code = streamMetaAddFailedTask(pMeta, pReq->streamId, pReq->taskId);
|
||||
code = streamMetaAddFailedTask(pMeta, pReq->streamId, pReq->taskId);
|
||||
return code;
|
||||
} else if (type == STREAM_EXEC_T_RESUME_TASK) { // task resume to run after idle for a while
|
||||
SStreamTask* pTask = NULL;
|
||||
int32_t code = streamMetaAcquireTask(pMeta, pReq->streamId, pReq->taskId, &pTask);
|
||||
code = streamMetaAcquireTask(pMeta, pReq->streamId, pReq->taskId, &pTask);
|
||||
|
||||
if (pTask != NULL && (code == 0)) {
|
||||
char* pStatus = NULL;
|
||||
|
@ -846,7 +850,7 @@ int32_t tqStreamTaskProcessRunReq(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLead
|
|||
}
|
||||
|
||||
SStreamTask* pTask = NULL;
|
||||
int32_t code = streamMetaAcquireTask(pMeta, pReq->streamId, pReq->taskId, &pTask);
|
||||
code = streamMetaAcquireTask(pMeta, pReq->streamId, pReq->taskId, &pTask);
|
||||
if ((pTask != NULL) && (code == 0)) { // even in halt status, the data in inputQ must be processed
|
||||
char* p = NULL;
|
||||
if (streamTaskReadyToRun(pTask, &p)) {
|
||||
|
@ -864,7 +868,7 @@ int32_t tqStreamTaskProcessRunReq(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLead
|
|||
} else { // NOTE: pTask->status.schedStatus is not updated since it is not be handled by the run exec.
|
||||
// todo add one function to handle this
|
||||
tqError("vgId:%d failed to found s-task, taskId:0x%x may have been dropped", vgId, pReq->taskId);
|
||||
return -1;
|
||||
return code;
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1229,7 +1233,8 @@ int32_t tqStreamTaskProcessConsenChkptIdReq(SStreamMeta* pMeta, SRpcMsg* pMsg) {
|
|||
tqError(
|
||||
"vgId:%d process set consensus checkpointId req, failed to acquire task:0x%x, it may have been dropped already",
|
||||
pMeta->vgId, req.taskId);
|
||||
(void)streamMetaAddFailedTask(pMeta, req.streamId, req.taskId);
|
||||
// ignore this code to avoid error code over write
|
||||
int32_t ret = streamMetaAddFailedTask(pMeta, req.streamId, req.taskId);
|
||||
return code;
|
||||
}
|
||||
|
||||
|
|
|
@ -379,6 +379,7 @@ static int32_t loadSttStatisticsBlockData(SSttFileReader *pSttFileReader, SSttBl
|
|||
int32_t lino = 0;
|
||||
void *px = NULL;
|
||||
int32_t startIndex = 0;
|
||||
int32_t ret = 0;
|
||||
|
||||
int32_t numOfBlocks = TARRAY2_SIZE(pStatisBlkArray);
|
||||
if (numOfBlocks <= 0) {
|
||||
|
@ -487,7 +488,9 @@ static int32_t loadSttStatisticsBlockData(SSttFileReader *pSttFileReader, SSttBl
|
|||
} else {
|
||||
STbStatisRecord record = {0};
|
||||
while (i < rows) {
|
||||
(void)tStatisBlockGet(&block, i, &record);
|
||||
code = tStatisBlockGet(&block, i, &record);
|
||||
TSDB_CHECK_CODE(code, lino, _end);
|
||||
|
||||
if (record.suid != suid) {
|
||||
break;
|
||||
}
|
||||
|
@ -534,7 +537,7 @@ static int32_t loadSttStatisticsBlockData(SSttFileReader *pSttFileReader, SSttBl
|
|||
}
|
||||
|
||||
_end:
|
||||
(void) tStatisBlockDestroy(&block);
|
||||
ret = tStatisBlockDestroy(&block);
|
||||
if (code != 0) {
|
||||
tsdbError("%s error happens at:%s line number: %d, code:%s", id, __func__, lino, tstrerror(code));
|
||||
} else {
|
||||
|
@ -678,7 +681,11 @@ int32_t tLDataIterOpen2(SLDataIter *pIter, SSttFileReader *pSttFileReader, int32
|
|||
}
|
||||
|
||||
void tLDataIterClose2(SLDataIter *pIter) {
|
||||
(void)tsdbSttFileReaderClose(&pIter->pReader); // always return 0
|
||||
int32_t code = tsdbSttFileReaderClose(&pIter->pReader); // always return 0
|
||||
if (code != 0) {
|
||||
tsdbError("%" PRId64 " failed to close tsdb file reader, code:%s", pIter->cid, tstrerror(code));
|
||||
}
|
||||
|
||||
pIter->pReader = NULL;
|
||||
}
|
||||
|
||||
|
|
|
@ -449,10 +449,14 @@ static int32_t tsdbUninitReaderLock(STsdbReader* pReader) {
|
|||
|
||||
static int32_t tsdbAcquireReader(STsdbReader* pReader) {
|
||||
int32_t code = -1;
|
||||
tsdbTrace("tsdb/read: %p, pre-take read mutex: %p, code: %d", pReader, &pReader->readerMutex, code);
|
||||
tsdbTrace("tsdb/read: %s, pre-take read mutex: %p, code: %d", pReader->idStr, &pReader->readerMutex, code);
|
||||
|
||||
code = taosThreadMutexLock(&pReader->readerMutex);
|
||||
tsdbTrace("tsdb/read: %p, post-take read mutex: %p, code: %d", pReader, &pReader->readerMutex, code);
|
||||
if (code != 0) {
|
||||
tsdbError("tsdb/read:%p, failed to lock reader mutex, code:%s", pReader->idStr, tstrerror(code));
|
||||
} else {
|
||||
tsdbTrace("tsdb/read: %s, post-take read mutex: %p, code: %d", pReader->idStr, &pReader->readerMutex, code);
|
||||
}
|
||||
|
||||
return code;
|
||||
}
|
||||
|
@ -4574,7 +4578,10 @@ int32_t tsdbSetTableList2(STsdbReader* pReader, const void* pTableList, int32_t
|
|||
STableBlockScanInfo** p = NULL;
|
||||
int32_t iter = 0;
|
||||
|
||||
(void)tsdbAcquireReader(pReader);
|
||||
code = tsdbAcquireReader(pReader);
|
||||
if (code) {
|
||||
return code;
|
||||
}
|
||||
|
||||
while ((p = tSimpleHashIterate(pReader->status.pTableMap, p, &iter)) != NULL) {
|
||||
clearBlockScanInfo(*p);
|
||||
|
@ -4808,7 +4815,10 @@ void tsdbReaderClose2(STsdbReader* pReader) {
|
|||
return;
|
||||
}
|
||||
|
||||
(void)tsdbAcquireReader(pReader);
|
||||
int32_t code = tsdbAcquireReader(pReader);
|
||||
if (code) {
|
||||
return;
|
||||
}
|
||||
|
||||
{
|
||||
if (pReader->innerReader[0] != NULL || pReader->innerReader[1] != NULL) {
|
||||
|
@ -5856,6 +5866,7 @@ int32_t tsdbTakeReadSnap2(STsdbReader* pReader, _query_reseek_func_t reseek, STs
|
|||
int32_t code = 0;
|
||||
STsdb* pTsdb = pReader->pTsdb;
|
||||
SVersionRange* pRange = &pReader->info.verRange;
|
||||
int32_t lino = 0;
|
||||
*ppSnap = NULL;
|
||||
|
||||
// lock
|
||||
|
@ -5869,8 +5880,7 @@ int32_t tsdbTakeReadSnap2(STsdbReader* pReader, _query_reseek_func_t reseek, STs
|
|||
STsdbReadSnap* pSnap = (STsdbReadSnap*)taosMemoryCalloc(1, sizeof(STsdbReadSnap));
|
||||
if (pSnap == NULL) {
|
||||
(void) taosThreadMutexUnlock(&pTsdb->mutex);
|
||||
code = terrno;
|
||||
goto _exit;
|
||||
TSDB_CHECK_NULL(pSnap, code, lino, _exit, terrno);
|
||||
}
|
||||
|
||||
// take snapshot
|
||||
|
@ -5879,14 +5889,14 @@ int32_t tsdbTakeReadSnap2(STsdbReader* pReader, _query_reseek_func_t reseek, STs
|
|||
pSnap->pNode = taosMemoryMalloc(sizeof(*pSnap->pNode));
|
||||
if (pSnap->pNode == NULL) {
|
||||
(void) taosThreadMutexUnlock(&pTsdb->mutex);
|
||||
code = terrno;
|
||||
goto _exit;
|
||||
TSDB_CHECK_NULL(pSnap->pNode, code, lino, _exit, terrno);
|
||||
}
|
||||
|
||||
pSnap->pNode->pQHandle = pReader;
|
||||
pSnap->pNode->reseek = reseek;
|
||||
|
||||
(void)tsdbRefMemTable(pTsdb->mem, pSnap->pNode);
|
||||
code = tsdbRefMemTable(pTsdb->mem, pSnap->pNode);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
}
|
||||
|
||||
if (pTsdb->imem && (pRange->minVer <= pTsdb->imem->maxVer && pRange->maxVer >= pTsdb->imem->minVer)) {
|
||||
|
@ -5906,7 +5916,8 @@ int32_t tsdbTakeReadSnap2(STsdbReader* pReader, _query_reseek_func_t reseek, STs
|
|||
pSnap->pINode->pQHandle = pReader;
|
||||
pSnap->pINode->reseek = reseek;
|
||||
|
||||
(void)tsdbRefMemTable(pTsdb->imem, pSnap->pINode);
|
||||
code = tsdbRefMemTable(pTsdb->imem, pSnap->pINode);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
}
|
||||
|
||||
// fs
|
||||
|
@ -5921,8 +5932,7 @@ int32_t tsdbTakeReadSnap2(STsdbReader* pReader, _query_reseek_func_t reseek, STs
|
|||
}
|
||||
|
||||
(void) taosThreadMutexUnlock(&pTsdb->mutex);
|
||||
goto _exit;
|
||||
}
|
||||
TSDB_CHECK_CODE(code, lino, _exit);}
|
||||
|
||||
// unlock
|
||||
(void) taosThreadMutexUnlock(&pTsdb->mutex);
|
||||
|
@ -5932,7 +5942,8 @@ int32_t tsdbTakeReadSnap2(STsdbReader* pReader, _query_reseek_func_t reseek, STs
|
|||
return code;
|
||||
|
||||
_exit:
|
||||
tsdbError("vgId:%d take read snapshot failed, code:%s", TD_VID(pTsdb->pVnode), tstrerror(code));
|
||||
tsdbError("%s vgId:%d take read snapshot failed, line:%d code:%s", pReader->idStr, TD_VID(pTsdb->pVnode), lino,
|
||||
tstrerror(code));
|
||||
if (pSnap) {
|
||||
if (pSnap->pNode) taosMemoryFree(pSnap->pNode);
|
||||
if (pSnap->pINode) taosMemoryFree(pSnap->pINode);
|
||||
|
|
|
@ -779,8 +779,9 @@ typedef enum {
|
|||
BLK_CHECK_QUIT = 0x2,
|
||||
} ETombBlkCheckEnum;
|
||||
|
||||
static void loadNextStatisticsBlock(SSttFileReader* pSttFileReader, STbStatisBlock* pStatisBlock,
|
||||
const TStatisBlkArray* pStatisBlkArray, int32_t numOfRows, int32_t* i, int32_t* j);
|
||||
static int32_t loadNextStatisticsBlock(SSttFileReader* pSttFileReader, STbStatisBlock* pStatisBlock,
|
||||
const TStatisBlkArray* pStatisBlkArray, int32_t numOfRows, int32_t* i,
|
||||
int32_t* j);
|
||||
static int32_t doCheckTombBlock(STombBlock* pBlock, STsdbReader* pReader, int32_t numOfTables, int32_t* j,
|
||||
ETombBlkCheckEnum* pRet) {
|
||||
int32_t code = 0;
|
||||
|
@ -912,7 +913,7 @@ static int32_t doLoadTombDataFromTombBlk(const TTombBlkArray* pTombBlkArray, STs
|
|||
ETombBlkCheckEnum ret = 0;
|
||||
code = doCheckTombBlock(&block, pReader, numOfTables, &j, &ret);
|
||||
|
||||
(void)tTombBlockDestroy(&block);
|
||||
tTombBlockDestroy(&block);
|
||||
if (code != TSDB_CODE_SUCCESS || ret == BLK_CHECK_QUIT) {
|
||||
return code;
|
||||
}
|
||||
|
@ -994,11 +995,17 @@ int32_t loadMemTombData(SArray** ppMemDelData, STbData* pMemTbData, STbData* piM
|
|||
|
||||
int32_t getNumOfRowsInSttBlock(SSttFileReader* pSttFileReader, SSttBlockLoadInfo* pBlockLoadInfo,
|
||||
TStatisBlkArray* pStatisBlkArray, uint64_t suid, const uint64_t* pUidList,
|
||||
int32_t numOfTables) {
|
||||
int32_t numOfTables, int32_t* pNumOfRows) {
|
||||
int32_t num = 0;
|
||||
int32_t code = 0;
|
||||
int32_t lino = 0;
|
||||
|
||||
if (pNumOfRows != 0) {
|
||||
*pNumOfRows = 0;
|
||||
}
|
||||
|
||||
if (TARRAY2_SIZE(pStatisBlkArray) <= 0) {
|
||||
return 0;
|
||||
return code;
|
||||
}
|
||||
|
||||
int32_t i = 0;
|
||||
|
@ -1007,18 +1014,19 @@ int32_t getNumOfRowsInSttBlock(SSttFileReader* pSttFileReader, SSttBlockLoadInfo
|
|||
}
|
||||
|
||||
if (i >= TARRAY2_SIZE(pStatisBlkArray)) {
|
||||
return 0;
|
||||
return code;
|
||||
}
|
||||
|
||||
SStatisBlk* p = &pStatisBlkArray->data[i];
|
||||
STbStatisBlock* pStatisBlock = taosMemoryCalloc(1, sizeof(STbStatisBlock));
|
||||
(void)tStatisBlockInit(pStatisBlock);
|
||||
TSDB_CHECK_NULL(pStatisBlock, code, lino, _err, terrno);
|
||||
|
||||
code = tStatisBlockInit(pStatisBlock);
|
||||
TSDB_CHECK_CODE(code, lino, _err);
|
||||
|
||||
int64_t st = taosGetTimestampMs();
|
||||
int32_t code = tsdbSttFileReadStatisBlock(pSttFileReader, p, pStatisBlock);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
return 0;
|
||||
}
|
||||
code = tsdbSttFileReadStatisBlock(pSttFileReader, p, pStatisBlock);
|
||||
TSDB_CHECK_CODE(code, lino, _err);
|
||||
|
||||
double el = (taosGetTimestampMs() - st) / 1000.0;
|
||||
pBlockLoadInfo->cost.loadStatisBlocks += 1;
|
||||
|
@ -1030,9 +1038,10 @@ int32_t getNumOfRowsInSttBlock(SSttFileReader* pSttFileReader, SSttBlockLoadInfo
|
|||
}
|
||||
|
||||
if (index >= pStatisBlock->numOfRecords) {
|
||||
(void)tStatisBlockDestroy(pStatisBlock);
|
||||
code = tStatisBlockDestroy(pStatisBlock);
|
||||
taosMemoryFreeClear(pStatisBlock);
|
||||
return num;
|
||||
*pNumOfRows = num;
|
||||
return code;
|
||||
}
|
||||
|
||||
int32_t j = index;
|
||||
|
@ -1040,9 +1049,10 @@ int32_t getNumOfRowsInSttBlock(SSttFileReader* pSttFileReader, SSttBlockLoadInfo
|
|||
while (i < TARRAY2_SIZE(pStatisBlkArray) && uidIndex < numOfTables) {
|
||||
p = &pStatisBlkArray->data[i];
|
||||
if (p->minTbid.suid > suid) {
|
||||
(void)tStatisBlockDestroy(pStatisBlock);
|
||||
code = tStatisBlockDestroy(pStatisBlock);
|
||||
taosMemoryFreeClear(pStatisBlock);
|
||||
return num;
|
||||
*pNumOfRows = num;
|
||||
return code;
|
||||
}
|
||||
|
||||
uint64_t uid = pUidList[uidIndex];
|
||||
|
@ -1051,30 +1061,44 @@ int32_t getNumOfRowsInSttBlock(SSttFileReader* pSttFileReader, SSttBlockLoadInfo
|
|||
num += ((int64_t*)pStatisBlock->counts.data)[j];
|
||||
uidIndex += 1;
|
||||
j += 1;
|
||||
loadNextStatisticsBlock(pSttFileReader, pStatisBlock, pStatisBlkArray, pStatisBlock->numOfRecords, &i, &j);
|
||||
code = loadNextStatisticsBlock(pSttFileReader, pStatisBlock, pStatisBlkArray, pStatisBlock->numOfRecords, &i, &j);
|
||||
TSDB_CHECK_CODE(code, lino, _err);
|
||||
} else if (((int64_t*)pStatisBlock->uids.data)[j] < uid) {
|
||||
j += 1;
|
||||
loadNextStatisticsBlock(pSttFileReader, pStatisBlock, pStatisBlkArray, pStatisBlock->numOfRecords, &i, &j);
|
||||
code = loadNextStatisticsBlock(pSttFileReader, pStatisBlock, pStatisBlkArray, pStatisBlock->numOfRecords, &i, &j);
|
||||
TSDB_CHECK_CODE(code, lino, _err);
|
||||
} else {
|
||||
uidIndex += 1;
|
||||
}
|
||||
}
|
||||
|
||||
(void)tStatisBlockDestroy(pStatisBlock);
|
||||
int32_t ret = tStatisBlockDestroy(pStatisBlock);
|
||||
taosMemoryFreeClear(pStatisBlock);
|
||||
return num;
|
||||
*pNumOfRows = num;
|
||||
return code;
|
||||
|
||||
_err:
|
||||
tsdbError("%p failed to get number of rows in stt block, %s at line:%d code:%s", pSttFileReader, __func__, lino,
|
||||
tstrerror(code));
|
||||
return code;
|
||||
}
|
||||
|
||||
// load next stt statistics block
|
||||
static void loadNextStatisticsBlock(SSttFileReader* pSttFileReader, STbStatisBlock* pStatisBlock,
|
||||
static int32_t loadNextStatisticsBlock(SSttFileReader* pSttFileReader, STbStatisBlock* pStatisBlock,
|
||||
const TStatisBlkArray* pStatisBlkArray, int32_t numOfRows, int32_t* i, int32_t* j) {
|
||||
if ((*j) >= numOfRows) {
|
||||
(*i) += 1;
|
||||
(*j) = 0;
|
||||
if ((*i) < TARRAY2_SIZE(pStatisBlkArray)) {
|
||||
(void)tsdbSttFileReadStatisBlock(pSttFileReader, &pStatisBlkArray->data[(*i)], pStatisBlock);
|
||||
int32_t code = tsdbSttFileReadStatisBlock(pSttFileReader, &pStatisBlkArray->data[(*i)], pStatisBlock);
|
||||
if (code != 0) {
|
||||
tsdbError("%p failed to read statisBlock, code:%s", pSttFileReader, tstrerror(code));
|
||||
return code;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t doAdjustValidDataIters(SArray* pLDIterList, int32_t numOfFileObj) {
|
||||
|
@ -1191,8 +1215,13 @@ int32_t tsdbGetRowsInSttFiles(STFileSet* pFileSet, SArray* pSttFileBlockIterArra
|
|||
STsdbReader* pReader = pConf->pReader;
|
||||
int32_t numOfTables = tSimpleHashGetSize(pReader->status.pTableMap);
|
||||
uint64_t* pUidList = pReader->status.uidList.tableUidList;
|
||||
numOfRows += getNumOfRowsInSttBlock(pIter->pReader, pIter->pBlockLoadInfo, pStatisBlkArray, pConf->suid, pUidList,
|
||||
numOfTables);
|
||||
int32_t n = 0;
|
||||
code = getNumOfRowsInSttBlock(pIter->pReader, pIter->pBlockLoadInfo, pStatisBlkArray, pConf->suid, pUidList,
|
||||
numOfTables, &n);
|
||||
numOfRows += n;
|
||||
if (code) {
|
||||
tsdbError("%s failed to get rows in stt blocks, code:%s", pstr, tstrerror(code));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -343,7 +343,7 @@ int32_t loadDataFileTombDataForAll(STsdbReader* pReader);
|
|||
int32_t loadSttTombDataForAll(STsdbReader* pReader, SSttFileReader* pSttFileReader, SSttBlockLoadInfo* pLoadInfo);
|
||||
int32_t getNumOfRowsInSttBlock(SSttFileReader* pSttFileReader, SSttBlockLoadInfo* pBlockLoadInfo,
|
||||
TStatisBlkArray* pStatisBlkArray, uint64_t suid, const uint64_t* pUidList,
|
||||
int32_t numOfTables);
|
||||
int32_t numOfTables, int32_t* pNumOfRows);
|
||||
void recordToBlockInfo(SFileDataBlockInfo* pBlockInfo, SBrinRecord* record);
|
||||
|
||||
void destroyLDataIter(SLDataIter* pIter);
|
||||
|
|
|
@ -587,7 +587,10 @@ static void vnodeRestoreFinish(const SSyncFSM *pFsm, const SyncIndex commitIdx)
|
|||
streamMetaWUnLock(pMeta);
|
||||
|
||||
tqInfo("vgId:%d stream task already loaded, start them", vgId);
|
||||
(void)streamTaskSchedTask(&pVnode->msgCb, TD_VID(pVnode), 0, 0, STREAM_EXEC_T_START_ALL_TASKS);
|
||||
int32_t code = streamTaskSchedTask(&pVnode->msgCb, TD_VID(pVnode), 0, 0, STREAM_EXEC_T_START_ALL_TASKS);
|
||||
if (code != 0) {
|
||||
tqError("vgId:%d failed to sched stream task, code:%s", vgId, tstrerror(code));
|
||||
}
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -153,9 +153,9 @@ int32_t streamTaskProcessCheckpointTriggerRsp(SStreamTask* pTask, SCheckpointTri
|
|||
streamMutexUnlock(&pInfo->lock);
|
||||
|
||||
// NOTE: here we do not do the duplicated checkpoint-trigger msg check, since it will be done by following functions.
|
||||
(void)appendCheckpointIntoInputQ(pTask, STREAM_INPUT__CHECKPOINT_TRIGGER, pRsp->checkpointId, pRsp->transId,
|
||||
int32_t code = appendCheckpointIntoInputQ(pTask, STREAM_INPUT__CHECKPOINT_TRIGGER, pRsp->checkpointId, pRsp->transId,
|
||||
pRsp->upstreamTaskId);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
return code;
|
||||
}
|
||||
|
||||
int32_t streamTaskSendCheckpointTriggerMsg(SStreamTask* pTask, int32_t dstTaskId, int32_t downstreamNodeId,
|
||||
|
@ -192,6 +192,7 @@ int32_t streamTaskSendCheckpointTriggerMsg(SStreamTask* pTask, int32_t dstTaskId
|
|||
int32_t continueDispatchCheckpointTriggerBlock(SStreamDataBlock* pBlock, SStreamTask* pTask) {
|
||||
pBlock->srcTaskId = pTask->id.taskId;
|
||||
pBlock->srcVgId = pTask->pMeta->vgId;
|
||||
|
||||
if (pTask->chkInfo.pActiveInfo->dispatchTrigger == true) {
|
||||
stError("s-task:%s already dispatch checkpoint-trigger, not dispatch again", pTask->id.idStr);
|
||||
return 0;
|
||||
|
@ -246,7 +247,7 @@ int32_t streamProcessCheckpointTriggerBlock(SStreamTask* pTask, SStreamDataBlock
|
|||
|
||||
code = initCheckpointReadyMsg(pTask, pInfo->nodeId, pBlock->srcTaskId, pInfo->childId, checkpointId, &msg);
|
||||
if (code == TSDB_CODE_SUCCESS) {
|
||||
(void)tmsgSendReq(&pInfo->epSet, &msg);
|
||||
code = tmsgSendReq(&pInfo->epSet, &msg);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -358,9 +359,9 @@ int32_t streamProcessCheckpointTriggerBlock(SStreamTask* pTask, SStreamDataBlock
|
|||
|
||||
if (type == TASK_OUTPUT__FIXED_DISPATCH || type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
|
||||
stDebug("s-task:%s set childIdx:%d, and add checkpoint-trigger block into outputQ", id, pTask->info.selfChildId);
|
||||
(void)continueDispatchCheckpointTriggerBlock(pBlock, pTask); // todo handle this failure
|
||||
code = continueDispatchCheckpointTriggerBlock(pBlock, pTask); // todo handle this failure
|
||||
} else { // only one task exists, no need to dispatch downstream info
|
||||
(void)appendCheckpointIntoInputQ(pTask, STREAM_INPUT__CHECKPOINT, pActiveInfo->activeId, pActiveInfo->transId,
|
||||
code = appendCheckpointIntoInputQ(pTask, STREAM_INPUT__CHECKPOINT, pActiveInfo->activeId, pActiveInfo->transId,
|
||||
-1);
|
||||
streamFreeQitem((SStreamQueueItem*)pBlock);
|
||||
}
|
||||
|
@ -372,7 +373,7 @@ int32_t streamProcessCheckpointTriggerBlock(SStreamTask* pTask, SStreamDataBlock
|
|||
|
||||
// todo: handle this
|
||||
// update the child Id for downstream tasks
|
||||
(void) streamAddCheckpointReadyMsg(pTask, pBlock->srcTaskId, pTask->info.selfChildId, checkpointId);
|
||||
code = streamAddCheckpointReadyMsg(pTask, pBlock->srcTaskId, pTask->info.selfChildId, checkpointId);
|
||||
|
||||
// there are still some upstream tasks not send checkpoint request, do nothing and wait for then
|
||||
if (pActiveInfo->allUpstreamTriggerRecv != 1) {
|
||||
|
@ -384,7 +385,7 @@ int32_t streamProcessCheckpointTriggerBlock(SStreamTask* pTask, SStreamDataBlock
|
|||
if (taskLevel == TASK_LEVEL__SINK) {
|
||||
stDebug("s-task:%s process checkpoint-trigger block, all %d upstreams sent, send ready msg to upstream", id, num);
|
||||
streamFreeQitem((SStreamQueueItem*)pBlock);
|
||||
(void)streamTaskBuildCheckpoint(pTask); // todo: not handle error yet
|
||||
code = streamTaskBuildCheckpoint(pTask); // todo: not handle error yet
|
||||
} else { // source & agg tasks need to forward the checkpoint msg downwards
|
||||
stDebug("s-task:%s process checkpoint-trigger block, all %d upstreams sent, forwards to downstream", id, num);
|
||||
flushStateDataInExecutor(pTask, (SStreamQueueItem*)pBlock);
|
||||
|
@ -480,7 +481,7 @@ int32_t streamProcessCheckpointReadyMsg(SStreamTask* pTask, int64_t checkpointId
|
|||
} else {
|
||||
if ((notReady == 0) && (code == 0) && (!alreadyHandled)) {
|
||||
stDebug("s-task:%s all downstream tasks have completed build checkpoint, do checkpoint for current task", id);
|
||||
(void)appendCheckpointIntoInputQ(pTask, STREAM_INPUT__CHECKPOINT, checkpointId, transId, -1);
|
||||
code = appendCheckpointIntoInputQ(pTask, STREAM_INPUT__CHECKPOINT, checkpointId, transId, -1);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -562,7 +563,7 @@ int32_t streamTaskUpdateTaskCheckpointInfo(SStreamTask* pTask, bool restored, SV
|
|||
// drop task should not in the meta-lock, and drop the related fill-history task now
|
||||
streamMetaWUnLock(pMeta);
|
||||
if (pReq->dropRelHTask) {
|
||||
(void) streamMetaUnregisterTask(pMeta, pReq->hStreamId, pReq->hTaskId);
|
||||
code = streamMetaUnregisterTask(pMeta, pReq->hStreamId, pReq->hTaskId);
|
||||
int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta);
|
||||
stDebug("s-task:%s vgId:%d related fill-history task:0x%x dropped in update checkpointInfo, remain tasks:%d",
|
||||
id, vgId, pReq->taskId, numOfTasks);
|
||||
|
@ -639,14 +640,14 @@ int32_t streamTaskUpdateTaskCheckpointInfo(SStreamTask* pTask, bool restored, SV
|
|||
if (code != TSDB_CODE_SUCCESS) {
|
||||
stError("s-task:%s vgId:%d failed to save task info after do checkpoint, checkpointId:%" PRId64 ", since %s", id,
|
||||
vgId, pReq->checkpointId, terrstr());
|
||||
return code;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
streamMetaWUnLock(pMeta);
|
||||
|
||||
// drop task should not in the meta-lock, and drop the related fill-history task now
|
||||
if (pReq->dropRelHTask) {
|
||||
(void) streamMetaUnregisterTask(pMeta, pReq->hStreamId, pReq->hTaskId);
|
||||
code = streamMetaUnregisterTask(pMeta, pReq->hStreamId, pReq->hTaskId);
|
||||
int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta);
|
||||
stDebug("s-task:%s vgId:%d related fill-history task:0x%x dropped, remain tasks:%d", id, vgId,
|
||||
(int32_t)pReq->hTaskId, numOfTasks);
|
||||
|
@ -991,7 +992,11 @@ void checkpointTriggerMonitorFn(void* param, void* tmrId) {
|
|||
|
||||
// do send retrieve checkpoint trigger msg to upstream
|
||||
int32_t size = taosArrayGetSize(pNotSendList);
|
||||
(void)doSendRetrieveTriggerMsg(pTask, pNotSendList);
|
||||
int32_t code = doSendRetrieveTriggerMsg(pTask, pNotSendList);
|
||||
if (code) {
|
||||
stError("s-task:%s vgId:%d failed to retrieve trigger msg, code:%s", pTask->id.idStr, vgId, tstrerror(code));
|
||||
}
|
||||
|
||||
streamMutexUnlock(&pActiveInfo->lock);
|
||||
|
||||
// check every 100ms
|
||||
|
@ -1262,7 +1267,11 @@ static int32_t uploadCheckpointToS3(const char* id, const char* path) {
|
|||
}
|
||||
}
|
||||
|
||||
(void) taosCloseDir(&pDir);
|
||||
int32_t ret = taosCloseDir(&pDir);
|
||||
if (code == 0 && ret != 0) {
|
||||
code = ret;
|
||||
}
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
|
|
|
@ -1255,7 +1255,11 @@ int32_t streamAddCheckpointSourceRspMsg(SStreamCheckpointSourceReq* pReq, SRpcHa
|
|||
.recvTs = taosGetTimestampMs(), .transId = pReq->transId, .checkpointId = pReq->checkpointId};
|
||||
|
||||
// todo retry until it success
|
||||
(void)streamTaskBuildCheckpointSourceRsp(pReq, pRpcInfo, &info.msg, TSDB_CODE_SUCCESS);
|
||||
int32_t code = streamTaskBuildCheckpointSourceRsp(pReq, pRpcInfo, &info.msg, TSDB_CODE_SUCCESS);
|
||||
if (code) {
|
||||
stError("s-task:%s failed to build checkpoint-source rsp, code:%s", pTask->id.idStr, tstrerror(code));
|
||||
return code;
|
||||
}
|
||||
|
||||
SActiveCheckpointInfo* pActiveInfo = pTask->chkInfo.pActiveInfo;
|
||||
streamMutexLock(&pActiveInfo->lock);
|
||||
|
|
|
@ -19,7 +19,7 @@
|
|||
static void streamTaskResumeHelper(void* param, void* tmrId);
|
||||
static void streamTaskSchedHelper(void* param, void* tmrId);
|
||||
|
||||
int32_t streamSetupScheduleTrigger(SStreamTask* pTask) {
|
||||
void streamSetupScheduleTrigger(SStreamTask* pTask) {
|
||||
if (pTask->info.delaySchedParam != 0 && pTask->info.fillHistory == 0) {
|
||||
int32_t ref = atomic_add_fetch_32(&pTask->refCnt, 1);
|
||||
stDebug("s-task:%s setup scheduler trigger, ref:%d delay:%" PRId64 " ms", pTask->id.idStr, ref,
|
||||
|
@ -29,8 +29,6 @@ int32_t streamSetupScheduleTrigger(SStreamTask* pTask) {
|
|||
taosTmrStart(streamTaskSchedHelper, (int32_t)pTask->info.delaySchedParam, pTask, streamTimer);
|
||||
pTask->schedInfo.status = TASK_TRIGGER_STATUS__INACTIVE;
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t streamTrySchedExec(SStreamTask* pTask) {
|
||||
|
@ -47,7 +45,7 @@ int32_t streamTaskSchedTask(SMsgCb* pMsgCb, int32_t vgId, int64_t streamId, int3
|
|||
SStreamTaskRunReq* pRunReq = rpcMallocCont(sizeof(SStreamTaskRunReq));
|
||||
if (pRunReq == NULL) {
|
||||
stError("vgId:%d failed to create msg to start stream task:0x%x exec, type:%d, code:%s", vgId, taskId, execType,
|
||||
terrstr());
|
||||
tstrerror(terrno));
|
||||
return terrno;
|
||||
}
|
||||
|
||||
|
@ -94,7 +92,7 @@ void streamTaskResumeHelper(void* param, void* tmrId) {
|
|||
SStreamTaskState p = streamTaskGetStatus(pTask);
|
||||
|
||||
if (p.state == TASK_STATUS__DROPPING || p.state == TASK_STATUS__STOP) {
|
||||
(void) streamTaskSetSchedStatusInactive(pTask);
|
||||
int8_t status = streamTaskSetSchedStatusInactive(pTask);
|
||||
|
||||
int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
|
||||
stDebug("s-task:%s status:%s not resume task, ref:%d", pId->idStr, p.name, ref);
|
||||
|
|
|
@ -24,8 +24,8 @@
|
|||
#include "wal.h"
|
||||
|
||||
static void streamTaskDestroyUpstreamInfo(SUpstreamInfo* pUpstreamInfo);
|
||||
static void streamTaskUpdateUpstreamInfo(SStreamTask* pTask, int32_t nodeId, const SEpSet* pEpSet, bool* pUpdated);
|
||||
static void streamTaskUpdateDownstreamInfo(SStreamTask* pTask, int32_t nodeId, const SEpSet* pEpSet, bool* pUpdate);
|
||||
static int32_t streamTaskUpdateUpstreamInfo(SStreamTask* pTask, int32_t nodeId, const SEpSet* pEpSet, bool* pUpdated);
|
||||
static int32_t streamTaskUpdateDownstreamInfo(SStreamTask* pTask, int32_t nodeId, const SEpSet* pEpSet, bool* pUpdate);
|
||||
static void streamTaskDestroyActiveChkptInfo(SActiveCheckpointInfo* pInfo);
|
||||
|
||||
static int32_t addToTaskset(SArray* pArray, SStreamTask* pTask) {
|
||||
|
@ -36,15 +36,25 @@ static int32_t addToTaskset(SArray* pArray, SStreamTask* pTask) {
|
|||
}
|
||||
|
||||
static int32_t doUpdateTaskEpset(SStreamTask* pTask, int32_t nodeId, SEpSet* pEpSet, bool* pUpdated) {
|
||||
int32_t code = 0;
|
||||
char buf[512] = {0};
|
||||
|
||||
if (pTask->info.nodeId == nodeId) { // execution task should be moved away
|
||||
bool isEqual = isEpsetEqual(&pTask->info.epSet, pEpSet);
|
||||
(void)epsetToStr(pEpSet, buf, tListLen(buf));
|
||||
code = epsetToStr(pEpSet, buf, tListLen(buf));
|
||||
if (code) { // print error and continue
|
||||
stError("%s failed to convert epset to str, code:%s", pTask->id.idStr, tstrerror(code));
|
||||
return code;
|
||||
}
|
||||
|
||||
if (!isEqual) {
|
||||
(*pUpdated) = true;
|
||||
char tmp[512] = {0};
|
||||
(void)epsetToStr(&pTask->info.epSet, tmp, tListLen(tmp)); // only for log file, ignore errors
|
||||
code = epsetToStr(&pTask->info.epSet, tmp, tListLen(tmp)); // only for log file, ignore errors
|
||||
if (code) { // print error and continue
|
||||
stError("%s failed to convert epset to str, code:%s", pTask->id.idStr, tstrerror(code));
|
||||
return code;
|
||||
}
|
||||
|
||||
epsetAssign(&pTask->info.epSet, pEpSet);
|
||||
stDebug("s-task:0x%x (vgId:%d) self node epset is updated %s, old:%s", pTask->id.taskId, nodeId, buf, tmp);
|
||||
|
@ -56,15 +66,15 @@ static int32_t doUpdateTaskEpset(SStreamTask* pTask, int32_t nodeId, SEpSet* pEp
|
|||
// check for the dispatch info and the upstream task info
|
||||
int32_t level = pTask->info.taskLevel;
|
||||
if (level == TASK_LEVEL__SOURCE) {
|
||||
streamTaskUpdateDownstreamInfo(pTask, nodeId, pEpSet, pUpdated);
|
||||
code = streamTaskUpdateDownstreamInfo(pTask, nodeId, pEpSet, pUpdated);
|
||||
} else if (level == TASK_LEVEL__AGG) {
|
||||
streamTaskUpdateUpstreamInfo(pTask, nodeId, pEpSet, pUpdated);
|
||||
streamTaskUpdateDownstreamInfo(pTask, nodeId, pEpSet, pUpdated);
|
||||
code = streamTaskUpdateUpstreamInfo(pTask, nodeId, pEpSet, pUpdated);
|
||||
code = streamTaskUpdateDownstreamInfo(pTask, nodeId, pEpSet, pUpdated);
|
||||
} else { // TASK_LEVEL__SINK
|
||||
streamTaskUpdateUpstreamInfo(pTask, nodeId, pEpSet, pUpdated);
|
||||
code = streamTaskUpdateUpstreamInfo(pTask, nodeId, pEpSet, pUpdated);
|
||||
}
|
||||
|
||||
return 0;
|
||||
return code;
|
||||
}
|
||||
|
||||
static void freeItem(void* p) {
|
||||
|
@ -227,17 +237,17 @@ void tFreeStreamTask(SStreamTask* pTask) {
|
|||
}
|
||||
|
||||
if (pTask->schedInfo.pDelayTimer != NULL) {
|
||||
(void)taosTmrStop(pTask->schedInfo.pDelayTimer);
|
||||
streamTmrStop(pTask->schedInfo.pDelayTimer);
|
||||
pTask->schedInfo.pDelayTimer = NULL;
|
||||
}
|
||||
|
||||
if (pTask->hTaskInfo.pTimer != NULL) {
|
||||
(void)taosTmrStop(pTask->hTaskInfo.pTimer);
|
||||
streamTmrStop(pTask->hTaskInfo.pTimer);
|
||||
pTask->hTaskInfo.pTimer = NULL;
|
||||
}
|
||||
|
||||
if (pTask->msgInfo.pRetryTmr != NULL) {
|
||||
(void)taosTmrStop(pTask->msgInfo.pRetryTmr);
|
||||
streamTmrStop(pTask->msgInfo.pRetryTmr);
|
||||
pTask->msgInfo.pRetryTmr = NULL;
|
||||
}
|
||||
|
||||
|
@ -402,8 +412,14 @@ int32_t streamTaskSetBackendPath(SStreamTask* pTask) {
|
|||
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t streamTaskInit(SStreamTask* pTask, SStreamMeta* pMeta, SMsgCb* pMsgCb, int64_t ver) {
|
||||
(void)createStreamTaskIdStr(pTask->id.streamId, pTask->id.taskId, &pTask->id.idStr);
|
||||
int32_t code = createStreamTaskIdStr(pTask->id.streamId, pTask->id.taskId, &pTask->id.idStr);
|
||||
if (code) {
|
||||
stError("0x%x failed create stream task id str, code:%s", pTask->id.taskId, tstrerror(code));
|
||||
return code;
|
||||
}
|
||||
|
||||
pTask->refCnt = 1;
|
||||
|
||||
pTask->inputq.status = TASK_INPUT_STATUS__NORMAL;
|
||||
|
@ -419,7 +435,7 @@ int32_t streamTaskInit(SStreamTask* pTask, SStreamMeta* pMeta, SMsgCb* pMsgCb, i
|
|||
pTask->status.schedStatus = TASK_SCHED_STATUS__INACTIVE;
|
||||
pTask->status.timerActive = 0;
|
||||
|
||||
int32_t code = streamCreateStateMachine(pTask);
|
||||
code = streamCreateStateMachine(pTask);
|
||||
if (pTask->status.pSM == NULL || code != TSDB_CODE_SUCCESS) {
|
||||
stError("s-task:%s failed create state-machine for stream task, initialization failed, code:%s", pTask->id.idStr,
|
||||
tstrerror(code));
|
||||
|
@ -439,6 +455,7 @@ int32_t streamTaskInit(SStreamTask* pTask, SStreamMeta* pMeta, SMsgCb* pMsgCb, i
|
|||
|
||||
code = taosThreadMutexInit(&pTask->msgInfo.lock, NULL);
|
||||
if (code) {
|
||||
stError("s-task:0x%x failed to init msgInfo mutex, code:%s", pTask->id.taskId, tstrerror(code));
|
||||
return code;
|
||||
}
|
||||
|
||||
|
@ -484,16 +501,14 @@ int32_t streamTaskInit(SStreamTask* pTask, SStreamMeta* pMeta, SMsgCb* pMsgCb, i
|
|||
|
||||
pOutputInfo->pNodeEpsetUpdateList = taosArrayInit(4, sizeof(SDownstreamTaskEpset));
|
||||
if (pOutputInfo->pNodeEpsetUpdateList == NULL) {
|
||||
stError("s-task:%s failed to prepare downstreamUpdateList, code:%s", pTask->id.idStr,
|
||||
tstrerror(TSDB_CODE_OUT_OF_MEMORY));
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
stError("s-task:%s failed to prepare downstreamUpdateList, code:%s", pTask->id.idStr, tstrerror(terrno));
|
||||
return terrno;
|
||||
}
|
||||
|
||||
pTask->taskCheckInfo.pList = taosArrayInit(4, sizeof(SDownstreamStatusInfo));
|
||||
if (pTask->taskCheckInfo.pList == NULL) {
|
||||
stError("s-task:%s failed to prepare taskCheckInfo list, code:%s", pTask->id.idStr,
|
||||
tstrerror(TSDB_CODE_OUT_OF_MEMORY));
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
stError("s-task:%s failed to prepare taskCheckInfo list, code:%s", pTask->id.idStr, tstrerror(terrno));
|
||||
return terrno;
|
||||
}
|
||||
|
||||
if (pTask->chkInfo.pActiveInfo == NULL) {
|
||||
|
@ -539,9 +554,14 @@ int32_t streamTaskSetUpstreamInfo(SStreamTask* pTask, const SStreamTask* pUpstre
|
|||
return (p == NULL) ? TSDB_CODE_OUT_OF_MEMORY : TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
void streamTaskUpdateUpstreamInfo(SStreamTask* pTask, int32_t nodeId, const SEpSet* pEpSet, bool* pUpdated) {
|
||||
int32_t streamTaskUpdateUpstreamInfo(SStreamTask* pTask, int32_t nodeId, const SEpSet* pEpSet, bool* pUpdated) {
|
||||
int32_t code = 0;
|
||||
char buf[512] = {0};
|
||||
(void)epsetToStr(pEpSet, buf, tListLen(buf)); // ignore error since it is only for log file.
|
||||
code = epsetToStr(pEpSet, buf, tListLen(buf)); // ignore error since it is only for log file.
|
||||
if (code != 0) { // print error and continue
|
||||
stError("%s failed to convert epset to str, code:%s", pTask->id.idStr, tstrerror(code));
|
||||
return code;
|
||||
}
|
||||
|
||||
int32_t numOfUpstream = taosArrayGetSize(pTask->upstreamInfo.pList);
|
||||
for (int32_t i = 0; i < numOfUpstream; ++i) {
|
||||
|
@ -552,7 +572,11 @@ void streamTaskUpdateUpstreamInfo(SStreamTask* pTask, int32_t nodeId, const SEpS
|
|||
*pUpdated = true;
|
||||
|
||||
char tmp[512] = {0};
|
||||
(void)epsetToStr(&pInfo->epSet, tmp, tListLen(tmp));
|
||||
code = epsetToStr(&pInfo->epSet, tmp, tListLen(tmp));
|
||||
if (code != 0) { // print error and continue
|
||||
stError("%s failed to convert epset to str, code:%s", pTask->id.idStr, tstrerror(code));
|
||||
return code;
|
||||
}
|
||||
|
||||
epsetAssign(&pInfo->epSet, pEpSet);
|
||||
stDebug("s-task:0x%x update the upstreamInfo taskId:0x%x(nodeId:%d) newEpset:%s old:%s", pTask->id.taskId,
|
||||
|
@ -565,6 +589,8 @@ void streamTaskUpdateUpstreamInfo(SStreamTask* pTask, int32_t nodeId, const SEpS
|
|||
break;
|
||||
}
|
||||
}
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
void streamTaskDestroyUpstreamInfo(SUpstreamInfo* pUpstreamInfo) {
|
||||
|
@ -585,9 +611,13 @@ void streamTaskSetFixedDownstreamInfo(SStreamTask* pTask, const SStreamTask* pDo
|
|||
pTask->msgInfo.msgType = TDMT_STREAM_TASK_DISPATCH;
|
||||
}
|
||||
|
||||
void streamTaskUpdateDownstreamInfo(SStreamTask* pTask, int32_t nodeId, const SEpSet* pEpSet, bool* pUpdated) {
|
||||
int32_t streamTaskUpdateDownstreamInfo(SStreamTask* pTask, int32_t nodeId, const SEpSet* pEpSet, bool* pUpdated) {
|
||||
char buf[512] = {0};
|
||||
(void)epsetToStr(pEpSet, buf, tListLen(buf)); // ignore the error since only for log files.
|
||||
int32_t code = epsetToStr(pEpSet, buf, tListLen(buf)); // ignore the error since only for log files.
|
||||
if (code != 0) { // print error and continue
|
||||
stError("%s failed to convert epset to str, code:%s", pTask->id.idStr, tstrerror(code));
|
||||
return code;
|
||||
}
|
||||
|
||||
int32_t id = pTask->id.taskId;
|
||||
int8_t type = pTask->outputInfo.type;
|
||||
|
@ -605,8 +635,13 @@ void streamTaskUpdateDownstreamInfo(SStreamTask* pTask, int32_t nodeId, const SE
|
|||
bool isEqual = isEpsetEqual(&pVgInfo->epSet, pEpSet);
|
||||
if (!isEqual) {
|
||||
*pUpdated = true;
|
||||
|
||||
char tmp[512] = {0};
|
||||
(void)epsetToStr(&pVgInfo->epSet, tmp, tListLen(tmp));
|
||||
code = epsetToStr(&pVgInfo->epSet, tmp, tListLen(tmp));
|
||||
if (code != 0) { // print error and continue
|
||||
stError("%s failed to convert epset to str, code:%s", pTask->id.idStr, tstrerror(code));
|
||||
return code;
|
||||
}
|
||||
|
||||
epsetAssign(&pVgInfo->epSet, pEpSet);
|
||||
stDebug("s-task:0x%x update dispatch info, task:0x%x(nodeId:%d) newEpset:%s old:%s", id, pVgInfo->taskId,
|
||||
|
@ -626,7 +661,11 @@ void streamTaskUpdateDownstreamInfo(SStreamTask* pTask, int32_t nodeId, const SE
|
|||
*pUpdated = true;
|
||||
|
||||
char tmp[512] = {0};
|
||||
(void)epsetToStr(&pDispatcher->epSet, tmp, tListLen(tmp));
|
||||
code = epsetToStr(&pDispatcher->epSet, tmp, tListLen(tmp));
|
||||
if (code != 0) { // print error and continue
|
||||
stError("%s failed to convert epset to str, code:%s", pTask->id.idStr, tstrerror(code));
|
||||
return code;
|
||||
}
|
||||
|
||||
epsetAssign(&pDispatcher->epSet, pEpSet);
|
||||
stDebug("s-task:0x%x update dispatch info, task:0x%x(nodeId:%d) newEpset:%s old:%s", id, pDispatcher->taskId,
|
||||
|
@ -637,6 +676,8 @@ void streamTaskUpdateDownstreamInfo(SStreamTask* pTask, int32_t nodeId, const SE
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
int32_t streamTaskStop(SStreamTask* pTask) {
|
||||
|
@ -977,7 +1018,10 @@ static int32_t taskPauseCallback(SStreamTask* pTask, void* param) {
|
|||
}
|
||||
|
||||
void streamTaskPause(SStreamTask* pTask) {
|
||||
(void)streamTaskHandleEventAsync(pTask->status.pSM, TASK_EVENT_PAUSE, taskPauseCallback, NULL);
|
||||
int32_t code = streamTaskHandleEventAsync(pTask->status.pSM, TASK_EVENT_PAUSE, taskPauseCallback, NULL);
|
||||
if (code) {
|
||||
stError("s-task:%s failed handle pause event async, code:%s", pTask->id.idStr, tstrerror(code));
|
||||
}
|
||||
}
|
||||
|
||||
void streamTaskResume(SStreamTask* pTask) {
|
||||
|
@ -1182,13 +1226,13 @@ void streamTaskDestroyActiveChkptInfo(SActiveCheckpointInfo* pInfo) {
|
|||
|
||||
SStreamTmrInfo* pTriggerTmr = &pInfo->chkptTriggerMsgTmr;
|
||||
if (pTriggerTmr->tmrHandle != NULL) {
|
||||
(void)taosTmrStop(pTriggerTmr->tmrHandle);
|
||||
streamTmrStop(pTriggerTmr->tmrHandle);
|
||||
pTriggerTmr->tmrHandle = NULL;
|
||||
}
|
||||
|
||||
SStreamTmrInfo* pReadyTmr = &pInfo->chkptReadyMsgTmr;
|
||||
if (pReadyTmr->tmrHandle != NULL) {
|
||||
(void)taosTmrStop(pReadyTmr->tmrHandle);
|
||||
streamTmrStop(pReadyTmr->tmrHandle);
|
||||
pReadyTmr->tmrHandle = NULL;
|
||||
}
|
||||
|
||||
|
|
|
@ -42,13 +42,16 @@ int32_t streamTimerGetInstance(tmr_h* pTmr) {
|
|||
|
||||
void streamTmrReset(TAOS_TMR_CALLBACK fp, int32_t mseconds, void* param, void* handle, tmr_h* pTmrId, int32_t vgId,
|
||||
const char* pMsg) {
|
||||
// while (1) {
|
||||
bool ret = taosTmrReset(fp, mseconds, param, handle, pTmrId);
|
||||
if (ret) {
|
||||
// break;
|
||||
}
|
||||
// stError("vgId:%d failed to reset tmr: %s, try again", vgId, pMsg);
|
||||
// }
|
||||
}
|
||||
|
||||
void streamTmrStop(tmr_h tmrId) {
|
||||
bool stop = taosTmrStop(tmrId);
|
||||
if (stop) {
|
||||
// todo
|
||||
}
|
||||
}
|
||||
|
||||
int32_t streamCleanBeforeQuitTmr(SStreamTmrInfo* pInfo, SStreamTask* pTask) {
|
||||
|
|
Loading…
Reference in New Issue