Merge pull request #28219 from taosdata/fix/3_liaohj

refactor: do some internal refactor.
This commit is contained in:
Haojun Liao 2024-10-01 15:35:12 +08:00 committed by GitHub
commit dfe03572fb
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 148 additions and 102 deletions

View File

@ -904,7 +904,7 @@ static int32_t doChkptStatusCheck(SStreamTask* pTask) {
return -1;
}
if ((pTmrInfo->launchChkptId != pActiveInfo->activeId) || (pActiveInfo->activeId == 0)) {
if (pTmrInfo->launchChkptId != pActiveInfo->activeId) {
int32_t ref = streamCleanBeforeQuitTmr(pTmrInfo, pTask);
stWarn("s-task:%s vgId:%d checkpoint-trigger retrieve by previous checkpoint procedure, checkpointId:%" PRId64
", quit, ref:%d",
@ -1055,7 +1055,8 @@ void checkpointTriggerMonitorFn(void* param, void* tmrId) {
numOfNotSend = taosArrayGetSize(pNotSendList);
if (numOfNotSend > 0) {
stDebug("s-task:%s start to monitor checkpoint-trigger in 10s", id);
streamTmrStart(checkpointTriggerMonitorFn, 200, pTask, streamTimer, &pTmrInfo->tmrHandle, vgId, "trigger-recv-monitor");
streamTmrStart(checkpointTriggerMonitorFn, 200, pTask, streamTimer, &pTmrInfo->tmrHandle, vgId,
"trigger-recv-monitor");
} else {
int32_t ref = streamCleanBeforeQuitTmr(pTmrInfo, pTask);
stDebug("s-task:%s all checkpoint-trigger recved, quit from monitor checkpoint-trigger tmr, ref:%d", id, ref);

View File

@ -526,6 +526,7 @@ static void doMonitorDispatchData(void* param, void* tmrId) {
int32_t msgId = pMsgInfo->msgId;
int32_t code = 0;
int64_t now = taosGetTimestampMs();
bool inDispatch = true;
stDebug("s-task:%s start monitor dispatch data", id);
@ -550,12 +551,15 @@ static void doMonitorDispatchData(void* param, void* tmrId) {
int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
stDebug("s-task:%s not in dispatch procedure, abort from timer, ref:%d", pTask->id.idStr, ref);
pTask->msgInfo.inMonitor = 0;
streamMutexUnlock(&pMsgInfo->lock);
return;
pMsgInfo->inMonitor = 0;
inDispatch = false;
}
streamMutexUnlock(&pMsgInfo->lock);
if (!inDispatch) {
return;
}
int32_t numOfFailed = getFailedDispatchInfo(pMsgInfo, now);
if (numOfFailed == 0) {
stDebug("s-task:%s no error occurs, check again in %dms", id, DISPATCH_RETRY_INTERVAL_MS);
@ -638,15 +642,54 @@ void streamStartMonitorDispatchData(SStreamTask* pTask, int64_t waitDuration) {
"dispatch-monitor");
}
static int32_t doAddDispatchBlock(SStreamTask* pTask, SStreamDispatchReq* pReqs, SSDataBlock* pDataBlock,
SArray* vgInfo, uint32_t hashValue, int64_t now, bool* pFound) {
size_t numOfVgroups = taosArrayGetSize(vgInfo);
int32_t code = 0;
*pFound = false;
for (int32_t j = 0; j < numOfVgroups; j++) {
SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, j);
if (pVgInfo == NULL) {
continue;
}
if (hashValue >= pVgInfo->hashBegin && hashValue <= pVgInfo->hashEnd) {
if ((code = streamAddBlockIntoDispatchMsg(pDataBlock, &pReqs[j])) < 0) {
stError("s-task:%s failed to add dispatch block, code:%s", pTask->id.idStr, tstrerror(terrno));
return code;
}
if (pReqs[j].blockNum == 0) {
SVgroupInfo* pDstVgroupInfo = taosArrayGet(vgInfo, j);
if (pDstVgroupInfo != NULL) {
addDispatchEntry(&pTask->msgInfo, pDstVgroupInfo->vgId, now, false);
}
}
pReqs[j].blockNum++;
*pFound = true;
break;
}
}
return code;
}
int32_t streamSearchAndAddBlock(SStreamTask* pTask, SStreamDispatchReq* pReqs, SSDataBlock* pDataBlock, int64_t groupId,
int64_t now) {
bool found = false;
uint32_t hashValue = 0;
int32_t numOfVgroups = 0;
int32_t code = 0;
SArray* vgInfo = pTask->outputInfo.shuffleDispatcher.dbInfo.pVgroupInfos;
SArray* vgInfo = pTask->outputInfo.shuffleDispatcher.dbInfo.pVgroupInfos;
if (pTask->pNameMap == NULL) {
pTask->pNameMap = tSimpleHashInit(1024, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT));
if (pTask->pNameMap == NULL) {
stError("s-task:%s failed to init the name map, code:%s", pTask->id.idStr, tstrerror(terrno));
return terrno;
}
}
void* pVal = tSimpleHashGet(pTask->pNameMap, &groupId, sizeof(int64_t));
@ -669,11 +712,11 @@ int32_t streamSearchAndAddBlock(SStreamTask* pTask, SStreamDispatchReq* pReqs, S
}
}
} else {
int32_t code = buildCtbNameByGroupIdImpl(pTask->outputInfo.shuffleDispatcher.stbFullName, groupId,
pDataBlock->info.parTbName);
code = buildCtbNameByGroupIdImpl(pTask->outputInfo.shuffleDispatcher.stbFullName, groupId,
pDataBlock->info.parTbName);
if (code) {
stError("s-task:%s failed to build child table name for group:%" PRId64 ", code:%s", pTask->id.idStr,
groupId, tstrerror(code));
stError("s-task:%s failed to build child table name for group:%" PRId64 ", code:%s", pTask->id.idStr, groupId,
tstrerror(code));
}
}
@ -688,44 +731,21 @@ int32_t streamSearchAndAddBlock(SStreamTask* pTask, SStreamDispatchReq* pReqs, S
memcpy(bln.parTbName, pDataBlock->info.parTbName, strlen(pDataBlock->info.parTbName));
// failed to put into name buffer, no need to do anything
if (tSimpleHashGetSize(pTask->pNameMap) < MAX_BLOCK_NAME_NUM) { // allow error, and do nothing
int32_t code = tSimpleHashPut(pTask->pNameMap, &groupId, sizeof(int64_t), &bln, sizeof(SBlockName));
if (tSimpleHashGetSize(pTask->pNameMap) < MAX_BLOCK_NAME_NUM) { // allow error, and do nothing
code = tSimpleHashPut(pTask->pNameMap, &groupId, sizeof(int64_t), &bln, sizeof(SBlockName));
}
}
numOfVgroups = taosArrayGetSize(vgInfo);
// TODO: optimize search
streamMutexLock(&pTask->msgInfo.lock);
code = doAddDispatchBlock(pTask, pReqs, pDataBlock, vgInfo, hashValue, now, &found);
streamMutexUnlock(&pTask->msgInfo.lock);
for (int32_t j = 0; j < numOfVgroups; j++) {
SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, j);
if (pVgInfo == NULL) {
continue;
}
if (hashValue >= pVgInfo->hashBegin && hashValue <= pVgInfo->hashEnd) {
if (streamAddBlockIntoDispatchMsg(pDataBlock, &pReqs[j]) < 0) {
streamMutexUnlock(&pTask->msgInfo.lock);
return -1;
}
if (pReqs[j].blockNum == 0) {
SVgroupInfo* pDstVgroupInfo = taosArrayGet(vgInfo, j);
if (pDstVgroupInfo != NULL) {
addDispatchEntry(&pTask->msgInfo, pDstVgroupInfo->vgId, now, false);
}
}
pReqs[j].blockNum++;
found = true;
break;
}
if (code) {
return code;
}
streamMutexUnlock(&pTask->msgInfo.lock);
if (!found) {
stError("s-task:%s not found req hash value:%u", pTask->id.idStr, hashValue);
stError("s-task:%s not found req hash value:%u, failed to add dispatch block", pTask->id.idStr, hashValue);
return TSDB_CODE_STREAM_INTERNAL_ERROR;
} else {
return 0;
@ -919,7 +939,7 @@ static int32_t doTaskChkptStatusCheck(SStreamTask* pTask, int32_t num) {
}
static int32_t doFindNotConfirmUpstream(SArray** ppNotRspList, SArray* pList, int32_t num, int32_t vgId, int32_t level,
const char* id) {
const char* id) {
SArray* pTmp = taosArrayInit(4, sizeof(int32_t));
if (pTmp == NULL) {
return terrno;
@ -940,8 +960,8 @@ static int32_t doFindNotConfirmUpstream(SArray** ppNotRspList, SArray* pList, in
stError("s-task:%s vgId:%d failed to record not rsp task, code: out of memory", id, vgId);
return terrno;
} else {
stDebug("s-task:%s vgId:%d level:%d checkpoint-ready rsp from upstream:0x%x not confirmed yet", id, vgId,
level, pInfo->upstreamTaskId);
stDebug("s-task:%s vgId:%d level:%d checkpoint-ready rsp from upstream:0x%x not confirmed yet", id, vgId, level,
pInfo->upstreamTaskId);
}
}
@ -987,13 +1007,48 @@ static void doSendChkptReadyMsg(SStreamTask* pTask, SArray* pNotRspList, int64_t
}
}
static void checkpointReadyMsgSendMonitorFn(void* param, void* tmrId) {
static int32_t chkptReadyMsgSendHelper(SStreamTask* pTask, SArray* pNotRspList) {
SActiveCheckpointInfo* pActiveInfo = pTask->chkInfo.pActiveInfo;
SStreamTmrInfo* pTmrInfo = &pActiveInfo->chkptReadyMsgTmr;
SArray* pList = pActiveInfo->pReadyMsgList;
int32_t num = taosArrayGetSize(pList);
int32_t vgId = pTask->pMeta->vgId;
int32_t checkpointId = pActiveInfo->activeId;
const char* id = pTask->id.idStr;
int32_t notRsp = 0;
int32_t code = doTaskChkptStatusCheck(pTask, num);
if (code) {
return code;
}
code = doFindNotConfirmUpstream(&pNotRspList, pList, num, vgId, pTask->info.taskLevel, id);
if (code) {
int32_t ref = streamCleanBeforeQuitTmr(pTmrInfo, pTask);
stError("s-task:%s failed to find not rsp checkpoint-ready downstream, code:%s, out of tmr, ref:%d", id,
tstrerror(code), ref);
return code;
}
notRsp = taosArrayGetSize(pNotRspList);
if (notRsp == 0) {
streamClearChkptReadyMsg(pActiveInfo);
} else {
doSendChkptReadyMsg(pTask, pNotRspList, checkpointId, pList);
}
return code;
}
static void chkptReadyMsgSendMonitorFn(void* param, void* tmrId) {
SStreamTask* pTask = param;
int32_t vgId = pTask->pMeta->vgId;
const char* id = pTask->id.idStr;
SActiveCheckpointInfo* pActiveInfo = pTask->chkInfo.pActiveInfo;
SStreamTmrInfo* pTmrInfo = &pActiveInfo->chkptReadyMsgTmr;
SArray* pNotRspList = NULL;
int32_t code = 0;
int32_t notRsp = 0;
// check the status every 100ms
if (streamTaskShouldStop(pTask)) {
@ -1004,7 +1059,7 @@ static void checkpointReadyMsgSendMonitorFn(void* param, void* tmrId) {
}
if (++pTmrInfo->activeCounter < 50) {
streamTmrStart(checkpointReadyMsgSendMonitorFn, 200, pTask, streamTimer, &pTmrInfo->tmrHandle, vgId,
streamTmrStart(chkptReadyMsgSendMonitorFn, 200, pTask, streamTimer, &pTmrInfo->tmrHandle, vgId,
"chkpt-ready-monitor");
return;
}
@ -1027,45 +1082,26 @@ static void checkpointReadyMsgSendMonitorFn(void* param, void* tmrId) {
}
streamMutexLock(&pActiveInfo->lock);
code = chkptReadyMsgSendHelper(pTask, pNotRspList);
streamMutexUnlock(&pActiveInfo->lock);
SArray* pList = pActiveInfo->pReadyMsgList;
int32_t num = taosArrayGetSize(pList);
int32_t code = doTaskChkptStatusCheck(pTask, num);
if (code) {
streamMutexUnlock(&pActiveInfo->lock);
if (code != TSDB_CODE_SUCCESS) {
streamMetaReleaseTask(pTask->pMeta, pTask);
return;
}
code = doFindNotConfirmUpstream(&pNotRspList, pList, num, vgId, pTask->info.taskLevel, id);
if (code) {
int32_t ref = streamCleanBeforeQuitTmr(pTmrInfo, pTask);
stError("s-task:%s failed to find not rsp checkpoint-ready downstream, code:%s, out of tmr, ref:%d", id,
tstrerror(code), ref);
streamMutexUnlock(&pActiveInfo->lock);
streamMetaReleaseTask(pTask->pMeta, pTask);
taosArrayDestroy(pNotRspList);
return;
}
int32_t checkpointId = pActiveInfo->activeId;
int32_t notRsp = taosArrayGetSize(pNotRspList);
doSendChkptReadyMsg(pTask, pNotRspList, checkpointId, pList);
notRsp = taosArrayGetSize(pNotRspList);
if (notRsp > 0) { // send checkpoint-ready msg again
streamTmrStart(checkpointReadyMsgSendMonitorFn, 200, pTask, streamTimer, &pTmrInfo->tmrHandle, vgId,
stDebug("s-task:%s start to monitor checkpoint-ready msg recv status in 10s", id);
streamTmrStart(chkptReadyMsgSendMonitorFn, 200, pTask, streamTimer, &pTmrInfo->tmrHandle, vgId,
"chkpt-ready-monitor");
streamMutexUnlock(&pActiveInfo->lock);
} else {
int32_t ref = streamCleanBeforeQuitTmr(pTmrInfo, pTask);
stDebug(
"s-task:%s vgId:%d checkpoint-ready msg confirmed by all upstream task(s), clear checkpoint-ready msg and quit "
"from timer, ref:%d",
id, vgId, ref);
streamClearChkptReadyMsg(pActiveInfo);
streamMutexUnlock(&pActiveInfo->lock);
// release should be the last execution, since pTask may be destroy after it immidiately.
streamMetaReleaseTask(pTask->pMeta, pTask);
}
@ -1124,7 +1160,7 @@ int32_t streamTaskSendCheckpointReadyMsg(SStreamTask* pTask) {
stDebug("s-task:%s start checkpoint-ready monitor in 10s, ref:%d ", pTask->id.idStr, ref);
streamMetaAcquireOneTask(pTask);
streamTmrStart(checkpointReadyMsgSendMonitorFn, 200, pTask, streamTimer, &pTmrInfo->tmrHandle, vgId,
streamTmrStart(chkptReadyMsgSendMonitorFn, 200, pTask, streamTimer, &pTmrInfo->tmrHandle, vgId,
"chkpt-ready-monitor");
// mark the timer monitor checkpointId
@ -1190,6 +1226,7 @@ int32_t streamAddBlockIntoDispatchMsg(const SSDataBlock* pBlock, SStreamDispatch
taosMemoryFree(buf);
return terrno;
}
SET_PAYLOAD_LEN(pRetrieve->data, actualLen, actualLen);
int32_t payloadLen = actualLen + PAYLOAD_PREFIX_LEN;
@ -1359,29 +1396,11 @@ void initCheckpointReadyInfo(STaskCheckpointReadyInfo* pReadyInfo, int32_t upstr
pReadyInfo->childId = childId;
}
int32_t streamAddCheckpointReadyMsg(SStreamTask* pTask, int32_t upstreamTaskId, int32_t index, int64_t checkpointId) {
if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
return TSDB_CODE_SUCCESS;
}
SStreamUpstreamEpInfo* pInfo = NULL;
streamTaskGetUpstreamTaskEpInfo(pTask, upstreamTaskId, &pInfo);
if (pInfo == NULL) {
return TSDB_CODE_STREAM_TASK_NOT_EXIST;
}
STaskCheckpointReadyInfo info = {0};
initCheckpointReadyInfo(&info, pInfo->nodeId, pInfo->taskId, pInfo->childId, &pInfo->epSet, checkpointId);
stDebug("s-task:%s (level:%d) prepare checkpoint-ready msg to upstream s-task:0x%" PRIx64 "-0x%x (vgId:%d) idx:%d",
pTask->id.idStr, pTask->info.taskLevel, pTask->id.streamId, pInfo->taskId, pInfo->nodeId, index);
static int32_t doAddChkptReadyMsg(SStreamTask* pTask, STaskCheckpointReadyInfo* pInfo) {
SActiveCheckpointInfo* pActiveInfo = pTask->chkInfo.pActiveInfo;
streamMutexLock(&pActiveInfo->lock);
void* px = taosArrayPush(pActiveInfo->pReadyMsgList, &info);
void* px = taosArrayPush(pActiveInfo->pReadyMsgList, pInfo);
if (px == NULL) {
streamMutexUnlock(&pActiveInfo->lock);
stError("s-task:%s failed to add readyMsg info, code: out of memory", pTask->id.idStr);
return terrno;
}
@ -1395,10 +1414,36 @@ int32_t streamAddCheckpointReadyMsg(SStreamTask* pTask, int32_t upstreamTaskId,
stDebug("s-task:%s %d/%d checkpoint-trigger recv", pTask->id.idStr, numOfRecv, total);
}
streamMutexUnlock(&pActiveInfo->lock);
return 0;
}
int32_t streamAddCheckpointReadyMsg(SStreamTask* pTask, int32_t upstreamTaskId, int32_t index, int64_t checkpointId) {
int32_t code = 0;
STaskCheckpointReadyInfo info = {0};
if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
return TSDB_CODE_SUCCESS;
}
SStreamUpstreamEpInfo* pInfo = NULL;
streamTaskGetUpstreamTaskEpInfo(pTask, upstreamTaskId, &pInfo);
if (pInfo == NULL) {
return TSDB_CODE_STREAM_TASK_NOT_EXIST;
}
initCheckpointReadyInfo(&info, pInfo->nodeId, pInfo->taskId, pInfo->childId, &pInfo->epSet, checkpointId);
stDebug("s-task:%s (level:%d) prepare checkpoint-ready msg to upstream s-task:0x%" PRIx64 "-0x%x (vgId:%d) idx:%d",
pTask->id.idStr, pTask->info.taskLevel, pTask->id.streamId, pInfo->taskId, pInfo->nodeId, index);
SActiveCheckpointInfo* pActiveInfo = pTask->chkInfo.pActiveInfo;
streamMutexLock(&pActiveInfo->lock);
code = doAddChkptReadyMsg(pTask, &info);
streamMutexUnlock(&pActiveInfo->lock);
return code;
}
void streamClearChkptReadyMsg(SActiveCheckpointInfo* pActiveInfo) {
if (pActiveInfo == NULL) {
return;

View File

@ -83,12 +83,14 @@ void streamTaskResumeInFuture(SStreamTask* pTask) {
//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
void streamTaskResumeHelper(void* param, void* tmrId) {
SStreamTask* pTask = (SStreamTask*)param;
SStreamTaskId* pId = &pTask->id;
SStreamTaskState p = streamTaskGetStatus(pTask);
SStreamTask* pTask = (SStreamTask*)param;
SStreamTaskId* pId = &pTask->id;
SStreamTaskState p = streamTaskGetStatus(pTask);
int32_t code = 0;
if (p.state == TASK_STATUS__DROPPING || p.state == TASK_STATUS__STOP) {
int8_t status = streamTaskSetSchedStatusInactive(pTask);
TAOS_UNUSED(status);
int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
stDebug("s-task:%s status:%s not resume task, ref:%d", pId->idStr, p.name, ref);
@ -97,13 +99,12 @@ void streamTaskResumeHelper(void* param, void* tmrId) {
return;
}
int32_t code = streamTaskSchedTask(pTask->pMsgCb, pTask->info.nodeId, pId->streamId, pId->taskId, STREAM_EXEC_T_RESUME_TASK);
code = streamTaskSchedTask(pTask->pMsgCb, pTask->info.nodeId, pId->streamId, pId->taskId, STREAM_EXEC_T_RESUME_TASK);
int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
if (code) {
stError("s-task:%s sched task failed, code:%s, ref:%d", pId->idStr, tstrerror(code), ref);
} else {
stDebug("trigger to resume s-task:%s after being idled for %dms, ref:%d", pId->idStr, pTask->status.schedIdleTime,
ref);
stDebug("trigger to resume s-task:%s after idled for %dms, ref:%d", pId->idStr, pTask->status.schedIdleTime, ref);
// release the task ref count
streamTaskClearSchedIdleInfo(pTask);

View File

@ -64,7 +64,6 @@ static int32_t streamTaskSetReady(SStreamTask* pTask) {
int32_t streamStartScanHistoryAsync(SStreamTask* pTask, int8_t igUntreated) {
SStreamScanHistoryReq req;
int32_t code = 0;
initScanHistoryReq(pTask, &req, igUntreated);
int32_t len = sizeof(SStreamScanHistoryReq);
@ -173,7 +172,7 @@ int32_t streamTaskOnScanHistoryTaskReady(SStreamTask* pTask) {
code = streamTaskStartScanHistory(pTask);
}
// NOTE: there will be an deadlock if launch fill history here.
// NOTE: there will be a deadlock if launch fill history here.
// start the related fill-history task, when current task is ready
// if (HAS_RELATED_FILLHISTORY_TASK(pTask)) {
// streamLaunchFillHistoryTask(pTask);
@ -219,7 +218,7 @@ int32_t streamLaunchFillHistoryTask(SStreamTask* pTask) {
stDebug("s-task:%s start to launch related fill-history task:0x%" PRIx64 "-0x%x", idStr, hStreamId, hTaskId);
// Set the execute conditions, including the query time window and the version range
// Set the execution conditions, including the query time window and the version range
streamMetaRLock(pMeta);
SStreamTask** pHTask = taosHashGet(pMeta->pTasksMap, &pTask->hTaskInfo.id, sizeof(pTask->hTaskInfo.id));
streamMetaRUnLock(pMeta);