refactor: remove void and do some internal refactor.

This commit is contained in:
Haojun Liao 2024-09-09 14:49:05 +08:00
parent 979c6e20e6
commit a29903101a
15 changed files with 245 additions and 107 deletions

View File

@ -636,7 +636,7 @@ typedef struct SCheckpointConsensusInfo {
int64_t streamId;
} SCheckpointConsensusInfo;
int32_t streamSetupScheduleTrigger(SStreamTask* pTask);
void streamSetupScheduleTrigger(SStreamTask* pTask);
// dispatch related
int32_t streamProcessDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pReq, SRpcMsg* pMsg);
@ -793,6 +793,7 @@ void streamTaskSetReqConsenChkptId(SStreamTask* pTask, int64_t ts);
int32_t streamTimerGetInstance(tmr_h* pTmr);
void streamTmrReset(TAOS_TMR_CALLBACK fp, int32_t mseconds, void* param, void* handle, tmr_h* pTmrId, int32_t vgId,
const char* pMsg);
void streamTmrStop(tmr_h tmrId);
// checkpoint
int32_t streamProcessCheckpointSourceReq(SStreamTask* pTask, SStreamCheckpointSourceReq* pReq);

View File

@ -171,8 +171,9 @@ int32_t epsetToStr(const SEpSet* pEpSet, char* pBuf, int32_t cap) {
ret = snprintf(pBuf + nwrite, cap, "}, inUse:%d", pEpSet->inUse);
if (ret <= 0 || ret >= cap) {
return TSDB_CODE_OUT_OF_BUFFER;
} else {
return TSDB_CODE_SUCCESS;
}
return TSDB_CODE_SUCCESS;
}
int32_t taosGenCrashJsonMsg(int signum, char** pMsg, int64_t clusterId, int64_t startTime) {

View File

@ -19,8 +19,8 @@
#include "systable.h"
#include "mndUser.h"
#define SHOW_STEP_SIZE 100
#define SHOW_COLS_STEP_SIZE 4096
#define SHOW_STEP_SIZE 100
#define SHOW_COLS_STEP_SIZE 4096
#define SHOW_PRIVILEGES_STEP_SIZE 2048
static SShowObj *mndCreateShowObj(SMnode *pMnode, SRetrieveTableReq *pReq);

View File

@ -1052,6 +1052,7 @@ static int32_t mndProcessStreamCheckpointTrans(SMnode *pMnode, SStreamObj *pStre
int32_t code = TSDB_CODE_SUCCESS;
bool conflict = false;
int64_t ts = taosGetTimestampMs();
STrans *pTrans = NULL;
if (mndTrigger == 1 && (ts - pStream->checkpointFreq < tsStreamCheckpointInterval * 1000)) {
return code;
@ -1059,20 +1060,14 @@ static int32_t mndProcessStreamCheckpointTrans(SMnode *pMnode, SStreamObj *pStre
code = mndStreamTransConflictCheck(pMnode, pStream->uid, MND_STREAM_CHECKPOINT_NAME, lock);
if (code) {
mWarn("checkpoint conflict with other trans in %s, code:%s ignore the checkpoint for stream:%s %" PRIx64,
pStream->sourceDb, tstrerror(code), pStream->name, pStream->uid);
goto _ERR;
}
if (conflict) {
mWarn("checkpoint conflict with other trans in %s, ignore the checkpoint for stream:%s %" PRIx64, pStream->sourceDb,
pStream->name, pStream->uid);
return TSDB_CODE_MND_TRANS_CONFLICT;
}
STrans *pTrans = NULL;
code = doCreateTrans(pMnode, pStream, NULL, TRN_CONFLICT_NOTHING, MND_STREAM_CHECKPOINT_NAME,
"gen checkpoint for stream", &pTrans);
if (pTrans == NULL || code) {
code = TSDB_CODE_MND_TRANS_CONFLICT;
"gen checkpoint for stream", &pTrans);
if (code) {
mError("failed to checkpoint of stream name%s, checkpointId: %" PRId64 ", reason:%s", pStream->name, checkpointId,
tstrerror(code));
goto _ERR;

View File

@ -100,7 +100,7 @@ int32_t mndCreateStreamResetStatusTrans(SMnode *pMnode, SStreamObj *pStream) {
code = mndTransPrepare(pMnode, pTrans);
if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
mError("trans:%d, failed to prepare update stream trans since %s", pTrans->id, terrstr());
mError("trans:%d, failed to prepare update stream trans since %s", pTrans->id, tstrerror(code));
sdbRelease(pMnode->pSdb, pStream);
mndTransDrop(pTrans);
return code;

View File

@ -50,7 +50,7 @@ int32_t mndStreamClearFinishedTrans(SMnode *pMnode, int32_t *pNumOfActiveChkpt)
pEntry->startTime);
void* p = taosArrayPush(pList, &info);
if (p == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
return terrno;
}
} else {
if (strcmp(pEntry->name, MND_STREAM_CHECKPOINT_NAME) == 0) {
@ -77,7 +77,6 @@ int32_t mndStreamClearFinishedTrans(SMnode *pMnode, int32_t *pNumOfActiveChkpt)
mDebug("clear %d finished stream-trans, remained:%d, active checkpoint trans:%d", size,
taosHashGetSize(execInfo.transMgmt.pDBTrans), num);
terrno = TSDB_CODE_SUCCESS;
taosArrayDestroy(pList);
if (pNumOfActiveChkpt != NULL) {

View File

@ -752,7 +752,7 @@ int32_t tqBuildStreamTask(void* pTqObj, SStreamTask* pTask, int64_t nextProcessV
}
streamTaskResetUpstreamStageInfo(pTask);
(void)streamSetupScheduleTrigger(pTask);
streamSetupScheduleTrigger(pTask);
SCheckpointInfo* pChkInfo = &pTask->chkInfo;
tqSetRestoreVersionInfo(pTask);
@ -802,6 +802,7 @@ static void doStartFillhistoryStep2(SStreamTask* pTask, SStreamTask* pStreamTask
const char* id = pTask->id.idStr;
int64_t nextProcessedVer = pStreamTask->hTaskInfo.haltVer;
SVersionRange* pStep2Range = &pTask->step2Range;
int32_t vgId = pTask->pMeta->vgId;
// if it's an source task, extract the last version in wal.
bool done = streamHistoryTaskSetVerRangeStep2(pTask, nextProcessedVer);
@ -837,12 +838,15 @@ static void doStartFillhistoryStep2(SStreamTask* pTask, SStreamTask* pStreamTask
tqDebug("s-task:%s wal reader start scan WAL verRange:%" PRId64 "-%" PRId64 ", set sched-status:%d", id, dstVer,
pStep2Range->maxVer, TASK_SCHED_STATUS__INACTIVE);
(void)streamTaskSetSchedStatusInactive(pTask);
int8_t status = streamTaskSetSchedStatusInactive(pTask);
// now the fill-history task starts to scan data from wal files.
code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_SCANHIST_DONE);
if (code == TSDB_CODE_SUCCESS) {
(void)tqScanWalAsync(pTq, false);
code = tqScanWalAsync(pTq, false);
if (code) {
tqError("vgId:%d failed to start scan wal file, code:%s", vgId, tstrerror(code));
}
}
}
}
@ -1001,7 +1005,10 @@ int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) {
// let's continue scan data in the wal files
if (code == 0 && (pReq->reqType >= 0 || pReq->reqType == STREAM_EXEC_T_RESUME_TASK)) {
(void)tqScanWalAsync(pTq, false); // it's ok to failed
code = tqScanWalAsync(pTq, false); // it's ok to failed
if (code) {
tqError("vgId:%d failed to start scan wal file, code:%s", pTq->pStreamMeta->vgId, tstrerror(code));
}
}
return code;
@ -1103,7 +1110,11 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp)
tqError("vgId:%d failed to decode checkpoint-source msg, code:%s", vgId, tstrerror(code));
SRpcMsg rsp = {0};
(void)streamTaskBuildCheckpointSourceRsp(&req, &pMsg->info, &rsp, TSDB_CODE_SUCCESS);
int32_t ret = streamTaskBuildCheckpointSourceRsp(&req, &pMsg->info, &rsp, TSDB_CODE_SUCCESS);
if (ret) { // suppress the error in build checkpointsource rsp
tqError("s-task:0x%x failed to build checkpoint-source rsp, code:%s", req.taskId, tstrerror(code));
}
tmsgSendRsp(&rsp); // error occurs
return TSDB_CODE_SUCCESS; // always return success to mnode, todo: handle failure of build and send msg to mnode
}
@ -1112,7 +1123,11 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp)
if (!vnodeIsRoleLeader(pTq->pVnode)) {
tqDebug("vgId:%d not leader, ignore checkpoint-source msg, s-task:0x%x", vgId, req.taskId);
SRpcMsg rsp = {0};
(void)streamTaskBuildCheckpointSourceRsp(&req, &pMsg->info, &rsp, TSDB_CODE_SUCCESS);
int32_t ret = streamTaskBuildCheckpointSourceRsp(&req, &pMsg->info, &rsp, TSDB_CODE_SUCCESS);
if (ret) { // suppress the error in build checkpointsource rsp
tqError("s-task:0x%x failed to build checkpoint-source rsp, code:%s", req.taskId, tstrerror(code));
}
tmsgSendRsp(&rsp); // error occurs
return TSDB_CODE_SUCCESS; // always return success to mnode, todo: handle failure of build and send msg to mnode
}
@ -1122,7 +1137,11 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp)
", transId:%d s-task:0x%x ignore it",
vgId, req.checkpointId, req.transId, req.taskId);
SRpcMsg rsp = {0};
(void)streamTaskBuildCheckpointSourceRsp(&req, &pMsg->info, &rsp, TSDB_CODE_SUCCESS);
int32_t ret = streamTaskBuildCheckpointSourceRsp(&req, &pMsg->info, &rsp, TSDB_CODE_SUCCESS);
if (ret) { // suppress the error in build checkpointsource rsp
tqError("s-task:0x%x failed to build checkpoint-source rsp, code:%s", req.taskId, tstrerror(code));
}
tmsgSendRsp(&rsp); // error occurs
return TSDB_CODE_SUCCESS; // always return success to mnode, , todo: handle failure of build and send msg to mnode
}
@ -1134,7 +1153,10 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp)
" transId:%d it may have been destroyed",
vgId, req.taskId, req.checkpointId, req.transId);
SRpcMsg rsp = {0};
(void)streamTaskBuildCheckpointSourceRsp(&req, &pMsg->info, &rsp, TSDB_CODE_SUCCESS);
int32_t ret = streamTaskBuildCheckpointSourceRsp(&req, &pMsg->info, &rsp, TSDB_CODE_SUCCESS);
if (ret) { // suppress the error in build checkpointsource rsp
tqError("s-task:%s failed to build checkpoint-source rsp, code:%s", pTask->id.idStr, tstrerror(code));
}
tmsgSendRsp(&rsp); // error occurs
return TSDB_CODE_SUCCESS;
}
@ -1147,7 +1169,11 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp)
streamMetaReleaseTask(pMeta, pTask);
SRpcMsg rsp = {0};
(void)streamTaskBuildCheckpointSourceRsp(&req, &pMsg->info, &rsp, TSDB_CODE_SUCCESS);
int32_t ret = streamTaskBuildCheckpointSourceRsp(&req, &pMsg->info, &rsp, TSDB_CODE_SUCCESS);
if (ret) { // suppress the error in build checkpointsource rsp
tqError("s-task:%s failed to build checkpoint-source rsp, code:%s", pTask->id.idStr, tstrerror(code));
}
tmsgSendRsp(&rsp); // error occurs
return TSDB_CODE_SUCCESS; // todo retry handle error
}
@ -1165,7 +1191,11 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp)
streamMetaReleaseTask(pMeta, pTask);
SRpcMsg rsp = {0};
(void)streamTaskBuildCheckpointSourceRsp(&req, &pMsg->info, &rsp, TSDB_CODE_SUCCESS);
int32_t ret = streamTaskBuildCheckpointSourceRsp(&req, &pMsg->info, &rsp, TSDB_CODE_SUCCESS);
if (ret) { // suppress the error in build checkpointsource rsp
tqError("s-task:%s failed to build checkpoint-source rsp, code:%s", pTask->id.idStr, tstrerror(code));
}
tmsgSendRsp(&rsp); // error occurs
return TSDB_CODE_SUCCESS;
}
@ -1199,7 +1229,11 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp)
streamMetaReleaseTask(pMeta, pTask);
SRpcMsg rsp = {0};
(void)streamTaskBuildCheckpointSourceRsp(&req, &pMsg->info, &rsp, TSDB_CODE_SUCCESS);
int32_t ret = streamTaskBuildCheckpointSourceRsp(&req, &pMsg->info, &rsp, TSDB_CODE_SUCCESS);
if (ret) { // suppress the error in build checkpointsource rsp
tqError("s-task:%s failed to build checkpoint-source rsp, code:%s", pTask->id.idStr, tstrerror(code));
}
tmsgSendRsp(&rsp); // error occurs
return TSDB_CODE_SUCCESS;
@ -1228,7 +1262,10 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp)
code = streamAddCheckpointSourceRspMsg(&req, &pMsg->info, pTask);
if (code != TSDB_CODE_SUCCESS) {
SRpcMsg rsp = {0};
(void)streamTaskBuildCheckpointSourceRsp(&req, &pMsg->info, &rsp, TSDB_CODE_SUCCESS);
int32_t ret = streamTaskBuildCheckpointSourceRsp(&req, &pMsg->info, &rsp, TSDB_CODE_SUCCESS);
if (ret) { // suppress the error in build checkpointsource rsp
tqError("s-task:%s failed to build checkpoint-source rsp, code:%s", pTask->id.idStr, tstrerror(code));
}
tmsgSendRsp(&rsp); // error occurs
return TSDB_CODE_SUCCESS;
}

View File

@ -374,7 +374,10 @@ static int32_t tqMetaTransformInfo(TDB* pMetaDB, TTB* pOld, TTB* pNew) {
END:
tdbFree(pKey);
tdbFree(pVal);
(void)tdbTbcClose(pCur);
int32_t ret = tdbTbcClose(pCur);
if (code == 0 && ret != 0) {
code = ret;
}
return code;
}
@ -446,7 +449,12 @@ static int32_t tqMetaRestoreCheckInfo(STQ* pTq) {
END:
tdbFree(pKey);
tdbFree(pVal);
(void)tdbTbcClose(pCur);
int32_t ret = tdbTbcClose(pCur);
if (code == 0) {
code = ret;
}
tDeleteSTqCheckInfo(&info);
return code;
}
@ -461,13 +469,13 @@ int32_t tqMetaOpen(STQ* pTq) {
TQ_ERR_GO_TO_END(tqMetaOpenTdb(pTq));
} else {
TQ_ERR_GO_TO_END(tqMetaTransform(pTq));
(void)taosRemoveFile(maindb);
TQ_ERR_GO_TO_END(taosRemoveFile(maindb));
}
TQ_ERR_GO_TO_END(tqBuildFName(&offsetNew, pTq->path, TQ_OFFSET_NAME));
if(taosCheckExistFile(offsetNew)){
TQ_ERR_GO_TO_END(tqOffsetRestoreFromFile(pTq, offsetNew));
(void)taosRemoveFile(offsetNew);
TQ_ERR_GO_TO_END(taosRemoveFile(offsetNew));
}
TQ_ERR_GO_TO_END(tqMetaRestoreCheckInfo(pTq));
@ -503,7 +511,7 @@ int32_t tqMetaTransform(STQ* pTq) {
if (taosCopyFile(offset, offsetNew) < 0) {
tqError("copy offset file error");
} else {
(void)taosRemoveFile(offset);
TQ_ERR_GO_TO_END(taosRemoveFile(offset));
}
}
@ -512,22 +520,47 @@ END:
taosMemoryFree(offsetNew);
// return 0 always, so ignore
(void)tdbTbClose(pExecStore);
(void)tdbTbClose(pCheckStore);
(void)tdbClose(pMetaDB);
int32_t ret = tdbTbClose(pExecStore);
if (ret != 0) {
tqError("vgId:%d failed to close stream exec store, code:%s", pTq->pStreamMeta->vgId, tstrerror(ret));
}
ret = tdbTbClose(pCheckStore);
if (ret != 0) {
tqError("vgId:%d failed to close stream check store, code:%s", pTq->pStreamMeta->vgId, tstrerror(ret));
}
ret = tdbClose(pMetaDB);
if (ret != 0) {
tqError("vgId:%d failed to close stream meta db store, code:%s", pTq->pStreamMeta->vgId, tstrerror(ret));
}
return code;
}
void tqMetaClose(STQ* pTq) {
int32_t code = 0;
if (pTq->pExecStore) {
(void)tdbTbClose(pTq->pExecStore);
code = tdbTbClose(pTq->pExecStore);
if (code) {
tqError("vgId:%d failed to close tq exec store, code:%s", pTq->pStreamMeta->vgId, tstrerror(code));
}
}
if (pTq->pCheckStore) {
(void)tdbTbClose(pTq->pCheckStore);
code = tdbTbClose(pTq->pCheckStore);
if (code) {
tqError("vgId:%d failed to close tq check store, code:%s", pTq->pStreamMeta->vgId, tstrerror(code));
}
}
if (pTq->pOffsetStore) {
(void)tdbTbClose(pTq->pOffsetStore);
code = tdbTbClose(pTq->pOffsetStore);
if (code) {
tqError("vgId:%d failed to close tq offset store, code:%s", pTq->pStreamMeta->vgId, tstrerror(code));
}
}
code = tdbClose(pTq->pMetaDB);
if (code) {
tqError("vgId:%d failed to close tq meta db store, code:%s", pTq->pStreamMeta->vgId, tstrerror(code));
}
(void)tdbClose(pTq->pMetaDB);
}

View File

@ -83,18 +83,27 @@ int32_t streamTaskSnapReaderOpen(STQ* pTq, int64_t sver, int64_t ever, SStreamTa
_err:
tqError("vgId:%d, vnode stream-task snapshot reader open failed since %s", TD_VID(pTq->pVnode), tstrerror(code));
(void)streamTaskSnapReaderClose(pReader);
int32_t ret = streamTaskSnapReaderClose(pReader);
*ppReader = NULL;
return code;
}
int32_t streamTaskSnapReaderClose(SStreamTaskReader* pReader) {
if (pReader == NULL) return 0;
if (pReader == NULL) {
return 0;
}
int32_t code = 0;
tqInfo("vgId:%d, vnode stream-task snapshot reader closed", TD_VID(pReader->pTq->pVnode));
int32_t vgId = TD_VID(pReader->pTq->pVnode);
taosArrayDestroy(pReader->tdbTbList);
(void)tdbTbcClose(pReader->pCur);
code = tdbTbcClose(pReader->pCur);
if (code) {
tqError("vgId:%d failed to close stream meta reader, code:%s", vgId, tstrerror(code));
} else {
tqInfo("vgId:%d, vnode stream-task snapshot reader closed", vgId);
}
taosMemoryFree(pReader);
return code;
}
@ -113,6 +122,7 @@ int32_t streamTaskSnapRead(SStreamTaskReader* pReader, uint8_t** ppData) {
tqDebug("vgId:%d, vnode stream-task snapshot start read data", TD_VID(pReader->pTq->pVnode));
STablePair* pPair = taosArrayGet(pReader->tdbTbList, pReader->pos);
NextTbl:
except = 0;
for (;;) {
@ -127,6 +137,7 @@ NextTbl:
code = terrno;
goto _err;
}
memcpy(pVal, tVal, tLen);
vLen = tLen;
}
@ -163,8 +174,8 @@ NextTbl:
taosMemoryFree(pVal);
tqDebug("vgId:%d, vnode stream-task snapshot read data vLen:%d", TD_VID(pReader->pTq->pVnode), vLen);
return code;
_err:
tqError("vgId:%d, vnode stream-task snapshot read data failed since %s", TD_VID(pReader->pTq->pVnode),
tstrerror(code));

View File

@ -587,7 +587,10 @@ static void vnodeRestoreFinish(const SSyncFSM *pFsm, const SyncIndex commitIdx)
streamMetaWUnLock(pMeta);
tqInfo("vgId:%d stream task already loaded, start them", vgId);
(void)streamTaskSchedTask(&pVnode->msgCb, TD_VID(pVnode), 0, 0, STREAM_EXEC_T_START_ALL_TASKS);
int32_t code = streamTaskSchedTask(&pVnode->msgCb, TD_VID(pVnode), 0, 0, STREAM_EXEC_T_START_ALL_TASKS);
if (code != 0) {
tqError("vgId:%^d failed to sched stream task, code:%s", vgId, tstrerror(code));
}
return;
}
}

View File

@ -153,9 +153,9 @@ int32_t streamTaskProcessCheckpointTriggerRsp(SStreamTask* pTask, SCheckpointTri
streamMutexUnlock(&pInfo->lock);
// NOTE: here we do not do the duplicated checkpoint-trigger msg check, since it will be done by following functions.
(void)appendCheckpointIntoInputQ(pTask, STREAM_INPUT__CHECKPOINT_TRIGGER, pRsp->checkpointId, pRsp->transId,
pRsp->upstreamTaskId);
return TSDB_CODE_SUCCESS;
int32_t code = appendCheckpointIntoInputQ(pTask, STREAM_INPUT__CHECKPOINT_TRIGGER, pRsp->checkpointId, pRsp->transId,
pRsp->upstreamTaskId);
return code;
}
int32_t streamTaskSendCheckpointTriggerMsg(SStreamTask* pTask, int32_t dstTaskId, int32_t downstreamNodeId,
@ -192,6 +192,7 @@ int32_t streamTaskSendCheckpointTriggerMsg(SStreamTask* pTask, int32_t dstTaskId
int32_t continueDispatchCheckpointTriggerBlock(SStreamDataBlock* pBlock, SStreamTask* pTask) {
pBlock->srcTaskId = pTask->id.taskId;
pBlock->srcVgId = pTask->pMeta->vgId;
if (pTask->chkInfo.pActiveInfo->dispatchTrigger == true) {
stError("s-task:%s already dispatch checkpoint-trigger, not dispatch again", pTask->id.idStr);
return 0;
@ -246,7 +247,7 @@ int32_t streamProcessCheckpointTriggerBlock(SStreamTask* pTask, SStreamDataBlock
code = initCheckpointReadyMsg(pTask, pInfo->nodeId, pBlock->srcTaskId, pInfo->childId, checkpointId, &msg);
if (code == TSDB_CODE_SUCCESS) {
(void)tmsgSendReq(&pInfo->epSet, &msg);
code = tmsgSendReq(&pInfo->epSet, &msg);
}
}
@ -358,9 +359,9 @@ int32_t streamProcessCheckpointTriggerBlock(SStreamTask* pTask, SStreamDataBlock
if (type == TASK_OUTPUT__FIXED_DISPATCH || type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
stDebug("s-task:%s set childIdx:%d, and add checkpoint-trigger block into outputQ", id, pTask->info.selfChildId);
(void)continueDispatchCheckpointTriggerBlock(pBlock, pTask); // todo handle this failure
code = continueDispatchCheckpointTriggerBlock(pBlock, pTask); // todo handle this failure
} else { // only one task exists, no need to dispatch downstream info
(void)appendCheckpointIntoInputQ(pTask, STREAM_INPUT__CHECKPOINT, pActiveInfo->activeId, pActiveInfo->transId,
code = appendCheckpointIntoInputQ(pTask, STREAM_INPUT__CHECKPOINT, pActiveInfo->activeId, pActiveInfo->transId,
-1);
streamFreeQitem((SStreamQueueItem*)pBlock);
}
@ -372,7 +373,7 @@ int32_t streamProcessCheckpointTriggerBlock(SStreamTask* pTask, SStreamDataBlock
// todo: handle this
// update the child Id for downstream tasks
(void) streamAddCheckpointReadyMsg(pTask, pBlock->srcTaskId, pTask->info.selfChildId, checkpointId);
code = streamAddCheckpointReadyMsg(pTask, pBlock->srcTaskId, pTask->info.selfChildId, checkpointId);
// there are still some upstream tasks not send checkpoint request, do nothing and wait for then
if (pActiveInfo->allUpstreamTriggerRecv != 1) {
@ -384,7 +385,7 @@ int32_t streamProcessCheckpointTriggerBlock(SStreamTask* pTask, SStreamDataBlock
if (taskLevel == TASK_LEVEL__SINK) {
stDebug("s-task:%s process checkpoint-trigger block, all %d upstreams sent, send ready msg to upstream", id, num);
streamFreeQitem((SStreamQueueItem*)pBlock);
(void)streamTaskBuildCheckpoint(pTask); // todo: not handle error yet
code = streamTaskBuildCheckpoint(pTask); // todo: not handle error yet
} else { // source & agg tasks need to forward the checkpoint msg downwards
stDebug("s-task:%s process checkpoint-trigger block, all %d upstreams sent, forwards to downstream", id, num);
flushStateDataInExecutor(pTask, (SStreamQueueItem*)pBlock);
@ -480,7 +481,7 @@ int32_t streamProcessCheckpointReadyMsg(SStreamTask* pTask, int64_t checkpointId
} else {
if ((notReady == 0) && (code == 0) && (!alreadyHandled)) {
stDebug("s-task:%s all downstream tasks have completed build checkpoint, do checkpoint for current task", id);
(void)appendCheckpointIntoInputQ(pTask, STREAM_INPUT__CHECKPOINT, checkpointId, transId, -1);
code = appendCheckpointIntoInputQ(pTask, STREAM_INPUT__CHECKPOINT, checkpointId, transId, -1);
}
}
@ -562,7 +563,7 @@ int32_t streamTaskUpdateTaskCheckpointInfo(SStreamTask* pTask, bool restored, SV
// drop task should not in the meta-lock, and drop the related fill-history task now
streamMetaWUnLock(pMeta);
if (pReq->dropRelHTask) {
(void) streamMetaUnregisterTask(pMeta, pReq->hStreamId, pReq->hTaskId);
code = streamMetaUnregisterTask(pMeta, pReq->hStreamId, pReq->hTaskId);
int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta);
stDebug("s-task:%s vgId:%d related fill-history task:0x%x dropped in update checkpointInfo, remain tasks:%d",
id, vgId, pReq->taskId, numOfTasks);
@ -639,14 +640,14 @@ int32_t streamTaskUpdateTaskCheckpointInfo(SStreamTask* pTask, bool restored, SV
if (code != TSDB_CODE_SUCCESS) {
stError("s-task:%s vgId:%d failed to save task info after do checkpoint, checkpointId:%" PRId64 ", since %s", id,
vgId, pReq->checkpointId, terrstr());
return code;
return TSDB_CODE_SUCCESS;
}
streamMetaWUnLock(pMeta);
// drop task should not in the meta-lock, and drop the related fill-history task now
if (pReq->dropRelHTask) {
(void) streamMetaUnregisterTask(pMeta, pReq->hStreamId, pReq->hTaskId);
code = streamMetaUnregisterTask(pMeta, pReq->hStreamId, pReq->hTaskId);
int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta);
stDebug("s-task:%s vgId:%d related fill-history task:0x%x dropped, remain tasks:%d", id, vgId,
(int32_t)pReq->hTaskId, numOfTasks);
@ -991,7 +992,11 @@ void checkpointTriggerMonitorFn(void* param, void* tmrId) {
// do send retrieve checkpoint trigger msg to upstream
int32_t size = taosArrayGetSize(pNotSendList);
(void)doSendRetrieveTriggerMsg(pTask, pNotSendList);
int32_t code = doSendRetrieveTriggerMsg(pTask, pNotSendList);
if (code) {
stError("s-task:%s vgId:%d failed to retrieve trigger msg, code:%s", pTask->id.idStr, tstrerror(code));
}
streamMutexUnlock(&pActiveInfo->lock);
// check every 100ms
@ -1262,7 +1267,11 @@ static int32_t uploadCheckpointToS3(const char* id, const char* path) {
}
}
(void) taosCloseDir(&pDir);
int32_t ret = taosCloseDir(&pDir);
if (code == 0 && ret != 0) {
code = ret;
}
return code;
}

View File

@ -1255,7 +1255,11 @@ int32_t streamAddCheckpointSourceRspMsg(SStreamCheckpointSourceReq* pReq, SRpcHa
.recvTs = taosGetTimestampMs(), .transId = pReq->transId, .checkpointId = pReq->checkpointId};
// todo retry until it success
(void)streamTaskBuildCheckpointSourceRsp(pReq, pRpcInfo, &info.msg, TSDB_CODE_SUCCESS);
int32_t code = streamTaskBuildCheckpointSourceRsp(pReq, pRpcInfo, &info.msg, TSDB_CODE_SUCCESS);
if (code) {
stError("s-task:%s failed to build checkpoint-source rsp, code:%s", pTask->id.idStr, tstrerror(code));
return code;
}
SActiveCheckpointInfo* pActiveInfo = pTask->chkInfo.pActiveInfo;
streamMutexLock(&pActiveInfo->lock);

View File

@ -19,7 +19,7 @@
static void streamTaskResumeHelper(void* param, void* tmrId);
static void streamTaskSchedHelper(void* param, void* tmrId);
int32_t streamSetupScheduleTrigger(SStreamTask* pTask) {
void streamSetupScheduleTrigger(SStreamTask* pTask) {
if (pTask->info.delaySchedParam != 0 && pTask->info.fillHistory == 0) {
int32_t ref = atomic_add_fetch_32(&pTask->refCnt, 1);
stDebug("s-task:%s setup scheduler trigger, ref:%d delay:%" PRId64 " ms", pTask->id.idStr, ref,
@ -29,8 +29,6 @@ int32_t streamSetupScheduleTrigger(SStreamTask* pTask) {
taosTmrStart(streamTaskSchedHelper, (int32_t)pTask->info.delaySchedParam, pTask, streamTimer);
pTask->schedInfo.status = TASK_TRIGGER_STATUS__INACTIVE;
}
return 0;
}
int32_t streamTrySchedExec(SStreamTask* pTask) {
@ -47,7 +45,7 @@ int32_t streamTaskSchedTask(SMsgCb* pMsgCb, int32_t vgId, int64_t streamId, int3
SStreamTaskRunReq* pRunReq = rpcMallocCont(sizeof(SStreamTaskRunReq));
if (pRunReq == NULL) {
stError("vgId:%d failed to create msg to start stream task:0x%x exec, type:%d, code:%s", vgId, taskId, execType,
terrstr());
tstrerror(terrno));
return terrno;
}
@ -94,7 +92,7 @@ void streamTaskResumeHelper(void* param, void* tmrId) {
SStreamTaskState p = streamTaskGetStatus(pTask);
if (p.state == TASK_STATUS__DROPPING || p.state == TASK_STATUS__STOP) {
(void) streamTaskSetSchedStatusInactive(pTask);
int8_t status = streamTaskSetSchedStatusInactive(pTask);
int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
stDebug("s-task:%s status:%s not resume task, ref:%d", pId->idStr, p.name, ref);

View File

@ -24,8 +24,8 @@
#include "wal.h"
static void streamTaskDestroyUpstreamInfo(SUpstreamInfo* pUpstreamInfo);
static void streamTaskUpdateUpstreamInfo(SStreamTask* pTask, int32_t nodeId, const SEpSet* pEpSet, bool* pUpdated);
static void streamTaskUpdateDownstreamInfo(SStreamTask* pTask, int32_t nodeId, const SEpSet* pEpSet, bool* pUpdate);
static int32_t streamTaskUpdateUpstreamInfo(SStreamTask* pTask, int32_t nodeId, const SEpSet* pEpSet, bool* pUpdated);
static int32_t streamTaskUpdateDownstreamInfo(SStreamTask* pTask, int32_t nodeId, const SEpSet* pEpSet, bool* pUpdate);
static void streamTaskDestroyActiveChkptInfo(SActiveCheckpointInfo* pInfo);
static int32_t addToTaskset(SArray* pArray, SStreamTask* pTask) {
@ -36,15 +36,25 @@ static int32_t addToTaskset(SArray* pArray, SStreamTask* pTask) {
}
static int32_t doUpdateTaskEpset(SStreamTask* pTask, int32_t nodeId, SEpSet* pEpSet, bool* pUpdated) {
char buf[512] = {0};
int32_t code = 0;
char buf[512] = {0};
if (pTask->info.nodeId == nodeId) { // execution task should be moved away
bool isEqual = isEpsetEqual(&pTask->info.epSet, pEpSet);
(void)epsetToStr(pEpSet, buf, tListLen(buf));
code = epsetToStr(pEpSet, buf, tListLen(buf));
if (code) { // print error and continue
stError("%s failed to convert epset to str, code:%s", pTask->id.idStr, tstrerror(code));
return code;
}
if (!isEqual) {
(*pUpdated) = true;
char tmp[512] = {0};
(void)epsetToStr(&pTask->info.epSet, tmp, tListLen(tmp)); // only for log file, ignore errors
code = epsetToStr(&pTask->info.epSet, tmp, tListLen(tmp)); // only for log file, ignore errors
if (code) { // print error and continue
stError("%s failed to convert epset to str, code:%s", pTask->id.idStr, tstrerror(code));
return code;
}
epsetAssign(&pTask->info.epSet, pEpSet);
stDebug("s-task:0x%x (vgId:%d) self node epset is updated %s, old:%s", pTask->id.taskId, nodeId, buf, tmp);
@ -56,15 +66,15 @@ static int32_t doUpdateTaskEpset(SStreamTask* pTask, int32_t nodeId, SEpSet* pEp
// check for the dispatch info and the upstream task info
int32_t level = pTask->info.taskLevel;
if (level == TASK_LEVEL__SOURCE) {
streamTaskUpdateDownstreamInfo(pTask, nodeId, pEpSet, pUpdated);
code = streamTaskUpdateDownstreamInfo(pTask, nodeId, pEpSet, pUpdated);
} else if (level == TASK_LEVEL__AGG) {
streamTaskUpdateUpstreamInfo(pTask, nodeId, pEpSet, pUpdated);
streamTaskUpdateDownstreamInfo(pTask, nodeId, pEpSet, pUpdated);
code = streamTaskUpdateUpstreamInfo(pTask, nodeId, pEpSet, pUpdated);
code = streamTaskUpdateDownstreamInfo(pTask, nodeId, pEpSet, pUpdated);
} else { // TASK_LEVEL__SINK
streamTaskUpdateUpstreamInfo(pTask, nodeId, pEpSet, pUpdated);
code = streamTaskUpdateUpstreamInfo(pTask, nodeId, pEpSet, pUpdated);
}
return 0;
return code;
}
static void freeItem(void* p) {
@ -227,17 +237,17 @@ void tFreeStreamTask(SStreamTask* pTask) {
}
if (pTask->schedInfo.pDelayTimer != NULL) {
(void)taosTmrStop(pTask->schedInfo.pDelayTimer);
streamTmrStop(pTask->schedInfo.pDelayTimer);
pTask->schedInfo.pDelayTimer = NULL;
}
if (pTask->hTaskInfo.pTimer != NULL) {
(void)taosTmrStop(pTask->hTaskInfo.pTimer);
streamTmrStop(pTask->hTaskInfo.pTimer);
pTask->hTaskInfo.pTimer = NULL;
}
if (pTask->msgInfo.pRetryTmr != NULL) {
(void)taosTmrStop(pTask->msgInfo.pRetryTmr);
streamTmrStop(pTask->msgInfo.pRetryTmr);
pTask->msgInfo.pRetryTmr = NULL;
}
@ -402,8 +412,14 @@ int32_t streamTaskSetBackendPath(SStreamTask* pTask) {
return 0;
}
int32_t streamTaskInit(SStreamTask* pTask, SStreamMeta* pMeta, SMsgCb* pMsgCb, int64_t ver) {
(void)createStreamTaskIdStr(pTask->id.streamId, pTask->id.taskId, &pTask->id.idStr);
int32_t code = createStreamTaskIdStr(pTask->id.streamId, pTask->id.taskId, &pTask->id.idStr);
if (code) {
stError("%s failed create stream task id str, code:%s", pTask->id.taskId, tstrerror(code));
return code;
}
pTask->refCnt = 1;
pTask->inputq.status = TASK_INPUT_STATUS__NORMAL;
@ -419,7 +435,7 @@ int32_t streamTaskInit(SStreamTask* pTask, SStreamMeta* pMeta, SMsgCb* pMsgCb, i
pTask->status.schedStatus = TASK_SCHED_STATUS__INACTIVE;
pTask->status.timerActive = 0;
int32_t code = streamCreateStateMachine(pTask);
code = streamCreateStateMachine(pTask);
if (pTask->status.pSM == NULL || code != TSDB_CODE_SUCCESS) {
stError("s-task:%s failed create state-machine for stream task, initialization failed, code:%s", pTask->id.idStr,
tstrerror(code));
@ -439,6 +455,7 @@ int32_t streamTaskInit(SStreamTask* pTask, SStreamMeta* pMeta, SMsgCb* pMsgCb, i
code = taosThreadMutexInit(&pTask->msgInfo.lock, NULL);
if (code) {
stError("s-task:0x%x failed to init msgInfo mutex, code:%s", pTask->id.taskId, tstrerror(code));
return code;
}
@ -484,16 +501,14 @@ int32_t streamTaskInit(SStreamTask* pTask, SStreamMeta* pMeta, SMsgCb* pMsgCb, i
pOutputInfo->pNodeEpsetUpdateList = taosArrayInit(4, sizeof(SDownstreamTaskEpset));
if (pOutputInfo->pNodeEpsetUpdateList == NULL) {
stError("s-task:%s failed to prepare downstreamUpdateList, code:%s", pTask->id.idStr,
tstrerror(TSDB_CODE_OUT_OF_MEMORY));
return TSDB_CODE_OUT_OF_MEMORY;
stError("s-task:%s failed to prepare downstreamUpdateList, code:%s", pTask->id.idStr, tstrerror(terrno));
return terrno;
}
pTask->taskCheckInfo.pList = taosArrayInit(4, sizeof(SDownstreamStatusInfo));
if (pTask->taskCheckInfo.pList == NULL) {
stError("s-task:%s failed to prepare taskCheckInfo list, code:%s", pTask->id.idStr,
tstrerror(TSDB_CODE_OUT_OF_MEMORY));
return TSDB_CODE_OUT_OF_MEMORY;
stError("s-task:%s failed to prepare taskCheckInfo list, code:%s", pTask->id.idStr, tstrerror(terrno));
return terrno;
}
if (pTask->chkInfo.pActiveInfo == NULL) {
@ -539,9 +554,14 @@ int32_t streamTaskSetUpstreamInfo(SStreamTask* pTask, const SStreamTask* pUpstre
return (p == NULL) ? TSDB_CODE_OUT_OF_MEMORY : TSDB_CODE_SUCCESS;
}
void streamTaskUpdateUpstreamInfo(SStreamTask* pTask, int32_t nodeId, const SEpSet* pEpSet, bool* pUpdated) {
char buf[512] = {0};
(void)epsetToStr(pEpSet, buf, tListLen(buf)); // ignore error since it is only for log file.
int32_t streamTaskUpdateUpstreamInfo(SStreamTask* pTask, int32_t nodeId, const SEpSet* pEpSet, bool* pUpdated) {
int32_t code = 0;
char buf[512] = {0};
code = epsetToStr(pEpSet, buf, tListLen(buf)); // ignore error since it is only for log file.
if (code != 0) { // print error and continue
stError("%s failed to convert epset to str, code:%s", pTask->id.idStr, tstrerror(code));
return code;
}
int32_t numOfUpstream = taosArrayGetSize(pTask->upstreamInfo.pList);
for (int32_t i = 0; i < numOfUpstream; ++i) {
@ -552,7 +572,11 @@ void streamTaskUpdateUpstreamInfo(SStreamTask* pTask, int32_t nodeId, const SEpS
*pUpdated = true;
char tmp[512] = {0};
(void)epsetToStr(&pInfo->epSet, tmp, tListLen(tmp));
code = epsetToStr(&pInfo->epSet, tmp, tListLen(tmp));
if (code != 0) { // print error and continue
stError("%s failed to convert epset to str, code:%s", pTask->id.idStr, tstrerror(code));
return code;
}
epsetAssign(&pInfo->epSet, pEpSet);
stDebug("s-task:0x%x update the upstreamInfo taskId:0x%x(nodeId:%d) newEpset:%s old:%s", pTask->id.taskId,
@ -565,6 +589,8 @@ void streamTaskUpdateUpstreamInfo(SStreamTask* pTask, int32_t nodeId, const SEpS
break;
}
}
return code;
}
void streamTaskDestroyUpstreamInfo(SUpstreamInfo* pUpstreamInfo) {
@ -585,9 +611,13 @@ void streamTaskSetFixedDownstreamInfo(SStreamTask* pTask, const SStreamTask* pDo
pTask->msgInfo.msgType = TDMT_STREAM_TASK_DISPATCH;
}
void streamTaskUpdateDownstreamInfo(SStreamTask* pTask, int32_t nodeId, const SEpSet* pEpSet, bool* pUpdated) {
char buf[512] = {0};
(void)epsetToStr(pEpSet, buf, tListLen(buf)); // ignore the error since only for log files.
int32_t streamTaskUpdateDownstreamInfo(SStreamTask* pTask, int32_t nodeId, const SEpSet* pEpSet, bool* pUpdated) {
char buf[512] = {0};
int32_t code = epsetToStr(pEpSet, buf, tListLen(buf)); // ignore the error since only for log files.
if (code != 0) { // print error and continue
stError("%s failed to convert epset to str, code:%s", pTask->id.idStr, tstrerror(code));
return code;
}
int32_t id = pTask->id.taskId;
int8_t type = pTask->outputInfo.type;
@ -605,8 +635,13 @@ void streamTaskUpdateDownstreamInfo(SStreamTask* pTask, int32_t nodeId, const SE
bool isEqual = isEpsetEqual(&pVgInfo->epSet, pEpSet);
if (!isEqual) {
*pUpdated = true;
char tmp[512] = {0};
(void)epsetToStr(&pVgInfo->epSet, tmp, tListLen(tmp));
code = epsetToStr(&pVgInfo->epSet, tmp, tListLen(tmp));
if (code != 0) { // print error and continue
stError("%s failed to convert epset to str, code:%s", pTask->id.idStr, tstrerror(code));
return code;
}
epsetAssign(&pVgInfo->epSet, pEpSet);
stDebug("s-task:0x%x update dispatch info, task:0x%x(nodeId:%d) newEpset:%s old:%s", id, pVgInfo->taskId,
@ -626,7 +661,11 @@ void streamTaskUpdateDownstreamInfo(SStreamTask* pTask, int32_t nodeId, const SE
*pUpdated = true;
char tmp[512] = {0};
(void)epsetToStr(&pDispatcher->epSet, tmp, tListLen(tmp));
code = epsetToStr(&pDispatcher->epSet, tmp, tListLen(tmp));
if (code != 0) { // print error and continue
stError("%s failed to convert epset to str, code:%s", pTask->id.idStr, tstrerror(code));
return code;
}
epsetAssign(&pDispatcher->epSet, pEpSet);
stDebug("s-task:0x%x update dispatch info, task:0x%x(nodeId:%d) newEpset:%s old:%s", id, pDispatcher->taskId,
@ -637,6 +676,8 @@ void streamTaskUpdateDownstreamInfo(SStreamTask* pTask, int32_t nodeId, const SE
}
}
}
return code;
}
int32_t streamTaskStop(SStreamTask* pTask) {
@ -977,7 +1018,10 @@ static int32_t taskPauseCallback(SStreamTask* pTask, void* param) {
}
void streamTaskPause(SStreamTask* pTask) {
(void)streamTaskHandleEventAsync(pTask->status.pSM, TASK_EVENT_PAUSE, taskPauseCallback, NULL);
int32_t code = streamTaskHandleEventAsync(pTask->status.pSM, TASK_EVENT_PAUSE, taskPauseCallback, NULL);
if (code) {
stError("s-task:%s failed handle pause event async, code:%s", pTask->id.idStr, tstrerror(code));
}
}
void streamTaskResume(SStreamTask* pTask) {
@ -1182,13 +1226,13 @@ void streamTaskDestroyActiveChkptInfo(SActiveCheckpointInfo* pInfo) {
SStreamTmrInfo* pTriggerTmr = &pInfo->chkptTriggerMsgTmr;
if (pTriggerTmr->tmrHandle != NULL) {
(void)taosTmrStop(pTriggerTmr->tmrHandle);
streamTmrStop(pTriggerTmr->tmrHandle);
pTriggerTmr->tmrHandle = NULL;
}
SStreamTmrInfo* pReadyTmr = &pInfo->chkptReadyMsgTmr;
if (pReadyTmr->tmrHandle != NULL) {
(void)taosTmrStop(pReadyTmr->tmrHandle);
streamTmrStop(pReadyTmr->tmrHandle);
pReadyTmr->tmrHandle = NULL;
}

View File

@ -42,13 +42,16 @@ int32_t streamTimerGetInstance(tmr_h* pTmr) {
void streamTmrReset(TAOS_TMR_CALLBACK fp, int32_t mseconds, void* param, void* handle, tmr_h* pTmrId, int32_t vgId,
const char* pMsg) {
// while (1) {
bool ret = taosTmrReset(fp, mseconds, param, handle, pTmrId);
if (ret) {
// break;
}
// stError("vgId:%d failed to reset tmr: %s, try again", vgId, pMsg);
// }
bool ret = taosTmrReset(fp, mseconds, param, handle, pTmrId);
if (ret) {
}
}
void streamTmrStop(tmr_h tmrId) {
bool stop = taosTmrStop(tmrId);
if (stop) {
// todo
}
}
int32_t streamCleanBeforeQuitTmr(SStreamTmrInfo* pInfo, SStreamTask* pTask) {