refactor: do some internal refactor.

This commit is contained in:
Haojun Liao 2024-09-30 13:53:53 +08:00
parent 4173144ded
commit 0f28cd1d48
3 changed files with 82 additions and 59 deletions

View File

@ -120,38 +120,39 @@ int32_t streamProcessCheckpointSourceReq(SStreamTask* pTask, SStreamCheckpointSo
}
int32_t streamTaskProcessCheckpointTriggerRsp(SStreamTask* pTask, SCheckpointTriggerRsp* pRsp) {
SActiveCheckpointInfo* pInfo = pTask->chkInfo.pActiveInfo;
bool unQualified = false;
const char* id = pTask->id.idStr;
if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
stError("s-task:%s invalid msg recv, checkpoint-trigger rsp not handled", pTask->id.idStr);
stError("s-task:%s invalid msg recv, checkpoint-trigger rsp not handled", id);
return TSDB_CODE_INVALID_MSG;
}
if (pRsp->rspCode != TSDB_CODE_SUCCESS) {
stDebug("s-task:%s retrieve checkpoint-trgger rsp from upstream:0x%x invalid, code:%s", pTask->id.idStr,
pRsp->upstreamTaskId, tstrerror(pRsp->rspCode));
stDebug("s-task:%s retrieve checkpoint-trgger rsp from upstream:0x%x invalid, code:%s", id, pRsp->upstreamTaskId,
tstrerror(pRsp->rspCode));
return TSDB_CODE_SUCCESS;
}
streamMutexLock(&pTask->lock);
SStreamTaskState status = streamTaskGetStatus(pTask);
if (status.state != TASK_STATUS__CK) {
stError("s-task:%s status:%s not in checkpoint status, discard the checkpoint-trigger msg", pTask->id.idStr, status.name);
streamMutexUnlock(&pTask->lock);
return TSDB_CODE_STREAM_TASK_IVLD_STATUS;
}
streamMutexUnlock(&pTask->lock);
SActiveCheckpointInfo* pInfo = pTask->chkInfo.pActiveInfo;
streamMutexLock(&pInfo->lock);
if (pInfo->activeId != pRsp->checkpointId || pInfo->transId != pRsp->transId) {
stError("s-task:%s status:%s not in checkpoint status, discard the checkpoint-trigger msg", pTask->id.idStr, status.name);
streamMutexUnlock(&pInfo->lock);
if (status.state != TASK_STATUS__CK) {
stError("s-task:%s status:%s not in checkpoint status, discard the checkpoint-trigger msg", id, status.name);
return TSDB_CODE_STREAM_TASK_IVLD_STATUS;
}
streamMutexLock(&pInfo->lock);
unQualified = (pInfo->activeId != pRsp->checkpointId || pInfo->transId != pRsp->transId);
streamMutexUnlock(&pInfo->lock);
if (unQualified) {
stError("s-task:%s status:%s not in checkpoint status, discard the checkpoint-trigger msg", id, status.name);
return TSDB_CODE_STREAM_TASK_IVLD_STATUS;
}
// NOTE: here we do not do the duplicated checkpoint-trigger msg check, since it will be done by following functions.
int32_t code = appendCheckpointIntoInputQ(pTask, STREAM_INPUT__CHECKPOINT_TRIGGER, pRsp->checkpointId, pRsp->transId,
pRsp->upstreamTaskId);
@ -963,11 +964,44 @@ static int32_t doFindNotSendUpstream(SStreamTask* pTask, SArray* pList, SArray**
return 0;
}
/*
 * One round of the checkpoint-trigger receive monitor for pTask.
 *
 * Steps, in order:
 *   1. doChkptStatusCheck(); a non-zero code is returned unchanged.
 *   2. doFindNotSendUpstream() over pTask->upstreamInfo.pList; on failure the
 *      trigger-msg timer is cleaned via streamCleanBeforeQuitTmr() and the
 *      code is returned.
 *   3. doSendRetrieveTriggerMsg() for the not-send list; a failure here is
 *      logged and NOT propagated (the function still returns 0).
 *
 * The visible caller invokes this while holding pActiveInfo->lock; this
 * function itself takes no lock.
 *
 * NOTE(review): pNotSendList is received by value and its address is handed to
 * doFindNotSendUpstream(), so anything stored through that pointer is visible
 * only inside this function; the caller's variable is not updated. If
 * doFindNotSendUpstream() allocates the list (TODO confirm), the caller can
 * neither size nor free it; taking an SArray** here would be required.
 *
 * Returns 0 on success or when only the send step failed, otherwise the error
 * code from the status check or the upstream lookup.
 */
static int32_t chkptTriggerRecvMonitorHelper(SStreamTask* pTask, SArray* pNotSendList) {
  const char*     idStr = pTask->id.idStr;
  SArray*         pUpstreamList = pTask->upstreamInfo.pList;  // upstream tasks to ask for the trigger msg
  SStreamTmrInfo* pTmr = &pTask->chkInfo.pActiveInfo->chkptTriggerMsgTmr;
  int32_t         vgId = pTask->pMeta->vgId;

  int32_t code = doChkptStatusCheck(pTask);
  if (code != 0) {
    return code;
  }

  code = doFindNotSendUpstream(pTask, pUpstreamList, &pNotSendList);
  if (code != 0) {
    int32_t ref = streamCleanBeforeQuitTmr(pTmr, pTask);
    stDebug("s-task:%s failed to find not send upstream, code:%s, out of tmr, ref:%d", idStr, tstrerror(code), ref);
    return code;
  }

  // ask the upstream tasks that have not sent the checkpoint-trigger to send it
  code = doSendRetrieveTriggerMsg(pTask, pNotSendList);
  if (code != 0) {
    stError("s-task:%s vgId:%d failed to retrieve trigger msg, code:%s", idStr, vgId, tstrerror(code));
  }

  // a send failure is only logged, never reported to the caller
  return 0;
}
void checkpointTriggerMonitorFn(void* param, void* tmrId) {
SStreamTask* pTask = param;
int32_t vgId = pTask->pMeta->vgId;
int64_t now = taosGetTimestampMs();
const char* id = pTask->id.idStr;
SArray* pNotSendList = NULL;
SArray* pList = pTask->upstreamInfo.pList; // send msg to retrieve checkpoint trigger msg
int32_t code = 0;
int32_t numOfNotSend = 0;
SActiveCheckpointInfo* pActiveInfo = pTask->chkInfo.pActiveInfo;
SStreamTmrInfo* pTmrInfo = &pActiveInfo->chkptTriggerMsgTmr;
@ -1008,40 +1042,18 @@ void checkpointTriggerMonitorFn(void* param, void* tmrId) {
}
streamMutexLock(&pActiveInfo->lock);
code = chkptTriggerRecvMonitorHelper(pTask, pNotSendList);
streamMutexUnlock(&pActiveInfo->lock);
int32_t code = doChkptStatusCheck(pTask);
if (code) {
streamMutexUnlock(&pActiveInfo->lock);
if (code != TSDB_CODE_SUCCESS) {
streamMetaReleaseTask(pTask->pMeta, pTask);
return;
}
// send msg to retrieve checkpoint trigger msg
SArray* pList = pTask->upstreamInfo.pList;
SArray* pNotSendList = NULL;
code = doFindNotSendUpstream(pTask, pList, &pNotSendList);
if (code) {
int32_t ref = streamCleanBeforeQuitTmr(pTmrInfo, pTask);
stDebug("s-task:%s failed to find not send upstream, code:%s, out of tmr, ref:%d", id, tstrerror(code), ref);
streamMutexUnlock(&pActiveInfo->lock);
streamMetaReleaseTask(pTask->pMeta, pTask);
taosArrayDestroy(pNotSendList);
return;
}
// do send retrieve checkpoint trigger msg to upstream
int32_t size = taosArrayGetSize(pNotSendList);
code = doSendRetrieveTriggerMsg(pTask, pNotSendList);
if (code) {
stError("s-task:%s vgId:%d failed to retrieve trigger msg, code:%s", pTask->id.idStr, vgId, tstrerror(code));
}
streamMutexUnlock(&pActiveInfo->lock);
// check every 100ms
if (size > 0) {
numOfNotSend = taosArrayGetSize(pNotSendList);
if (numOfNotSend > 0) {
stDebug("s-task:%s start to monitor checkpoint-trigger in 10s", id);
streamTmrStart(checkpointTriggerMonitorFn, 200, pTask, streamTimer, &pTmrInfo->tmrHandle, vgId, "trigger-recv-monitor");
} else {
@ -1106,19 +1118,13 @@ int32_t doSendRetrieveTriggerMsg(SStreamTask* pTask, SArray* pNotSendList) {
return code;
}
bool streamTaskAlreadySendTrigger(SStreamTask* pTask, int32_t downstreamNodeId) {
static int32_t isAlreadySendTriggerNoLock(SStreamTask* pTask, int32_t downstreamNodeId) {
int64_t now = taosGetTimestampMs();
const char* id = pTask->id.idStr;
SActiveCheckpointInfo* pInfo = pTask->chkInfo.pActiveInfo;
SStreamTaskState pStatus = streamTaskGetStatus(pTask);
if (pStatus.state != TASK_STATUS__CK) {
return false;
}
streamMutexLock(&pInfo->lock);
if (!pInfo->dispatchTrigger) {
streamMutexUnlock(&pInfo->lock);
return false;
}
@ -1146,14 +1152,29 @@ bool streamTaskAlreadySendTrigger(SStreamTask* pTask, int32_t downstreamNodeId)
id, pSendInfo->sendTs, before, pInfo->activeId, pInfo->transId);
}
streamMutexUnlock(&pInfo->lock);
return true;
}
streamMutexUnlock(&pInfo->lock);
return false;
}
/*
 * Locked wrapper around isAlreadySendTriggerNoLock().
 *
 * Returns false immediately when the task status is not TASK_STATUS__CK.
 * Otherwise it takes pTask->chkInfo.pActiveInfo->lock, evaluates
 * isAlreadySendTriggerNoLock(pTask, downstreamNodeId), releases the lock and
 * returns that result as a bool.
 *
 * pTask            - stream task to inspect
 * downstreamNodeId - node id forwarded unchanged to the no-lock helper
 */
bool streamTaskAlreadySendTrigger(SStreamTask* pTask, int32_t downstreamNodeId) {
  SActiveCheckpointInfo* pInfo = pTask->chkInfo.pActiveInfo;

  // the status is read before taking the active-checkpoint lock
  SStreamTaskState status = streamTaskGetStatus(pTask);
  if (status.state != TASK_STATUS__CK) {
    return false;
  }

  streamMutexLock(&pInfo->lock);
  bool send = isAlreadySendTriggerNoLock(pTask, downstreamNodeId);
  streamMutexUnlock(&pInfo->lock);

  return send;
}
void streamTaskGetTriggerRecvStatus(SStreamTask* pTask, int32_t* pRecved, int32_t* pTotal) {
*pRecved = taosArrayGetSize(pTask->chkInfo.pActiveInfo->pReadyMsgList);
@ -1169,8 +1190,10 @@ void streamTaskGetTriggerRecvStatus(SStreamTask* pTask, int32_t* pRecved, int32_
int32_t streamTaskInitTriggerDispatchInfo(SStreamTask* pTask) {
SActiveCheckpointInfo* pInfo = pTask->chkInfo.pActiveInfo;
int64_t now = taosGetTimestampMs();
int32_t code = 0;
streamMutexLock(&pInfo->lock);
pInfo->dispatchTrigger = true;
if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) {
STaskDispatcherFixed* pDispatch = &pTask->outputInfo.fixedDispatcher;
@ -1178,8 +1201,7 @@ int32_t streamTaskInitTriggerDispatchInfo(SStreamTask* pTask) {
STaskTriggerSendInfo p = {.sendTs = now, .recved = false, .nodeId = pDispatch->nodeId, .taskId = pDispatch->taskId};
void* px = taosArrayPush(pInfo->pDispatchTriggerList, &p);
if (px == NULL) { // pause the stream task, if memory not enough
streamMutexUnlock(&pInfo->lock);
return terrno;
code = terrno;
}
} else {
for (int32_t i = 0; i < streamTaskGetNumOfDownstream(pTask); ++i) {
@ -1191,14 +1213,15 @@ int32_t streamTaskInitTriggerDispatchInfo(SStreamTask* pTask) {
STaskTriggerSendInfo p = {.sendTs = now, .recved = false, .nodeId = pVgInfo->vgId, .taskId = pVgInfo->taskId};
void* px = taosArrayPush(pInfo->pDispatchTriggerList, &p);
if (px == NULL) { // pause the stream task, if memory not enough
streamMutexUnlock(&pInfo->lock);
return terrno;
code = terrno;
break;
}
}
}
streamMutexUnlock(&pInfo->lock);
return 0;
return code;
}
int32_t streamTaskGetNumOfConfirmed(SActiveCheckpointInfo* pInfo) {

View File

@ -780,7 +780,7 @@ static int32_t doStreamExecTask(SStreamTask* pTask) {
// dispatch checkpoint msg to all downstream tasks
int32_t type = pInput->type;
if (type == STREAM_INPUT__CHECKPOINT_TRIGGER) {
int32_t code = streamProcessCheckpointTriggerBlock(pTask, (SStreamDataBlock*)pInput);
code = streamProcessCheckpointTriggerBlock(pTask, (SStreamDataBlock*)pInput);
if (code != 0) {
stError("s-task:%s failed to process checkpoint-trigger block, code:%s", pTask->id.idStr, tstrerror(code));
}

View File

@ -42,7 +42,7 @@ static bool waitForEnoughDuration(SMetaHbInfo* pInfo) {
static bool existInHbMsg(SStreamHbMsg* pMsg, SDownstreamTaskEpset* pTaskEpset) {
int32_t numOfExisted = taosArrayGetSize(pMsg->pUpdateNodes);
for (int k = 0; k < numOfExisted; ++k) {
for (int32_t k = 0; k < numOfExisted; ++k) {
if (pTaskEpset->nodeId == *(int32_t*)taosArrayGet(pMsg->pUpdateNodes, k)) {
return true;
}
@ -56,7 +56,7 @@ static void addUpdateNodeIntoHbMsg(SStreamTask* pTask, SStreamHbMsg* pMsg) {
streamMutexLock(&pTask->lock);
int32_t num = taosArrayGetSize(pTask->outputInfo.pNodeEpsetUpdateList);
for (int j = 0; j < num; ++j) {
for (int32_t j = 0; j < num; ++j) {
SDownstreamTaskEpset* pTaskEpset = taosArrayGet(pTask->outputInfo.pNodeEpsetUpdateList, j);
bool exist = existInHbMsg(pMsg, pTaskEpset);