refactor: check return value for each function.

This commit is contained in:
Haojun Liao 2024-07-17 15:40:00 +08:00
parent 14f149cdd3
commit 552e59f42b
17 changed files with 403 additions and 325 deletions

View File

@ -59,8 +59,6 @@ extern "C" {
#define STREAM_EXEC_T_STOP_ALL_TASKS (-5)
#define STREAM_EXEC_T_RESUME_TASK (-6)
#define STREAM_EXEC_T_ADD_FAILED_TASK (-7)
// the load and start stream task should be executed after snode has started successfully, since the load of stream
// tasks may incur the download of checkpoint data from remote, which may consume significant network and CPU resources.
typedef struct SStreamTask SStreamTask;
typedef struct SStreamQueue SStreamQueue;
@ -190,7 +188,7 @@ typedef struct {
SSDataBlock* pBlock;
} SStreamRefDataBlock;
SStreamDataSubmit* streamDataSubmitNew(SPackedData* pData, int32_t type);
int32_t streamDataSubmitNew(SPackedData* pData, int32_t type, SStreamDataSubmit** pSubmit);
void streamDataSubmitDestroy(SStreamDataSubmit* pDataSubmit);
typedef struct {
@ -528,8 +526,8 @@ typedef struct STaskUpdateEntry {
typedef int32_t (*__state_trans_user_fn)(SStreamTask*, void* param);
SStreamTask* tNewStreamTask(int64_t streamId, int8_t taskLevel, SEpSet* pEpset, bool fillHistory, int64_t triggerParam,
SArray* pTaskList, bool hasFillhistory, int8_t subtableWithoutMd5);
int32_t tNewStreamTask(int64_t streamId, int8_t taskLevel, SEpSet* pEpset, bool fillHistory, int64_t triggerParam,
SArray* pTaskList, bool hasFillhistory, int8_t subtableWithoutMd5, SStreamTask** pTask);
void tFreeStreamTask(SStreamTask* pTask);
int32_t tEncodeStreamTask(SEncoder* pEncoder, const SStreamTask* pTask);
int32_t tDecodeStreamTask(SDecoder* pDecoder, SStreamTask* pTask);
@ -629,7 +627,7 @@ int32_t streamSetupScheduleTrigger(SStreamTask* pTask);
int32_t streamProcessDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pReq, SRpcMsg* pMsg);
int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, int32_t code);
SStreamUpstreamEpInfo* streamTaskGetUpstreamTaskEpInfo(SStreamTask* pTask, int32_t taskId);
int32_t streamTaskGetUpstreamTaskEpInfo(SStreamTask* pTask, int32_t taskId, SStreamUpstreamEpInfo** pEpInfo);
SEpSet* streamTaskGetDownstreamEpInfo(SStreamTask* pTask, int32_t taskId);
void streamTaskInputFail(SStreamTask* pTask);
@ -724,18 +722,18 @@ SScanhistoryDataInfo streamScanHistoryData(SStreamTask* pTask, int64_t st);
// stream task meta
void streamMetaInit();
void streamMetaCleanup();
SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskBuild expandFunc, FTaskExpand expandTaskFn,
int32_t vgId, int64_t stage, startComplete_fn_t fn);
int32_t streamMetaOpen(const char* path, void* ahandle, FTaskBuild expandFunc, FTaskExpand expandTaskFn, int32_t vgId,
int64_t stage, startComplete_fn_t fn, SStreamMeta** pMeta);
void streamMetaClose(SStreamMeta* streamMeta);
int32_t streamMetaSaveTask(SStreamMeta* pMeta, SStreamTask* pTask); // save to stream meta store
int32_t streamMetaRemoveTask(SStreamMeta* pMeta, STaskId* pKey);
int32_t streamMetaRegisterTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTask, bool* pAdded);
int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId);
int32_t streamMetaGetNumOfTasks(SStreamMeta* pMeta);
SStreamTask* streamMetaAcquireTaskNoLock(SStreamMeta* pMeta, int64_t streamId, int32_t taskId);
SStreamTask* streamMetaAcquireTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId);
int32_t streamMetaAcquireTaskNoLock(SStreamMeta* pMeta, int64_t streamId, int32_t taskId, SStreamTask** pTask);
int32_t streamMetaAcquireTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId, SStreamTask** pTask);
void streamMetaReleaseTask(SStreamMeta* pMeta, SStreamTask* pTask);
SStreamTask* streamMetaAcquireOneTask(SStreamTask* pTask);
void streamMetaAcquireOneTask(SStreamTask* pTask);
void streamMetaClear(SStreamMeta* pMeta);
void streamMetaInitBackend(SStreamMeta* pMeta);
int32_t streamMetaCommit(SStreamMeta* pMeta);
@ -785,11 +783,11 @@ int32_t streamTaskBuildCheckpointSourceRsp(SStreamCheckpointSourceReq* pReq, SRp
int32_t setCode);
int32_t streamSendChkptReportMsg(SStreamTask* pTask, SCheckpointInfo* pCheckpointInfo, int8_t dropRelHTask);
int32_t streamTaskUpdateTaskCheckpointInfo(SStreamTask* pTask, bool restored, SVUpdateCheckpointInfoReq* pReq);
SActiveCheckpointInfo* streamTaskCreateActiveChkptInfo();
int32_t streamTaskCreateActiveChkptInfo(SActiveCheckpointInfo** pRes);
// stream task state machine, and event handling
SStreamTaskSM* streamCreateStateMachine(SStreamTask* pTask);
void* streamDestroyStateMachine(SStreamTaskSM* pSM);
int32_t streamCreateStateMachine(SStreamTask* pTask);
void streamDestroyStateMachine(SStreamTaskSM* pSM);
int32_t streamTaskHandleEvent(SStreamTaskSM* pSM, EStreamTaskEvent event);
int32_t streamTaskHandleEventAsync(SStreamTaskSM* pSM, EStreamTaskEvent event, __state_trans_user_fn callbackFn,
void* param);
@ -803,7 +801,7 @@ int32_t streamTaskBroadcastRetrieveReq(SStreamTask* pTask, SStreamRetrieveReq* r
void streamTaskSendRetrieveRsp(SStreamRetrieveReq* pReq, SRpcMsg* pRsp);
int32_t streamProcessHeartbeatRsp(SStreamMeta* pMeta, SMStreamHbRspMsg* pRsp);
int32_t streamTaskSendPreparedCheckpointsourceRsp(SStreamTask* pTask);
int32_t streamTaskSendCheckpointsourceRsp(SStreamTask* pTask);
#ifdef __cplusplus

View File

@ -242,11 +242,11 @@ static int32_t doAddSinkTask(SStreamObj* pStream, SMnode* pMnode, SVgObj* pVgrou
int64_t uid = (isFillhistory) ? pStream->hTaskUid : pStream->uid;
SArray** pTaskList = (isFillhistory) ? taosArrayGetLast(pStream->pHTasksList) : taosArrayGetLast(pStream->tasks);
SStreamTask* pTask = tNewStreamTask(uid, TASK_LEVEL__SINK, pEpset, isFillhistory, 0, *pTaskList,
pStream->conf.fillHistory, pStream->subTableWithoutMd5);
if (pTask == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return terrno;
SStreamTask* pTask = NULL;
int32_t code = tNewStreamTask(uid, TASK_LEVEL__SINK, pEpset, isFillhistory, 0, *pTaskList, pStream->conf.fillHistory,
pStream->subTableWithoutMd5, &pTask);
if (code != 0) {
return code;
}
mDebug("doAddSinkTask taskId:%s, vgId:%d, isFillHistory:%d", pTask->id.idStr, pVgroup->vgId, isFillhistory);
@ -353,18 +353,13 @@ static void haltInitialTaskStatus(SStreamTask* pTask, SSubplan* pPlan, bool isFi
}
}
static SStreamTask* buildSourceTask(SStreamObj* pStream, SEpSet* pEpset, bool isFillhistory, bool useTriggerParam) {
static int32_t buildSourceTask(SStreamObj* pStream, SEpSet* pEpset, bool isFillhistory, bool useTriggerParam, SStreamTask** pTask) {
uint64_t uid = (isFillhistory) ? pStream->hTaskUid : pStream->uid;
SArray** pTaskList = (isFillhistory) ? taosArrayGetLast(pStream->pHTasksList) : taosArrayGetLast(pStream->tasks);
SStreamTask* pTask =
tNewStreamTask(uid, TASK_LEVEL__SOURCE, pEpset, isFillhistory, useTriggerParam ? pStream->conf.triggerParam : 0,
*pTaskList, pStream->conf.fillHistory, pStream->subTableWithoutMd5);
if (pTask == NULL) {
return NULL;
}
return pTask;
int32_t code = tNewStreamTask(uid, TASK_LEVEL__SOURCE, pEpset, isFillhistory, useTriggerParam ? pStream->conf.triggerParam : 0,
*pTaskList, pStream->conf.fillHistory, pStream->subTableWithoutMd5, pTask);
return code;
}
static void addNewTaskList(SStreamObj* pStream) {
@ -398,10 +393,10 @@ static void setHTasksId(SStreamObj* pStream) {
static int32_t doAddSourceTask(SMnode* pMnode, SSubplan* plan, SStreamObj* pStream, SEpSet* pEpset, int64_t skey,
SArray* pVerList, SVgObj* pVgroup, bool isFillhistory, bool useTriggerParam) {
SStreamTask* pTask = buildSourceTask(pStream, pEpset, isFillhistory, useTriggerParam);
if (pTask == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return terrno;
SStreamTask* pTask = NULL;
int32_t code = buildSourceTask(pStream, pEpset, isFillhistory, useTriggerParam, &pTask);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
mDebug("doAddSourceTask taskId:%s, vgId:%d, isFillHistory:%d", pTask->id.idStr, pVgroup->vgId, isFillhistory);
@ -412,10 +407,9 @@ static int32_t doAddSourceTask(SMnode* pMnode, SSubplan* plan, SStreamObj* pStre
streamTaskSetDataRange(pTask, skey, pVerList, pVgroup->vgId);
int32_t code = mndAssignStreamTaskToVgroup(pMnode, pTask, plan, pVgroup);
if (code != 0) {
terrno = code;
return terrno;
code = mndAssignStreamTaskToVgroup(pMnode, pTask, plan, pVgroup);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
return TDB_CODE_SUCCESS;
@ -498,28 +492,29 @@ static int32_t addSourceTask(SMnode* pMnode, SSubplan* plan, SStreamObj* pStream
return TSDB_CODE_SUCCESS;
}
static SStreamTask* buildAggTask(SStreamObj* pStream, SEpSet* pEpset, bool isFillhistory, bool useTriggerParam) {
static int32_t buildAggTask(SStreamObj* pStream, SEpSet* pEpset, bool isFillhistory, bool useTriggerParam,
SStreamTask** pAggTask) {
*pAggTask = NULL;
uint64_t uid = (isFillhistory) ? pStream->hTaskUid : pStream->uid;
SArray** pTaskList = (isFillhistory) ? taosArrayGetLast(pStream->pHTasksList) : taosArrayGetLast(pStream->tasks);
SStreamTask* pAggTask =
int32_t code =
tNewStreamTask(uid, TASK_LEVEL__AGG, pEpset, isFillhistory, useTriggerParam ? pStream->conf.triggerParam : 0,
*pTaskList, pStream->conf.fillHistory, pStream->subTableWithoutMd5);
if (pAggTask == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
return pAggTask;
*pTaskList, pStream->conf.fillHistory, pStream->subTableWithoutMd5, pAggTask);
return code;
}
static int32_t doAddAggTask(SStreamObj* pStream, SMnode* pMnode, SSubplan* plan, SEpSet* pEpset, SVgObj* pVgroup,
SSnodeObj* pSnode, bool isFillhistory, bool useTriggerParam) {
int32_t code = 0;
SStreamTask* pTask = buildAggTask(pStream, pEpset, isFillhistory, useTriggerParam);
if (pTask == NULL) {
return terrno;
SStreamTask* pTask = NULL;
code = buildAggTask(pStream, pEpset, isFillhistory, useTriggerParam, &pTask);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
if (pSnode != NULL) {
code = mndAssignStreamTaskToSnode(pMnode, pTask, plan, pSnode);
mDebug("doAddAggTask taskId:%s, snode id:%d, isFillHistory:%d", pTask->id.idStr, pSnode->id, isFillhistory);

View File

@ -71,9 +71,9 @@ SSnode *sndOpen(const char *path, const SSnodeOpt *pOption) {
startRsync();
pSnode->msgCb = pOption->msgCb;
pSnode->pMeta = streamMetaOpen(path, pSnode, (FTaskBuild *)sndBuildStreamTask, tqExpandStreamTask, SNODE_HANDLE, taosGetTimestampMs(), tqStartTaskCompleteCallback);
if (pSnode->pMeta == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
int32_t code = streamMetaOpen(path, pSnode, (FTaskBuild *)sndBuildStreamTask, tqExpandStreamTask, SNODE_HANDLE,
taosGetTimestampMs(), tqStartTaskCompleteCallback, &pSnode->pMeta);
if (code != TSDB_CODE_SUCCESS) {
goto FAIL;
}

View File

@ -297,8 +297,17 @@ static int32_t tdSetRSmaInfoItemParams(SSma *pSma, SRSmaParam *param, SRSmaStat
sprintf(pStreamTask->exec.qmsg, "%s", RSMA_EXEC_TASK_FLAG);
pStreamTask->chkInfo.checkpointId = streamMetaGetLatestCheckpointId(pStreamTask->pMeta);
tdRSmaTaskInit(pStreamTask->pMeta, pItem, &pStreamTask->id);
pStreamTask->status.pSM = streamCreateStateMachine(pStreamTask);
pStreamTask->chkInfo.pActiveInfo = streamTaskCreateActiveChkptInfo();
int32_t code = streamCreateStateMachine(pStreamTask);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
code = streamTaskCreateActiveChkptInfo(&pStreamTask->chkInfo.pActiveInfo);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
pStreamState = streamStateOpen(taskInfDir, pStreamTask, pStreamTask->id.streamId, pStreamTask->id.taskId);
if (!pStreamState) {
terrno = TSDB_CODE_RSMA_STREAM_STATE_OPEN;

View File

@ -90,10 +90,11 @@ STQ* tqOpen(const char* path, SVnode* pVnode) {
int32_t tqInitialize(STQ* pTq) {
int32_t vgId = TD_VID(pTq->pVnode);
pTq->pStreamMeta =
streamMetaOpen(pTq->path, pTq, tqBuildStreamTask, tqExpandStreamTask, vgId, -1, tqStartTaskCompleteCallback);
if (pTq->pStreamMeta == NULL) {
return -1;
int32_t code = streamMetaOpen(pTq->path, pTq, tqBuildStreamTask, tqExpandStreamTask, vgId, -1,
tqStartTaskCompleteCallback, &pTq->pStreamMeta);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
streamMetaLoadAllTasks(pTq->pStreamMeta);
@ -856,7 +857,8 @@ int32_t handleStep2Async(SStreamTask* pStreamTask, void* param) {
SStreamMeta* pMeta = pStreamTask->pMeta;
STaskId hId = pStreamTask->hTaskInfo.id;
SStreamTask* pTask = streamMetaAcquireTask(pStreamTask->pMeta, hId.streamId, hId.taskId);
SStreamTask* pTask = NULL;
int32_t code = streamMetaAcquireTask(pStreamTask->pMeta, hId.streamId, hId.taskId, &pTask);
if (pTask == NULL) {
tqWarn("s-task:0x%x failed to acquired it to exec step 2, scan wal quit", (int32_t) hId.taskId);
return TSDB_CODE_SUCCESS;
@ -874,7 +876,8 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
SStreamMeta* pMeta = pTq->pStreamMeta;
int32_t code = TSDB_CODE_SUCCESS;
SStreamTask* pTask = streamMetaAcquireTask(pMeta, pReq->streamId, pReq->taskId);
SStreamTask* pTask = NULL;
code = streamMetaAcquireTask(pMeta, pReq->streamId, pReq->taskId, &pTask);
if (pTask == NULL) {
tqError("vgId:%d failed to acquire stream task:0x%x during scan history data, task may have been destroyed",
pMeta->vgId, pReq->taskId);
@ -961,7 +964,8 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
ASSERT(pTask->info.fillHistory == 1);
// 1. get the related stream task
SStreamTask* pStreamTask = streamMetaAcquireTask(pMeta, pTask->streamTaskId.streamId, pTask->streamTaskId.taskId);
SStreamTask* pStreamTask = NULL;
code = streamMetaAcquireTask(pMeta, pTask->streamTaskId.streamId, pTask->streamTaskId.taskId, &pStreamTask);
if (pStreamTask == NULL) {
tqError("failed to find s-task:0x%" PRIx64 ", it may have been destroyed, drop related fill-history task:%s",
pTask->streamTaskId.taskId, pTask->id.idStr);
@ -1121,7 +1125,8 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp)
return TSDB_CODE_SUCCESS;
}
SStreamTask* pTask = streamMetaAcquireTask(pMeta, req.streamId, req.taskId);
SStreamTask* pTask = NULL;
code = streamMetaAcquireTask(pMeta, req.streamId, req.taskId, &pTask);
if (pTask == NULL) {
tqError("vgId:%d failed to find s-task:0x%x, ignore checkpoint msg. checkpointId:%" PRId64
" transId:%d it may have been destroyed",

View File

@ -341,10 +341,8 @@ int32_t extractMsgFromWal(SWalReader* pReader, void** pItem, int64_t maxVer, con
memcpy(data, pBody, len);
SPackedData data1 = (SPackedData){.ver = ver, .msgLen = len, .msgStr = data};
*pItem = (SStreamQueueItem*)streamDataSubmitNew(&data1, STREAM_INPUT__DATA_SUBMIT);
if (*pItem == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
terrno = code;
code = streamDataSubmitNew(&data1, STREAM_INPUT__DATA_SUBMIT, (SStreamDataSubmit**)pItem);
if (code != 0) {
tqError("%s failed to create data submit for stream since out of memory", id);
return code;
}

View File

@ -324,7 +324,8 @@ int32_t doScanWalForAllTasks(SStreamMeta* pStreamMeta, bool* pScanIdle) {
for (int32_t i = 0; i < numOfTasks; ++i) {
STaskId* pTaskId = taosArrayGet(pTaskList, i);
SStreamTask* pTask = streamMetaAcquireTask(pStreamMeta, pTaskId->streamId, pTaskId->taskId);
SStreamTask* pTask = NULL;
int32_t code = streamMetaAcquireTask(pStreamMeta, pTaskId->streamId, pTaskId->taskId, &pTask);
if (pTask == NULL) {
continue;
}
@ -337,7 +338,7 @@ int32_t doScanWalForAllTasks(SStreamMeta* pStreamMeta, bool* pScanIdle) {
*pScanIdle = false;
// seek the stored version and extract data from WAL
int32_t code = setWalReaderStartOffset(pTask, vgId);
code = setWalReaderStartOffset(pTask, vgId);
if (code != TSDB_CODE_SUCCESS) {
streamMetaReleaseTask(pStreamMeta, pTask);
continue;

View File

@ -140,17 +140,19 @@ int32_t tqStreamTaskRestoreCheckpoint(SStreamMeta* pMeta, int64_t streamId, int3
}
tqDebug("vgId:%d restore task:0x%" PRIx64 "-0x%x checkpointId", vgId, streamId, taskId);
SStreamTask* pTask = streamMetaAcquireTask(pMeta, streamId, taskId);
SStreamTask* pTask = NULL;
int32_t code = streamMetaAcquireTask(pMeta, streamId, taskId, &pTask);
if (pTask == NULL) {
tqError("failed to acquire task:0x%x when trying to restore checkpointId", taskId);
return TSDB_CODE_STREAM_TASK_NOT_EXIST;
}
int32_t code = streamTaskSendRestoreChkptMsg(pTask);
code = streamTaskSendRestoreChkptMsg(pTask);
streamMetaReleaseTask(pMeta, pTask);
return code;
}
// this is to process request from transaction, always return true.
int32_t tqStreamTaskProcessUpdateReq(SStreamMeta* pMeta, SMsgCb* cb, SRpcMsg* pMsg, bool restored) {
int32_t vgId = pMeta->vgId;
char* msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
@ -230,7 +232,7 @@ int32_t tqStreamTaskProcessUpdateReq(SStreamMeta* pMeta, SMsgCb* cb, SRpcMsg* pM
updated = streamTaskUpdateEpsetInfo(pTask, req.pNodeList);
// send the checkpoint-source-rsp for source task to end the checkpoint trans in mnode
streamTaskSendPreparedCheckpointsourceRsp(pTask);
(void) streamTaskSendCheckpointsourceRsp(pTask);
streamTaskResetStatus(pTask);
streamTaskStopMonitorCheckRsp(&pTask->taskCheckInfo, pTask->id.idStr);
@ -329,7 +331,8 @@ int32_t tqStreamTaskProcessDispatchReq(SStreamMeta* pMeta, SRpcMsg* pMsg) {
tqDebug("s-task:0x%x recv dispatch msg from 0x%x(vgId:%d)", req.taskId, req.upstreamTaskId, req.upstreamNodeId);
SStreamTask* pTask = streamMetaAcquireTask(pMeta, req.streamId, req.taskId);
SStreamTask* pTask = NULL;
int32_t code = streamMetaAcquireTask(pMeta, req.streamId, req.taskId, &pTask);
if (pTask) {
SRpcMsg rsp = {.info = pMsg->info, .code = 0};
if (streamProcessDispatchMsg(pTask, &req, &rsp) != 0) {
@ -388,7 +391,8 @@ int32_t tqStreamTaskProcessDispatchRsp(SStreamMeta* pMeta, SRpcMsg* pMsg) {
tqDebug("s-task:0x%x vgId:%d recv dispatch-rsp from 0x%x vgId:%d", pRsp->upstreamTaskId, pRsp->upstreamNodeId,
pRsp->downstreamTaskId, pRsp->downstreamNodeId);
SStreamTask* pTask = streamMetaAcquireTask(pMeta, pRsp->streamId, pRsp->upstreamTaskId);
SStreamTask* pTask = NULL;
int32_t code = streamMetaAcquireTask(pMeta, pRsp->streamId, pRsp->upstreamTaskId, &pTask);
if (pTask) {
streamProcessDispatchRsp(pTask, pRsp, pMsg->code);
streamMetaReleaseTask(pMeta, pTask);
@ -411,7 +415,8 @@ int32_t tqStreamTaskProcessRetrieveReq(SStreamMeta* pMeta, SRpcMsg* pMsg) {
tDecodeStreamRetrieveReq(&decoder, &req);
tDecoderClear(&decoder);
SStreamTask* pTask = streamMetaAcquireTask(pMeta, req.streamId, req.dstTaskId);
SStreamTask* pTask = NULL;
int32_t code = streamMetaAcquireTask(pMeta, req.streamId, req.dstTaskId, &pTask);
if (pTask == NULL) {
tqError("vgId:%d process retrieve req, failed to acquire task:0x%x, it may have been dropped already", pMeta->vgId,
req.dstTaskId);
@ -419,7 +424,6 @@ int32_t tqStreamTaskProcessRetrieveReq(SStreamMeta* pMeta, SRpcMsg* pMsg) {
return -1;
}
int32_t code = 0;
if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
code = streamProcessRetrieveReq(pTask, &req);
} else {
@ -484,7 +488,8 @@ int32_t tqStreamTaskProcessCheckRsp(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLe
return streamMetaAddFailedTask(pMeta, rsp.streamId, rsp.upstreamTaskId);
}
SStreamTask* pTask = streamMetaAcquireTask(pMeta, rsp.streamId, rsp.upstreamTaskId);
SStreamTask* pTask = NULL;
code = streamMetaAcquireTask(pMeta, rsp.streamId, rsp.upstreamTaskId, &pTask);
if (pTask == NULL) {
return streamMetaAddFailedTask(pMeta, rsp.streamId, rsp.upstreamTaskId);
}
@ -511,7 +516,8 @@ int32_t tqStreamTaskProcessCheckpointReadyMsg(SStreamMeta* pMeta, SRpcMsg* pMsg)
}
tDecoderClear(&decoder);
SStreamTask* pTask = streamMetaAcquireTask(pMeta, req.streamId, req.upstreamTaskId);
SStreamTask* pTask = NULL;
code = streamMetaAcquireTask(pMeta, req.streamId, req.upstreamTaskId, &pTask);
if (pTask == NULL) {
tqError("vgId:%d failed to find s-task:0x%x, it may have been destroyed already", vgId, req.downstreamTaskId);
return TSDB_CODE_STREAM_TASK_NOT_EXIST;
@ -598,7 +604,8 @@ int32_t tqStreamTaskProcessDeployReq(SStreamMeta* pMeta, SMsgCb* cb, int64_t sve
tqDebug("vgId:%d s-task:0x%x is deployed and add into meta, numOfTasks:%d", vgId, taskId, numOfTasks);
if (restored) {
SStreamTask* p = streamMetaAcquireTask(pMeta, streamId, taskId);
SStreamTask* p = NULL;
code = streamMetaAcquireTask(pMeta, streamId, taskId, &p);
if ((p != NULL) && (p->info.fillHistory == 0)) {
tqStreamStartOneTaskAsync(pMeta, cb, streamId, taskId);
}
@ -773,7 +780,8 @@ int32_t tqStreamTaskProcessRunReq(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLead
int32_t code = streamMetaAddFailedTask(pMeta, pReq->streamId, pReq->taskId);
return code;
} else if (type == STREAM_EXEC_T_RESUME_TASK) { // task resume to run after idle for a while
SStreamTask* pTask = streamMetaAcquireTask(pMeta, pReq->streamId, pReq->taskId);
SStreamTask* pTask = NULL;
int32_t code = streamMetaAcquireTask(pMeta, pReq->streamId, pReq->taskId, &pTask);
if (pTask != NULL) {
char* pStatus = NULL;
@ -794,7 +802,8 @@ int32_t tqStreamTaskProcessRunReq(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLead
return 0;
}
SStreamTask* pTask = streamMetaAcquireTask(pMeta, pReq->streamId, pReq->taskId);
SStreamTask* pTask = NULL;
int32_t code = streamMetaAcquireTask(pMeta, pReq->streamId, pReq->taskId, &pTask);
if (pTask != NULL) { // even in halt status, the data in inputQ must be processed
char* p = NULL;
if (streamTaskReadyToRun(pTask, &p)) {
@ -862,7 +871,8 @@ int32_t tqStartTaskCompleteCallback(SStreamMeta* pMeta) {
int32_t tqStreamTaskProcessTaskResetReq(SStreamMeta* pMeta, char* pMsg) {
SVPauseStreamTaskReq* pReq = (SVPauseStreamTaskReq*)pMsg;
SStreamTask* pTask = streamMetaAcquireTask(pMeta, pReq->streamId, pReq->taskId);
SStreamTask* pTask = NULL;
int32_t code = streamMetaAcquireTask(pMeta, pReq->streamId, pReq->taskId, &pTask);
if (pTask == NULL) {
tqError("vgId:%d process task-reset req, failed to acquire task:0x%x, it may have been dropped already",
pMeta->vgId, pReq->taskId);
@ -903,7 +913,8 @@ int32_t tqStreamTaskProcessTaskResetReq(SStreamMeta* pMeta, char* pMsg) {
int32_t tqStreamTaskProcessRetrieveTriggerReq(SStreamMeta* pMeta, SRpcMsg* pMsg) {
SRetrieveChkptTriggerReq* pReq = (SRetrieveChkptTriggerReq*)pMsg->pCont;
SStreamTask* pTask = streamMetaAcquireTask(pMeta, pReq->streamId, pReq->upstreamTaskId);
SStreamTask* pTask = NULL;
int32_t code = streamMetaAcquireTask(pMeta, pReq->streamId, pReq->upstreamTaskId, &pTask);
if (pTask == NULL) {
tqError("vgId:%d process retrieve checkpoint trigger, checkpointId:%" PRId64
" from s-task:0x%x, failed to acquire task:0x%x, it may have been dropped already",
@ -971,7 +982,8 @@ int32_t tqStreamTaskProcessRetrieveTriggerReq(SStreamMeta* pMeta, SRpcMsg* pMsg)
int32_t tqStreamTaskProcessRetrieveTriggerRsp(SStreamMeta* pMeta, SRpcMsg* pMsg) {
SCheckpointTriggerRsp* pRsp = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
SStreamTask* pTask = streamMetaAcquireTask(pMeta, pRsp->streamId, pRsp->taskId);
SStreamTask* pTask = NULL;
int32_t code = streamMetaAcquireTask(pMeta, pRsp->streamId, pRsp->taskId, &pTask);
if (pTask == NULL) {
tqError(
"vgId:%d process retrieve checkpoint-trigger, failed to acquire task:0x%x, it may have been dropped already",
@ -990,7 +1002,8 @@ int32_t tqStreamTaskProcessRetrieveTriggerRsp(SStreamMeta* pMeta, SRpcMsg* pMsg)
int32_t tqStreamTaskProcessTaskPauseReq(SStreamMeta* pMeta, char* pMsg) {
SVPauseStreamTaskReq* pReq = (SVPauseStreamTaskReq*)pMsg;
SStreamTask* pTask = streamMetaAcquireTask(pMeta, pReq->streamId, pReq->taskId);
SStreamTask* pTask = NULL;
int32_t code = streamMetaAcquireTask(pMeta, pReq->streamId, pReq->taskId, &pTask);
if (pTask == NULL) {
tqError("vgId:%d process pause req, failed to acquire task:0x%x, it may have been dropped already", pMeta->vgId,
pReq->taskId);
@ -1003,7 +1016,8 @@ int32_t tqStreamTaskProcessTaskPauseReq(SStreamMeta* pMeta, char* pMsg) {
SStreamTask* pHistoryTask = NULL;
if (HAS_RELATED_FILLHISTORY_TASK(pTask)) {
pHistoryTask = streamMetaAcquireTask(pMeta, pTask->hTaskInfo.id.streamId, pTask->hTaskInfo.id.taskId);
pHistoryTask = NULL;
int32_t code = streamMetaAcquireTask(pMeta, pTask->hTaskInfo.id.streamId, pTask->hTaskInfo.id.taskId, &pHistoryTask);
if (pHistoryTask == NULL) {
tqError("vgId:%d process pause req, failed to acquire fill-history task:0x%" PRIx64
", it may have been dropped already",
@ -1070,7 +1084,8 @@ int32_t tqStreamTaskProcessTaskResumeReq(void* handle, int64_t sversion, char* m
SStreamMeta* pMeta = fromVnode ? ((STQ*)handle)->pStreamMeta : handle;
SStreamTask* pTask = streamMetaAcquireTask(pMeta, pReq->streamId, pReq->taskId);
SStreamTask* pTask = NULL;
int32_t code = streamMetaAcquireTask(pMeta, pReq->streamId, pReq->taskId, &pTask);
if (pTask == NULL) {
tqError("s-task:0x%x failed to acquire task to resume, it may have been dropped or stopped", pReq->taskId);
return TSDB_CODE_STREAM_TASK_IVLD_STATUS;
@ -1081,13 +1096,14 @@ int32_t tqStreamTaskProcessTaskResumeReq(void* handle, int64_t sversion, char* m
tqDebug("s-task:%s start to resume from paused, current status:%s", pTask->id.idStr, pState->name);
taosThreadMutexUnlock(&pTask->lock);
int32_t code = tqProcessTaskResumeImpl(handle, pTask, sversion, pReq->igUntreated, fromVnode);
code = tqProcessTaskResumeImpl(handle, pTask, sversion, pReq->igUntreated, fromVnode);
if (code != 0) {
return code;
}
STaskId* pHTaskId = &pTask->hTaskInfo.id;
SStreamTask* pHTask = streamMetaAcquireTask(pMeta, pHTaskId->streamId, pHTaskId->taskId);
SStreamTask* pHTask = NULL;
code = streamMetaAcquireTask(pMeta, pHTaskId->streamId, pHTaskId->taskId, &pHTask);
if (pHTask) {
taosThreadMutexLock(&pHTask->lock);
SStreamTaskState* p = streamTaskGetStatus(pHTask);
@ -1121,7 +1137,8 @@ int32_t tqStreamProcessConsensusChkptRsp2(SStreamMeta* pMeta, SRpcMsg* pMsg) { r
int32_t tqStreamProcessCheckpointReadyRsp(SStreamMeta* pMeta, SRpcMsg* pMsg) {
SMStreamCheckpointReadyRspMsg* pRsp = pMsg->pCont;
SStreamTask* pTask = streamMetaAcquireTask(pMeta, pRsp->streamId, pRsp->downstreamTaskId);
SStreamTask* pTask = NULL;
int32_t code = streamMetaAcquireTask(pMeta, pRsp->streamId, pRsp->downstreamTaskId, &pTask);
if (pTask == NULL) {
tqError("vgId:%d failed to acquire task:0x%x when handling checkpoint-ready msg, it may have been dropped",
pRsp->downstreamNodeId, pRsp->downstreamTaskId);
@ -1154,7 +1171,8 @@ int32_t tqStreamTaskProcessConsenChkptIdReq(SStreamMeta* pMeta, SRpcMsg* pMsg) {
tDecoderClear(&decoder);
SStreamTask* pTask = streamMetaAcquireTask(pMeta, req.streamId, req.taskId);
SStreamTask* pTask = NULL;
code = streamMetaAcquireTask(pMeta, req.streamId, req.taskId, &pTask);
if (pTask == NULL) {
tqError("vgId:%d process set consensus checkpointId req, failed to acquire task:0x%x, it may have been dropped already",
pMeta->vgId, req.taskId);

View File

@ -40,8 +40,11 @@ static SDownstreamStatusInfo* findCheckRspStatus(STaskCheckInfo* pInfo, int32_t
int32_t streamTaskCheckStatus(SStreamTask* pTask, int32_t upstreamTaskId, int32_t vgId, int64_t stage,
int64_t* oldStage) {
SStreamUpstreamEpInfo* pInfo = streamTaskGetUpstreamTaskEpInfo(pTask, upstreamTaskId);
ASSERT(pInfo != NULL);
SStreamUpstreamEpInfo* pInfo = NULL;
int32_t code = streamTaskGetUpstreamTaskEpInfo(pTask, upstreamTaskId, &pInfo);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
*oldStage = pInfo->stage;
const char* id = pTask->id.idStr;
@ -166,7 +169,8 @@ void streamTaskProcessCheckMsg(SStreamMeta* pMeta, SStreamTaskCheckReq* pReq, SS
taskId, pReq->upstreamTaskId, pReq->upstreamNodeId, pMeta->vgId);
pRsp->status = TASK_DOWNSTREAM_NOT_LEADER;
} else {
SStreamTask* pTask = streamMetaAcquireTask(pMeta, pReq->streamId, taskId);
SStreamTask* pTask = NULL;
int32_t code = streamMetaAcquireTask(pMeta, pReq->streamId, taskId, &pTask);
if (pTask != NULL) {
pRsp->status =
streamTaskCheckStatus(pTask, pReq->upstreamTaskId, pReq->upstreamNodeId, pReq->stage, &pRsp->oldStage);

View File

@ -29,12 +29,11 @@ static int32_t appendCheckpointIntoInputQ(SStreamTask* pTask, int32_t checkpoint
static int32_t doSendRetrieveTriggerMsg(SStreamTask* pTask, SArray* pNotSendList);
static void checkpointTriggerMonitorFn(void* param, void* tmrId);
SStreamDataBlock* createChkptTriggerBlock(SStreamTask* pTask, int32_t checkpointType, int64_t checkpointId,
int32_t transId, int32_t srcTaskId) {
int32_t createChkptTriggerBlock(SStreamTask* pTask, int32_t checkpointType, int64_t checkpointId, int32_t transId,
int32_t srcTaskId, SStreamDataBlock** pRes) {
SStreamDataBlock* pChkpoint = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, sizeof(SSDataBlock));
if (pChkpoint == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
return TSDB_CODE_OUT_OF_MEMORY;
}
pChkpoint->type = checkpointType;
@ -46,8 +45,7 @@ SStreamDataBlock* createChkptTriggerBlock(SStreamTask* pTask, int32_t checkpoint
SSDataBlock* pBlock = taosMemoryCalloc(1, sizeof(SSDataBlock));
if (pBlock == NULL) {
taosFreeQitem(pChkpoint);
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
return TSDB_CODE_OUT_OF_MEMORY;
}
pBlock->info.type = STREAM_CHECKPOINT;
@ -60,28 +58,37 @@ SStreamDataBlock* createChkptTriggerBlock(SStreamTask* pTask, int32_t checkpoint
if (pChkpoint->blocks == NULL) {
taosMemoryFree(pBlock);
taosFreeQitem(pChkpoint);
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
return TSDB_CODE_OUT_OF_MEMORY;
}
taosArrayPush(pChkpoint->blocks, pBlock);
void* p = taosArrayPush(pChkpoint->blocks, pBlock);
if (p == NULL) {
taosArrayDestroy(pChkpoint->blocks);
taosMemoryFree(pBlock);
taosFreeQitem(pChkpoint);
return TSDB_CODE_OUT_OF_MEMORY;
}
*pRes = pChkpoint;
taosMemoryFree(pBlock);
terrno = 0;
return pChkpoint;
return TSDB_CODE_SUCCESS;
}
// this message must be put into inputq successfully, continue retrying until it succeeds
int32_t appendCheckpointIntoInputQ(SStreamTask* pTask, int32_t checkpointType, int64_t checkpointId, int32_t transId,
int32_t srcTaskId) {
SStreamDataBlock* pCheckpoint = createChkptTriggerBlock(pTask, checkpointType, checkpointId, transId, srcTaskId);
SStreamDataBlock* pCheckpoint = NULL;
int32_t code = createChkptTriggerBlock(pTask, checkpointType, checkpointId, transId, srcTaskId, &pCheckpoint);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
if (streamTaskPutDataIntoInputQ(pTask, (SStreamQueueItem*)pCheckpoint) < 0) {
return TSDB_CODE_OUT_OF_MEMORY;
}
streamTrySchedExec(pTask);
return TSDB_CODE_SUCCESS;
return streamTrySchedExec(pTask);
}
int32_t streamProcessCheckpointSourceReq(SStreamTask* pTask, SStreamCheckpointSourceReq* pReq) {
@ -176,22 +183,26 @@ int32_t streamProcessCheckpointTriggerBlock(SStreamTask* pTask, SStreamDataBlock
SActiveCheckpointInfo* pActiveInfo = pTask->chkInfo.pActiveInfo;
taosThreadMutexLock(&pTask->lock);
(void) taosThreadMutexLock(&pTask->lock);
if (pTask->chkInfo.checkpointId > checkpointId) {
stError("s-task:%s vgId:%d current checkpointId:%" PRId64
" recv expired checkpoint-trigger block, checkpointId:%" PRId64 " transId:%d, discard",
id, vgId, pTask->chkInfo.checkpointId, checkpointId, transId);
taosThreadMutexUnlock(&pTask->lock);
code = taosThreadMutexUnlock(&pTask->lock);
streamFreeQitem((SStreamQueueItem*)pBlock);
return TSDB_CODE_SUCCESS;
return code;
}
if (pTask->chkInfo.checkpointId == checkpointId) {
{ // send checkpoint-ready msg to upstream
SRpcMsg msg = {0};
SStreamUpstreamEpInfo* pInfo = NULL;
code = streamTaskGetUpstreamTaskEpInfo(pTask, pBlock->srcTaskId, &pInfo);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
SStreamUpstreamEpInfo* pInfo = streamTaskGetUpstreamTaskEpInfo(pTask, pBlock->srcTaskId);
initCheckpointReadyMsg(pTask, pInfo->nodeId, pBlock->srcTaskId, pInfo->childId, checkpointId, &msg);
tmsgSendReq(&pInfo->epSet, &msg);
}
@ -202,10 +213,10 @@ int32_t streamProcessCheckpointTriggerBlock(SStreamTask* pTask, SStreamDataBlock
id, vgId, pBlock->srcTaskId);
streamTaskOpenUpstreamInput(pTask, pBlock->srcTaskId);
taosThreadMutexUnlock(&pTask->lock);
code = taosThreadMutexUnlock(&pTask->lock);
streamFreeQitem((SStreamQueueItem*)pBlock);
return TSDB_CODE_SUCCESS;
return code;
}
if (streamTaskGetStatus(pTask)->state == TASK_STATUS__CK) {
@ -213,19 +224,19 @@ int32_t streamProcessCheckpointTriggerBlock(SStreamTask* pTask, SStreamDataBlock
stError("s-task:%s vgId:%d active checkpointId:%" PRId64 ", recv invalid checkpoint-trigger checkpointId:%" PRId64
" discard",
id, vgId, pActiveInfo->activeId, checkpointId);
taosThreadMutexUnlock(&pTask->lock);
code = taosThreadMutexUnlock(&pTask->lock);
streamFreeQitem((SStreamQueueItem*)pBlock);
return TSDB_CODE_SUCCESS;
return code;
} else { // checkpointId == pActiveInfo->activeId
if (pActiveInfo->allUpstreamTriggerRecv == 1) {
stDebug(
"s-task:%s vgId:%d all upstream checkpoint-trigger recv, discard this checkpoint-trigger, "
"checkpointId:%" PRId64 " transId:%d",
id, vgId, checkpointId, transId);
taosThreadMutexUnlock(&pTask->lock);
code = taosThreadMutexUnlock(&pTask->lock);
streamFreeQitem((SStreamQueueItem*)pBlock);
return TSDB_CODE_SUCCESS;
return code;
}
if (taskLevel == TASK_LEVEL__SINK || taskLevel == TASK_LEVEL__AGG) {
@ -238,16 +249,16 @@ int32_t streamProcessCheckpointTriggerBlock(SStreamTask* pTask, SStreamDataBlock
", prev recvTs:%" PRId64 " discard",
pTask->id.idStr, p->upstreamTaskId, p->upstreamNodeId, p->checkpointId, p->recvTs);
taosThreadMutexUnlock(&pTask->lock);
code = taosThreadMutexUnlock(&pTask->lock);
streamFreeQitem((SStreamQueueItem*)pBlock);
return TSDB_CODE_SUCCESS;
return code;
}
}
}
}
}
taosThreadMutexUnlock(&pTask->lock);
(void) taosThreadMutexUnlock(&pTask->lock);
stDebug("s-task:%s vgId:%d start to handle the checkpoint-trigger block, checkpointId:%" PRId64 " ver:%" PRId64
", transId:%d current active checkpointId:%" PRId64,
@ -357,7 +368,7 @@ int32_t streamProcessCheckpointReadyMsg(SStreamTask* pTask, int64_t checkpointId
return -1;
}
taosThreadMutexLock(&pInfo->lock);
(void) taosThreadMutexLock(&pInfo->lock);
// only when all downstream tasks are send checkpoint rsp, we can start the checkpoint procedure for the agg task
int32_t size = taosArrayGetSize(pInfo->pCheckpointReadyRecvList);
@ -384,7 +395,7 @@ int32_t streamProcessCheckpointReadyMsg(SStreamTask* pTask, int64_t checkpointId
int32_t notReady = total - taosArrayGetSize(pInfo->pCheckpointReadyRecvList);
int32_t transId = pInfo->transId;
taosThreadMutexUnlock(&pInfo->lock);
(void) taosThreadMutexUnlock(&pInfo->lock);
if (notReady == 0) {
stDebug("s-task:%s all downstream tasks have completed build checkpoint, do checkpoint for current task", id);
@ -399,7 +410,7 @@ int32_t streamTaskProcessCheckpointReadyRsp(SStreamTask* pTask, int32_t upstream
int64_t now = taosGetTimestampMs();
int32_t numOfConfirmed = 0;
taosThreadMutexLock(&pInfo->lock);
(void) taosThreadMutexLock(&pInfo->lock);
for (int32_t i = 0; i < taosArrayGetSize(pInfo->pReadyMsgList); ++i) {
STaskCheckpointReadyInfo* pReadyInfo = taosArrayGet(pInfo->pReadyMsgList, i);
if (pReadyInfo->upstreamTaskId == upstreamTaskId && pReadyInfo->checkpointId == checkpointId) {
@ -420,7 +431,7 @@ int32_t streamTaskProcessCheckpointReadyRsp(SStreamTask* pTask, int32_t upstream
stDebug("s-task:%s send checkpoint-ready msg to %d upstream confirmed, checkpointId:%" PRId64, pTask->id.idStr,
numOfConfirmed, checkpointId);
taosThreadMutexUnlock(&pInfo->lock);
(void) taosThreadMutexUnlock(&pInfo->lock);
return TSDB_CODE_SUCCESS;
}
@ -428,12 +439,12 @@ void streamTaskClearCheckInfo(SStreamTask* pTask, bool clearChkpReadyMsg) {
pTask->chkInfo.startTs = 0; // clear the recorded start time
streamTaskOpenAllUpstreamInput(pTask); // open inputQ for all upstream tasks
taosThreadMutexLock(&pTask->chkInfo.pActiveInfo->lock);
(void) taosThreadMutexLock(&pTask->chkInfo.pActiveInfo->lock);
streamTaskClearActiveInfo(pTask->chkInfo.pActiveInfo);
if (clearChkpReadyMsg) {
streamClearChkptReadyMsg(pTask->chkInfo.pActiveInfo);
}
taosThreadMutexUnlock(&pTask->chkInfo.pActiveInfo->lock);
(void) taosThreadMutexUnlock(&pTask->chkInfo.pActiveInfo->lock);
}
int32_t streamTaskUpdateTaskCheckpointInfo(SStreamTask* pTask, bool restored, SVUpdateCheckpointInfoReq* pReq) {
@ -443,7 +454,7 @@ int32_t streamTaskUpdateTaskCheckpointInfo(SStreamTask* pTask, bool restored, SV
const char* id = pTask->id.idStr;
SCheckpointInfo* pInfo = &pTask->chkInfo;
taosThreadMutexLock(&pTask->lock);
(void) taosThreadMutexLock(&pTask->lock);
if (pReq->checkpointId <= pInfo->checkpointId) {
stDebug("s-task:%s vgId:%d latest checkpointId:%" PRId64 " checkpointVer:%" PRId64
@ -451,13 +462,13 @@ int32_t streamTaskUpdateTaskCheckpointInfo(SStreamTask* pTask, bool restored, SV
" transId:%d ignored",
id, vgId, pInfo->checkpointId, pInfo->checkpointVer, pReq->checkpointId, pReq->checkpointVer,
pReq->transId);
taosThreadMutexUnlock(&pTask->lock);
(void) taosThreadMutexUnlock(&pTask->lock);
{ // destroy the related fill-history tasks
// drop task should not in the meta-lock, and drop the related fill-history task now
streamMetaWUnLock(pMeta);
if (pReq->dropRelHTask) {
streamMetaUnregisterTask(pMeta, pReq->hStreamId, pReq->hTaskId);
(void) streamMetaUnregisterTask(pMeta, pReq->hStreamId, pReq->hTaskId);
int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta);
stDebug("s-task:%s vgId:%d related fill-history task:0x%x dropped in update checkpointInfo, remain tasks:%d",
id, vgId, pReq->taskId, numOfTasks);
@ -519,12 +530,12 @@ int32_t streamTaskUpdateTaskCheckpointInfo(SStreamTask* pTask, bool restored, SV
return code;
}
taosThreadMutexUnlock(&pTask->lock);
(void) taosThreadMutexUnlock(&pTask->lock);
streamMetaWUnLock(pMeta);
// drop task should not in the meta-lock, and drop the related fill-history task now
if (pReq->dropRelHTask) {
streamMetaUnregisterTask(pMeta, pReq->hStreamId, pReq->hTaskId);
(void) streamMetaUnregisterTask(pMeta, pReq->hStreamId, pReq->hTaskId);
int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta);
stDebug("s-task:%s vgId:%d related fill-history task:0x%x dropped, remain tasks:%d", id, vgId,
(int32_t)pReq->hTaskId, numOfTasks);
@ -718,9 +729,9 @@ int32_t streamTaskBuildCheckpoint(SStreamTask* pTask) {
code = streamSendChkptReportMsg(pTask, &pTask->chkInfo, dropRelHTask);
}
} else { // clear the checkpoint info if failed
taosThreadMutexLock(&pTask->lock);
(void) taosThreadMutexLock(&pTask->lock);
streamTaskSetFailedCheckpointId(pTask); // set failed checkpoint id before clear the checkpoint info
taosThreadMutexUnlock(&pTask->lock);
(void) taosThreadMutexUnlock(&pTask->lock);
code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_CHECKPOINT_DONE);
stDebug("s-task:%s clear checkpoint flag since gen checkpoint failed, checkpointId:%" PRId64, id, ckId);
@ -758,13 +769,13 @@ void checkpointTriggerMonitorFn(void* param, void* tmrId) {
pActiveInfo->checkCounter = 0;
stDebug("s-task:%s vgId:%d checkpoint-trigger monitor in tmr, ts:%" PRId64, id, vgId, now);
taosThreadMutexLock(&pTask->lock);
(void) taosThreadMutexLock(&pTask->lock);
SStreamTaskState* pState = streamTaskGetStatus(pTask);
if (pState->state != TASK_STATUS__CK) {
int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
stDebug("s-task:%s vgId:%d not in checkpoint status, quit from monitor checkpoint-trigger, ref:%d", id, vgId, ref);
taosThreadMutexUnlock(&pTask->lock);
(void) taosThreadMutexUnlock(&pTask->lock);
streamMetaReleaseTask(pTask->pMeta, pTask);
return;
}
@ -775,14 +786,14 @@ void checkpointTriggerMonitorFn(void* param, void* tmrId) {
stDebug("s-task:%s vgId:%d all checkpoint-trigger recv, quit from monitor checkpoint-trigger, ref:%d", id, vgId,
ref);
taosThreadMutexUnlock(&pTask->lock);
(void) taosThreadMutexUnlock(&pTask->lock);
streamMetaReleaseTask(pTask->pMeta, pTask);
return;
}
taosThreadMutexUnlock(&pTask->lock);
(void) taosThreadMutexUnlock(&pTask->lock);
taosThreadMutexLock(&pActiveInfo->lock);
(void) taosThreadMutexLock(&pActiveInfo->lock);
// send msg to retrieve checkpoint trigger msg
SArray* pList = pTask->upstreamInfo.pList;
@ -815,7 +826,7 @@ void checkpointTriggerMonitorFn(void* param, void* tmrId) {
// do send retrieve checkpoint trigger msg to upstream
int32_t size = taosArrayGetSize(pNotSendList);
doSendRetrieveTriggerMsg(pTask, pNotSendList);
taosThreadMutexUnlock(&pActiveInfo->lock);
(void) taosThreadMutexUnlock(&pActiveInfo->lock);
// check every 100ms
if (size > 0) {
@ -885,9 +896,9 @@ bool streamTaskAlreadySendTrigger(SStreamTask* pTask, int32_t downstreamNodeId)
return false;
}
taosThreadMutexLock(&pInfo->lock);
(void) taosThreadMutexLock(&pInfo->lock);
if (!pInfo->dispatchTrigger) {
taosThreadMutexUnlock(&pInfo->lock);
(void) taosThreadMutexUnlock(&pInfo->lock);
return false;
}
@ -909,7 +920,7 @@ bool streamTaskAlreadySendTrigger(SStreamTask* pTask, int32_t downstreamNodeId)
id, pSendInfo->sendTs, before, pInfo->activeId, pInfo->transId);
}
taosThreadMutexUnlock(&pInfo->lock);
(void) taosThreadMutexUnlock(&pInfo->lock);
return true;
}
@ -928,11 +939,12 @@ void streamTaskGetTriggerRecvStatus(SStreamTask* pTask, int32_t* pRecved, int32_
}
// record the dispatch checkpoint trigger info in the list
// memory insufficient may cause the stream computing stopped
void streamTaskInitTriggerDispatchInfo(SStreamTask* pTask) {
SActiveCheckpointInfo* pInfo = pTask->chkInfo.pActiveInfo;
int64_t now = taosGetTimestampMs();
taosThreadMutexLock(&pInfo->lock);
(void) taosThreadMutexLock(&pInfo->lock);
// outputQ should be empty here
ASSERT(streamQueueGetNumOfUnAccessedItems(pTask->outputq.queue) == 0);
@ -942,31 +954,37 @@ void streamTaskInitTriggerDispatchInfo(SStreamTask* pTask) {
STaskDispatcherFixed* pDispatch = &pTask->outputInfo.fixedDispatcher;
STaskTriggerSendInfo p = {.sendTs = now, .recved = false, .nodeId = pDispatch->nodeId, .taskId = pDispatch->taskId};
taosArrayPush(pInfo->pDispatchTriggerList, &p);
void* px = taosArrayPush(pInfo->pDispatchTriggerList, &p);
if (px == NULL) {
// pause the stream task, if memory not enough
}
} else {
for (int32_t i = 0; i < streamTaskGetNumOfDownstream(pTask); ++i) {
SVgroupInfo* pVgInfo = taosArrayGet(pTask->outputInfo.shuffleDispatcher.dbInfo.pVgroupInfos, i);
STaskTriggerSendInfo p = {.sendTs = now, .recved = false, .nodeId = pVgInfo->vgId, .taskId = pVgInfo->taskId};
taosArrayPush(pInfo->pDispatchTriggerList, &p);
void* px = taosArrayPush(pInfo->pDispatchTriggerList, &p);
if (px == NULL) {
// pause the stream task, if memory not enough
}
}
}
taosThreadMutexUnlock(&pInfo->lock);
(void) taosThreadMutexUnlock(&pInfo->lock);
}
int32_t streamTaskGetNumOfConfirmed(SStreamTask* pTask) {
SActiveCheckpointInfo* pInfo = pTask->chkInfo.pActiveInfo;
int32_t num = 0;
taosThreadMutexLock(&pInfo->lock);
(void) taosThreadMutexLock(&pInfo->lock);
for (int32_t i = 0; i < taosArrayGetSize(pInfo->pDispatchTriggerList); ++i) {
STaskTriggerSendInfo* p = taosArrayGet(pInfo->pDispatchTriggerList, i);
if (p->recved) {
num++;
}
}
taosThreadMutexUnlock(&pInfo->lock);
(void) taosThreadMutexUnlock(&pInfo->lock);
return num;
}
@ -974,7 +992,7 @@ void streamTaskSetTriggerDispatchConfirmed(SStreamTask* pTask, int32_t vgId) {
SActiveCheckpointInfo* pInfo = pTask->chkInfo.pActiveInfo;
int32_t taskId = 0;
taosThreadMutexLock(&pInfo->lock);
(void) taosThreadMutexLock(&pInfo->lock);
for (int32_t i = 0; i < taosArrayGetSize(pInfo->pDispatchTriggerList); ++i) {
STaskTriggerSendInfo* p = taosArrayGet(pInfo->pDispatchTriggerList, i);
@ -988,7 +1006,7 @@ void streamTaskSetTriggerDispatchConfirmed(SStreamTask* pTask, int32_t vgId) {
}
}
taosThreadMutexUnlock(&pInfo->lock);
(void) taosThreadMutexUnlock(&pInfo->lock);
int32_t numOfConfirmed = streamTaskGetNumOfConfirmed(pTask);
int32_t total = streamTaskGetNumOfDownstream(pTask);
@ -1045,7 +1063,8 @@ static int32_t uploadCheckpointToS3(const char* id, const char* path) {
stDebug("[s3] upload checkpoint:%s", filename);
}
}
taosCloseDir(&pDir);
(void) taosCloseDir(&pDir);
return code;
}
@ -1162,78 +1181,37 @@ int32_t deleteCheckpointFile(const char* id, const char* name) {
}
int32_t streamTaskSendRestoreChkptMsg(SStreamTask* pTask) {
int32_t code;
int32_t tlen = 0;
int32_t vgId = pTask->pMeta->vgId;
const char* id = pTask->id.idStr;
SCheckpointInfo* pInfo = &pTask->chkInfo;
taosThreadMutexLock(&pTask->lock);
(void) taosThreadMutexLock(&pTask->lock);
if (pTask->status.sendConsensusChkptId == true) {
stDebug("s-task:%s already start to consensus-checkpointId, not start again before it completed", id);
taosThreadMutexUnlock(&pTask->lock);
(void) taosThreadMutexUnlock(&pTask->lock);
return TSDB_CODE_SUCCESS;
} else {
pTask->status.sendConsensusChkptId = true;
}
taosThreadMutexUnlock(&pTask->lock);
(void) taosThreadMutexUnlock(&pTask->lock);
ASSERT(pTask->pBackend == NULL);
pTask->status.requireConsensusChkptId = true;
#if 0
SRestoreCheckpointInfo req = {
.streamId = pTask->id.streamId,
.taskId = pTask->id.taskId,
.nodeId = vgId,
.checkpointId = pInfo->checkpointId,
.startTs = pTask->execInfo.created,
};
tEncodeSize(tEncodeRestoreCheckpointInfo, &req, tlen, code);
if (code < 0) {
stError("s-task:%s vgId:%d encode stream task latest-checkpoint-id failed, code:%s", id, vgId, tstrerror(code));
return TSDB_CODE_INVALID_MSG;
}
void* buf = rpcMallocCont(tlen);
if (buf == NULL) {
stError("s-task:%s vgId:%d encode stream task latest-checkpoint-id msg failed, code:%s", id, vgId,
tstrerror(TSDB_CODE_OUT_OF_MEMORY));
return TSDB_CODE_OUT_OF_MEMORY;
}
SEncoder encoder;
tEncoderInit(&encoder, buf, tlen);
if ((code = tEncodeRestoreCheckpointInfo(&encoder, &req)) < 0) {
rpcFreeCont(buf);
stError("s-task:%s vgId:%d encode stream task latest-checkpoint-id msg failed, code:%s", id, vgId, tstrerror(code));
return -1;
}
tEncoderClear(&encoder);
SRpcMsg msg = {0};
initRpcMsg(&msg, TDMT_MND_STREAM_REQ_CONSEN_CHKPT, buf, tlen);
stDebug("s-task:%s vgId:%d send latest checkpointId:%" PRId64 " to mnode to get the consensus checkpointId", id, vgId,
pInfo->checkpointId);
tmsgSendReq(&pTask->info.mnodeEpset, &msg);
#endif
return 0;
}
int32_t streamTaskSendPreparedCheckpointsourceRsp(SStreamTask* pTask) {
int32_t streamTaskSendCheckpointsourceRsp(SStreamTask* pTask) {
int32_t code = 0;
if (pTask->info.taskLevel != TASK_LEVEL__SOURCE) {
return code;
}
taosThreadMutexLock(&pTask->lock);
(void) taosThreadMutexLock(&pTask->lock);
SStreamTaskState* p = streamTaskGetStatus(pTask);
if (p->state == TASK_STATUS__CK) {
code = streamTaskSendCheckpointSourceRsp(pTask);
}
taosThreadMutexUnlock(&pTask->lock);
(void) taosThreadMutexUnlock(&pTask->lock);
return code;
}

View File

@ -131,17 +131,20 @@ int32_t streamRetrieveReqToData(const SStreamRetrieveReq* pReq, SStreamDataBlock
return TSDB_CODE_SUCCESS;
}
SStreamDataSubmit* streamDataSubmitNew(SPackedData* pData, int32_t type) {
int32_t streamDataSubmitNew(SPackedData* pData, int32_t type, SStreamDataSubmit** pSubmit) {
*pSubmit = NULL;
SStreamDataSubmit* pDataSubmit = (SStreamDataSubmit*)taosAllocateQitem(sizeof(SStreamDataSubmit), DEF_QITEM, pData->msgLen);
if (pDataSubmit == NULL) {
return NULL;
return TSDB_CODE_OUT_OF_MEMORY;
}
pDataSubmit->ver = pData->ver;
pDataSubmit->submit = *pData;
pDataSubmit->type = type;
return pDataSubmit;
*pSubmit = pDataSubmit;
return TSDB_CODE_SUCCESS;
}
void streamDataSubmitDestroy(SStreamDataSubmit* pDataSubmit) {

View File

@ -1118,7 +1118,11 @@ int32_t streamAddCheckpointReadyMsg(SStreamTask* pTask, int32_t upstreamTaskId,
return TSDB_CODE_SUCCESS;
}
SStreamUpstreamEpInfo* pInfo = streamTaskGetUpstreamTaskEpInfo(pTask, upstreamTaskId);
SStreamUpstreamEpInfo* pInfo = NULL;
int32_t code = streamTaskGetUpstreamTaskEpInfo(pTask, upstreamTaskId, &pInfo);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
STaskCheckpointReadyInfo info = {0};
initCheckpointReadyInfo(&info, pInfo->nodeId, pInfo->taskId, pInfo->childId, &pInfo->epSet, checkpointId);
@ -1419,8 +1423,11 @@ int32_t streamProcessDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pReq, S
stDebug("s-task:%s receive dispatch msg from taskId:0x%x(vgId:%d), msgLen:%" PRId64 ", msgId:%d", id,
pReq->upstreamTaskId, pReq->upstreamNodeId, pReq->totalLen, pReq->msgId);
SStreamUpstreamEpInfo* pInfo = streamTaskGetUpstreamTaskEpInfo(pTask, pReq->upstreamTaskId);
ASSERT(pInfo != NULL);
SStreamUpstreamEpInfo* pInfo = NULL;
int32_t code = streamTaskGetUpstreamTaskEpInfo(pTask, pReq->upstreamTaskId, &pInfo);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
if (pMeta->role == NODE_ROLE_FOLLOWER) {
stError("s-task:%s task on follower received dispatch msgs, dispatch msg rejected", id);
@ -1453,10 +1460,9 @@ int32_t streamProcessDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pReq, S
{
// do send response with the input status
int32_t code = buildDispatchRsp(pTask, pReq, status, &pRsp->pCont);
code = buildDispatchRsp(pTask, pReq, status, &pRsp->pCont);
if (code != TSDB_CODE_SUCCESS) {
stError("s-task:%s failed to build dispatch rsp, msgId:%d, code:%s", id, pReq->msgId, tstrerror(code));
terrno = code;
return code;
}
@ -1465,5 +1471,5 @@ int32_t streamProcessDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pReq, S
}
streamTrySchedExec(pTask);
return 0;
return code;
}

View File

@ -314,7 +314,8 @@ int32_t streamTransferStateDoPrepare(SStreamTask* pTask) {
SStreamMeta* pMeta = pTask->pMeta;
const char* id = pTask->id.idStr;
SStreamTask* pStreamTask = streamMetaAcquireTask(pMeta, pTask->streamTaskId.streamId, pTask->streamTaskId.taskId);
SStreamTask* pStreamTask = NULL;
int32_t code = streamMetaAcquireTask(pMeta, pTask->streamTaskId.streamId, pTask->streamTaskId.taskId, &pStreamTask);
if (pStreamTask == NULL) {
stError(
"s-task:%s failed to find related stream task:0x%x, it may have been destroyed or closed, destroy the related "
@ -416,7 +417,8 @@ int32_t streamTransferStatePrepare(SStreamTask* pTask) {
code = streamTransferStateDoPrepare(pTask);
} else {
// no state transfer for sink tasks, and drop fill-history task, followed by opening inputQ of sink task.
SStreamTask* pStreamTask = streamMetaAcquireTask(pMeta, pTask->streamTaskId.streamId, pTask->streamTaskId.taskId);
SStreamTask* pStreamTask = NULL;
code = streamMetaAcquireTask(pMeta, pTask->streamTaskId.streamId, pTask->streamTaskId.taskId, &pStreamTask);
if (pStreamTask != NULL) {
// halt the related stream sink task
code = streamTaskHandleEventAsync(pStreamTask->status.pSM, TASK_EVENT_HALT, haltCallback, NULL);
@ -590,7 +592,8 @@ void flushStateDataInExecutor(SStreamTask* pTask, SStreamQueueItem* pCheckpointB
ASSERT(HAS_RELATED_FILLHISTORY_TASK(pTask));
STaskId* pHTaskId = &pTask->hTaskInfo.id;
SStreamTask* pHTask = streamMetaAcquireTask(pTask->pMeta, pHTaskId->streamId, pHTaskId->taskId);
SStreamTask* pHTask = NULL;
int32_t code = streamMetaAcquireTask(pTask->pMeta, pHTaskId->streamId, pHTaskId->taskId, &pHTask);
if (pHTask != NULL) {
streamTaskReleaseState(pHTask);
streamTaskReloadState(pTask);

View File

@ -292,32 +292,39 @@ void streamMetaRemoveDB(void* arg, char* key) {
taosThreadMutexUnlock(&pMeta->backendMutex);
}
SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskBuild buildTaskFn, FTaskExpand expandTaskFn,
int32_t vgId, int64_t stage, startComplete_fn_t fn) {
int32_t streamMetaOpen(const char* path, void* ahandle, FTaskBuild buildTaskFn, FTaskExpand expandTaskFn, int32_t vgId,
int64_t stage, startComplete_fn_t fn, SStreamMeta** p) {
*p = NULL;
int32_t code = 0;
SStreamMeta* pMeta = taosMemoryCalloc(1, sizeof(SStreamMeta));
if (pMeta == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
stError("vgId:%d failed to prepare stream meta, alloc size:%" PRIzu ", out of memory", vgId, sizeof(SStreamMeta));
return NULL;
return TSDB_CODE_OUT_OF_MEMORY;
}
int32_t len = strlen(path) + 64;
char* tpath = taosMemoryCalloc(1, len);
if (tpath == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}
sprintf(tpath, "%s%s%s", path, TD_DIRSEP, "stream");
pMeta->path = tpath;
if (streamMetaOpenTdb(pMeta) < 0) {
code = streamMetaOpenTdb(pMeta);
if (code != TSDB_CODE_SUCCESS) {
goto _err;
}
if (streamMetaMayCvtDbFormat(pMeta) < 0) {
if ((code = streamMetaMayCvtDbFormat(pMeta)) < 0) {
stError("vgId:%d convert sub info format failed, open stream meta failed, reason: %s", pMeta->vgId,
tstrerror(terrno));
goto _err;
}
if (streamMetaBegin(pMeta) < 0) {
if ((code = streamMetaBegin(pMeta) < 0)) {
stError("vgId:%d begin trans for stream meta failed", pMeta->vgId);
goto _err;
}
@ -325,32 +332,32 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskBuild buildTas
_hash_fn_t fp = taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR);
pMeta->pTasksMap = taosHashInit(64, fp, true, HASH_NO_LOCK);
if (pMeta->pTasksMap == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
code = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}
pMeta->updateInfo.pTasks = taosHashInit(64, fp, false, HASH_NO_LOCK);
if (pMeta->updateInfo.pTasks == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
code = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}
pMeta->startInfo.pReadyTaskSet = taosHashInit(64, fp, false, HASH_NO_LOCK);
if (pMeta->startInfo.pReadyTaskSet == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
code = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}
pMeta->startInfo.pFailedTaskSet = taosHashInit(4, fp, false, HASH_NO_LOCK);
if (pMeta->startInfo.pFailedTaskSet == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
code = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}
// task list
pMeta->pTaskList = taosArrayInit(4, sizeof(SStreamTaskId));
if (pMeta->pTaskList == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
code = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}
@ -376,10 +383,16 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskBuild buildTas
// set the attribute when running on Linux OS
TdThreadRwlockAttr attr;
taosThreadRwlockAttrInit(&attr);
code = taosThreadRwlockAttrInit(&attr);
if (code != TSDB_CODE_SUCCESS) {
goto _err;
}
#ifdef LINUX
pthread_rwlockattr_setkind_np(&attr, PTHREAD_RWLOCK_PREFER_WRITER_NONRECURSIVE_NP);
code = pthread_rwlockattr_setkind_np(&attr, PTHREAD_RWLOCK_PREFER_WRITER_NONRECURSIVE_NP);
if (code != TSDB_CODE_SUCCESS) {
goto _err;
}
#endif
taosThreadRwlockInit(&pMeta->lock, &attr);
@ -391,7 +404,7 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskBuild buildTas
pMeta->pHbInfo = createMetaHbInfo(pRid);
if (pMeta->pHbInfo == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
code = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}
@ -403,7 +416,8 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskBuild buildTas
}
taosThreadMutexInit(&pMeta->backendMutex, NULL);
return pMeta;
*p = pMeta;
return code;
_err:
taosMemoryFree(pMeta->path);
@ -420,7 +434,7 @@ _err:
taosMemoryFree(pMeta);
stError("failed to open stream meta, reason:%s", tstrerror(terrno));
return NULL;
return code;
}
// todo refactor: the lock shoud be restricted in one function
@ -630,29 +644,30 @@ int32_t streamMetaGetNumOfTasks(SStreamMeta* pMeta) {
return (int32_t)size;
}
SStreamTask* streamMetaAcquireTaskNoLock(SStreamMeta* pMeta, int64_t streamId, int32_t taskId) {
int32_t streamMetaAcquireTaskNoLock(SStreamMeta* pMeta, int64_t streamId, int32_t taskId, SStreamTask** pTask) {
STaskId id = {.streamId = streamId, .taskId = taskId};
SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, &id, sizeof(id));
if (ppTask == NULL || streamTaskShouldStop(*ppTask)) {
return NULL;
*pTask = NULL;
return TSDB_CODE_FAILED;
}
int32_t ref = atomic_add_fetch_32(&(*ppTask)->refCnt, 1);
stTrace("s-task:%s acquire task, ref:%d", (*ppTask)->id.idStr, ref);
return *ppTask;
*pTask = *ppTask;
return TSDB_CODE_SUCCESS;
}
SStreamTask* streamMetaAcquireTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId) {
int32_t streamMetaAcquireTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId, SStreamTask** pTask) {
streamMetaRLock(pMeta);
SStreamTask* p = streamMetaAcquireTaskNoLock(pMeta, streamId, taskId);
int32_t code = streamMetaAcquireTaskNoLock(pMeta, streamId, taskId, pTask);
streamMetaRUnLock(pMeta);
return p;
return code;
}
SStreamTask* streamMetaAcquireOneTask(SStreamTask* pTask) {
void streamMetaAcquireOneTask(SStreamTask* pTask) {
int32_t ref = atomic_add_fetch_32(&pTask->refCnt, 1);
stTrace("s-task:%s acquire task, ref:%d", pTask->id.idStr, ref);
return pTask;
}
void streamMetaReleaseTask(SStreamMeta* UNUSED_PARAM(pMeta), SStreamTask* pTask) {
@ -719,20 +734,19 @@ int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int64_t streamId, int32_t t
stDebug("s-task:0x%x vgId:%d set task status:dropping and start to unregister it", taskId, pMeta->vgId);
while (1) {
streamMetaRLock(pMeta);
int32_t timerActive = 0;
streamMetaRLock(pMeta);
ppTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, &id, sizeof(id));
if (ppTask) {
if ((*ppTask)->status.timerActive == 0) {
streamMetaRUnLock(pMeta);
break;
timerActive = (*ppTask)->status.timerActive;
}
streamMetaRUnLock(pMeta);
taosMsleep(10);
stDebug("s-task:%s wait for quit from timer", (*ppTask)->id.idStr);
streamMetaRUnLock(pMeta);
if (timerActive > 0) {
taosMsleep(100);
stDebug("s-task:0x%x wait for quit from timer", id.taskId);
} else {
streamMetaRUnLock(pMeta);
break;
}
}
@ -1105,8 +1119,10 @@ SArray* streamMetaSendMsgBeforeCloseTasks(SStreamMeta* pMeta) {
int32_t numOfTasks = taosArrayGetSize(pTaskList);
for (int32_t i = 0; i < numOfTasks; ++i) {
SStreamTaskId* pTaskId = taosArrayGet(pTaskList, i);
SStreamTask* pTask = streamMetaAcquireTaskNoLock(pMeta, pTaskId->streamId, pTaskId->taskId);
if (pTask == NULL) {
SStreamTask* pTask = NULL;
int32_t code = streamMetaAcquireTaskNoLock(pMeta, pTaskId->streamId, pTaskId->taskId, &pTask);
if (code != TSDB_CODE_SUCCESS) {
continue;
}
@ -1201,7 +1217,8 @@ int32_t streamMetaStartAllTasks(SStreamMeta* pMeta) {
// initialization, when the operation of check downstream tasks status is executed far quickly.
for (int32_t i = 0; i < numOfTasks; ++i) {
SStreamTaskId* pTaskId = taosArrayGet(pTaskList, i);
SStreamTask* pTask = streamMetaAcquireTask(pMeta, pTaskId->streamId, pTaskId->taskId);
SStreamTask* pTask = NULL;
code = streamMetaAcquireTask(pMeta, pTaskId->streamId, pTaskId->taskId, &pTask);
if (pTask == NULL) {
stError("vgId:%d failed to acquire task:0x%x during start tasks", pMeta->vgId, pTaskId->taskId);
streamMetaAddFailedTask(pMeta, pTaskId->streamId, pTaskId->taskId);
@ -1223,7 +1240,8 @@ int32_t streamMetaStartAllTasks(SStreamMeta* pMeta) {
for (int32_t i = 0; i < numOfTasks; ++i) {
SStreamTaskId* pTaskId = taosArrayGet(pTaskList, i);
SStreamTask* pTask = streamMetaAcquireTask(pMeta, pTaskId->streamId, pTaskId->taskId);
SStreamTask* pTask = NULL;
code = streamMetaAcquireTask(pMeta, pTaskId->streamId, pTaskId->taskId, &pTask);
if (pTask == NULL) {
stError("vgId:%d failed to acquire task:0x%x during start tasks", pMeta->vgId, pTaskId->taskId);
streamMetaAddFailedTask(pMeta, pTaskId->streamId, pTaskId->taskId);
@ -1298,8 +1316,10 @@ int32_t streamMetaStopAllTasks(SStreamMeta* pMeta) {
for (int32_t i = 0; i < numOfTasks; ++i) {
SStreamTaskId* pTaskId = taosArrayGet(pTaskList, i);
SStreamTask* pTask = streamMetaAcquireTaskNoLock(pMeta, pTaskId->streamId, pTaskId->taskId);
if (pTask == NULL) {
SStreamTask* pTask = NULL;
int32_t code = streamMetaAcquireTaskNoLock(pMeta, pTaskId->streamId, pTaskId->taskId, &pTask);
if (code != TSDB_CODE_SUCCESS) {
continue;
}
@ -1339,7 +1359,8 @@ int32_t streamMetaStartOneTask(SStreamMeta* pMeta, int64_t streamId, int32_t tas
int32_t vgId = pMeta->vgId;
stInfo("vgId:%d start task:0x%x by checking it's downstream status", vgId, taskId);
SStreamTask* pTask = streamMetaAcquireTask(pMeta, streamId, taskId);
SStreamTask* pTask = NULL;
code = streamMetaAcquireTask(pMeta, streamId, taskId, &pTask);
if (pTask == NULL) {
stError("vgId:%d failed to acquire task:0x%x when starting task", pMeta->vgId, taskId);
streamMetaAddFailedTask(pMeta, streamId, taskId);

View File

@ -92,7 +92,8 @@ int32_t streamExecScanHistoryInFuture(SStreamTask* pTask, int32_t idleDuration)
}
// add ref for task
SStreamTask* p = streamMetaAcquireTask(pTask->pMeta, pTask->id.streamId, pTask->id.taskId);
SStreamTask* p = NULL;
int32_t code = streamMetaAcquireTask(pTask->pMeta, pTask->id.streamId, pTask->id.taskId, &p);
if (p == NULL) {
stError("s-task:0x%x failed to acquire task, status:%s, not exec scan-history data", pTask->id.taskId,
streamTaskGetStatus(pTask)->name);
@ -223,7 +224,8 @@ int32_t streamLaunchFillHistoryTask(SStreamTask* pTask) {
streamMetaRUnLock(pMeta);
if (pHTask != NULL) { // it is already added into stream meta store.
SStreamTask* pHisTask = streamMetaAcquireTask(pMeta, hStreamId, hTaskId);
SStreamTask* pHisTask = NULL;
code = streamMetaAcquireTask(pMeta, hStreamId, hTaskId, &pHisTask);
if (pHisTask == NULL) {
stDebug("s-task:%s failed acquire and start fill-history task, it may have been dropped/stopped", idStr);
streamMetaAddTaskLaunchResult(pMeta, hStreamId, hTaskId, pExecInfo->checkTs, pExecInfo->readyTs, false);
@ -353,7 +355,11 @@ void tryLaunchHistoryTask(void* param, void* tmrId) {
return;
}
SStreamTask* pTask = streamMetaAcquireTaskNoLock(pMeta, pInfo->id.streamId, pInfo->id.taskId);
SStreamTask* pTask = NULL;
code = streamMetaAcquireTaskNoLock(pMeta, pInfo->id.streamId, pInfo->id.taskId, &pTask);
if (code != TSDB_CODE_SUCCESS) {
// todo
}
streamMetaWUnLock(pMeta);
if (pTask != NULL) {
@ -373,7 +379,8 @@ void tryLaunchHistoryTask(void* param, void* tmrId) {
ASSERT(pTask->status.timerActive >= 1);
// abort the timer if intend to stop task
SStreamTask* pHTask = streamMetaAcquireTask(pMeta, pHTaskInfo->id.streamId, pHTaskInfo->id.taskId);
SStreamTask* pHTask = NULL;
code = streamMetaAcquireTask(pMeta, pHTaskInfo->id.streamId, pHTaskInfo->id.taskId, &pHTask);
if (pHTask == NULL) {
doRetryLaunchFillHistoryTask(pTask, pInfo, now);
streamMetaReleaseTask(pMeta, pTask);

View File

@ -91,14 +91,15 @@ static SStreamUpstreamEpInfo* createStreamTaskEpInfo(const SStreamTask* pTask) {
return pEpInfo;
}
SStreamTask* tNewStreamTask(int64_t streamId, int8_t taskLevel, SEpSet* pEpset, bool fillHistory, int64_t triggerParam,
SArray* pTaskList, bool hasFillhistory, int8_t subtableWithoutMd5) {
int32_t tNewStreamTask(int64_t streamId, int8_t taskLevel, SEpSet* pEpset, bool fillHistory, int64_t triggerParam,
SArray* pTaskList, bool hasFillhistory, int8_t subtableWithoutMd5, SStreamTask** p) {
*p = NULL;
SStreamTask* pTask = (SStreamTask*)taosMemoryCalloc(1, sizeof(SStreamTask));
if (pTask == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
stError("s-task:0x%" PRIx64 " failed malloc new stream task, size:%d, code:%s", streamId,
(int32_t)sizeof(SStreamTask), tstrerror(terrno));
return NULL;
return TSDB_CODE_OUT_OF_MEMORY;
}
pTask->ver = SSTREAM_TASK_VER;
@ -110,10 +111,10 @@ SStreamTask* tNewStreamTask(int64_t streamId, int8_t taskLevel, SEpSet* pEpset,
pTask->info.delaySchedParam = triggerParam;
pTask->subtableWithoutMd5 = subtableWithoutMd5;
pTask->status.pSM = streamCreateStateMachine(pTask);
if (pTask->status.pSM == NULL) {
int32_t code = streamCreateStateMachine(pTask);
if (pTask->status.pSM == NULL || code != TSDB_CODE_SUCCESS) {
taosMemoryFreeClear(pTask);
return NULL;
return code;
}
char buf[128] = {0};
@ -135,7 +136,9 @@ SStreamTask* tNewStreamTask(int64_t streamId, int8_t taskLevel, SEpSet* pEpset,
epsetAssign(&(pTask->info.mnodeEpset), pEpset);
addToTaskset(pTaskList, pTask);
return pTask;
*p = pTask;
return code;
}
int32_t tDecodeStreamTaskChkInfo(SDecoder* pDecoder, SCheckpointInfo* pChkpInfo) {
@ -274,7 +277,9 @@ void tFreeStreamTask(SStreamTask* pTask) {
tSimpleHashCleanup(pTask->pNameMap);
}
pTask->status.pSM = streamDestroyStateMachine(pTask->status.pSM);
streamDestroyStateMachine(pTask->status.pSM);
pTask->status.pSM = NULL;
streamTaskDestroyUpstreamInfo(&pTask->upstreamInfo);
taosMemoryFree(pTask->outputInfo.pTokenBucket);
@ -369,8 +374,9 @@ int32_t streamTaskInit(SStreamTask* pTask, SStreamMeta* pMeta, SMsgCb* pMsgCb, i
pTask->status.schedStatus = TASK_SCHED_STATUS__INACTIVE;
pTask->status.timerActive = 0;
pTask->status.pSM = streamCreateStateMachine(pTask);
if (pTask->status.pSM == NULL) {
int32_t code = streamCreateStateMachine(pTask);
if (pTask->status.pSM == NULL || code != TSDB_CODE_SUCCESS) {
stError("s-task:%s failed create state-machine for stream task, initialization failed, code:%s", pTask->id.idStr,
tstrerror(terrno));
return terrno;
@ -391,7 +397,7 @@ int32_t streamTaskInit(SStreamTask* pTask, SStreamMeta* pMeta, SMsgCb* pMsgCb, i
TdThreadMutexAttr attr = {0};
int code = taosThreadMutexAttrInit(&attr);
code = taosThreadMutexAttrInit(&attr);
if (code != 0) {
stError("s-task:%s initElapsed mutex attr failed, code:%s", pTask->id.idStr, tstrerror(code));
return code;
@ -433,10 +439,10 @@ int32_t streamTaskInit(SStreamTask* pTask, SStreamMeta* pMeta, SMsgCb* pMsgCb, i
}
if (pTask->chkInfo.pActiveInfo == NULL) {
pTask->chkInfo.pActiveInfo = streamTaskCreateActiveChkptInfo();
code = streamTaskCreateActiveChkptInfo(&pTask->chkInfo.pActiveInfo);
}
return TSDB_CODE_SUCCESS;
return code;
}
int32_t streamTaskGetNumOfDownstream(const SStreamTask* pTask) {
@ -636,7 +642,13 @@ void streamTaskOpenAllUpstreamInput(SStreamTask* pTask) {
}
void streamTaskCloseUpstreamInput(SStreamTask* pTask, int32_t taskId) {
SStreamUpstreamEpInfo* pInfo = streamTaskGetUpstreamTaskEpInfo(pTask, taskId);
SStreamUpstreamEpInfo* pInfo = NULL;
int32_t code = streamTaskGetUpstreamTaskEpInfo(pTask, taskId, &pInfo);
if (code != TSDB_CODE_SUCCESS) {
return;
}
if ((pInfo != NULL) && pInfo->dataAllowed) {
pInfo->dataAllowed = false;
int32_t t = atomic_add_fetch_32(&pTask->upstreamInfo.numOfClosed, 1);
@ -645,8 +657,14 @@ void streamTaskCloseUpstreamInput(SStreamTask* pTask, int32_t taskId) {
}
void streamTaskOpenUpstreamInput(SStreamTask* pTask, int32_t taskId) {
SStreamUpstreamEpInfo* pInfo = streamTaskGetUpstreamTaskEpInfo(pTask, taskId);
if ((pInfo != NULL) && (!pInfo->dataAllowed)) {
SStreamUpstreamEpInfo* pInfo = NULL;
int32_t code = streamTaskGetUpstreamTaskEpInfo(pTask, taskId, &pInfo);
if (code != TSDB_CODE_SUCCESS) {
return;
}
if (pInfo != NULL && (!pInfo->dataAllowed)) {
int32_t t = atomic_sub_fetch_32(&pTask->upstreamInfo.numOfClosed, 1);
ASSERT(t >= 0);
pInfo->dataAllowed = true;
@ -941,17 +959,24 @@ int32_t streamTaskSendCheckpointReq(SStreamTask* pTask) {
return 0;
}
SStreamUpstreamEpInfo* streamTaskGetUpstreamTaskEpInfo(SStreamTask* pTask, int32_t taskId) {
int32_t streamTaskGetUpstreamTaskEpInfo(SStreamTask* pTask, int32_t taskId, SStreamUpstreamEpInfo** pEpInfo) {
*pEpInfo = NULL;
int32_t num = taosArrayGetSize(pTask->upstreamInfo.pList);
for (int32_t i = 0; i < num; ++i) {
SStreamUpstreamEpInfo* pInfo = taosArrayGetP(pTask->upstreamInfo.pList, i);
if (pInfo == NULL) {
return TSDB_CODE_FAILED;
}
if (pInfo->taskId == taskId) {
return pInfo;
*pEpInfo = pInfo;
return TSDB_CODE_SUCCESS;
}
}
stError("s-task:%s failed to find upstream task:0x%x", pTask->id.idStr, taskId);
return NULL;
return TSDB_CODE_FAILED;
}
SEpSet* streamTaskGetDownstreamEpInfo(SStreamTask* pTask, int32_t taskId) {
@ -1041,14 +1066,23 @@ int32_t streamTaskSetFailedChkptInfo(SStreamTask* pTask, int32_t transId, int64_
return TSDB_CODE_SUCCESS;
}
SActiveCheckpointInfo* streamTaskCreateActiveChkptInfo() {
int32_t streamTaskCreateActiveChkptInfo(SActiveCheckpointInfo** pRes) {
SActiveCheckpointInfo* pInfo = taosMemoryCalloc(1, sizeof(SActiveCheckpointInfo));
taosThreadMutexInit(&pInfo->lock, NULL);
if (pInfo == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
int32_t code = taosThreadMutexInit(&pInfo->lock, NULL);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
pInfo->pDispatchTriggerList = taosArrayInit(4, sizeof(STaskTriggerSendInfo));
pInfo->pReadyMsgList = taosArrayInit(4, sizeof(STaskCheckpointReadyInfo));
pInfo->pCheckpointReadyRecvList = taosArrayInit(4, sizeof(STaskDownstreamReadyInfo));
return pInfo;
*pRes = pInfo;
return TSDB_CODE_SUCCESS;
}
void streamTaskDestroyActiveChkptInfo(SActiveCheckpointInfo* pInfo) {

View File

@ -257,43 +257,41 @@ int32_t streamTaskRestoreStatus(SStreamTask* pTask) {
return code;
}
SStreamTaskSM* streamCreateStateMachine(SStreamTask* pTask) {
int32_t streamCreateStateMachine(SStreamTask* pTask) {
initStateTransferTable();
const char* id = pTask->id.idStr;
SStreamTaskSM* pSM = taosMemoryCalloc(1, sizeof(SStreamTaskSM));
if (pSM == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
stError("s-task:%s failed to create task stateMachine, size:%d, code:%s", id, (int32_t)sizeof(SStreamTaskSM),
tstrerror(terrno));
return NULL;
return TSDB_CODE_OUT_OF_MEMORY;
}
pSM->pTask = pTask;
pSM->pWaitingEventList = taosArrayInit(4, sizeof(SFutureHandleEventInfo));
if (pSM->pWaitingEventList == NULL) {
taosMemoryFree(pSM);
terrno = TSDB_CODE_OUT_OF_MEMORY;
stError("s-task:%s failed to create task stateMachine, size:%d, code:%s", id, (int32_t)sizeof(SStreamTaskSM),
tstrerror(terrno));
return NULL;
return TSDB_CODE_OUT_OF_MEMORY;
}
// set the initial state for the state-machine of stream task
pSM->current = StreamTaskStatusList[TASK_STATUS__UNINIT];
pSM->startTs = taosGetTimestampMs();
return pSM;
pTask->status.pSM = pSM;
return TSDB_CODE_SUCCESS;
}
void* streamDestroyStateMachine(SStreamTaskSM* pSM) {
void streamDestroyStateMachine(SStreamTaskSM* pSM) {
if (pSM == NULL) {
return NULL;
return;
}
taosArrayDestroy(pSM->pWaitingEventList);
taosMemoryFree(pSM);
return NULL;
}
static int32_t doHandleEvent(SStreamTaskSM* pSM, EStreamTaskEvent event, STaskStateTrans* pTrans) {