log(stream): update logs.

This commit is contained in:
Haojun Liao 2023-09-27 15:37:12 +08:00
parent 8f2d217834
commit 2dade996bb
1 changed files with 2 additions and 44 deletions

View File

@ -32,7 +32,6 @@ typedef struct STaskRecheckInfo {
static int32_t streamSetParamForScanHistory(SStreamTask* pTask);
static void streamTaskSetRangeStreamCalc(SStreamTask* pTask);
static int32_t initScanHistoryReq(SStreamTask* pTask, SStreamScanHistoryReq* pReq, int8_t igUntreated);
static int32_t getNextRetryInterval(int32_t waitInterval);
static SLaunchHTaskInfo* createHTaskLaunchInfo(SStreamMeta* pMeta, int64_t streamId, int32_t taskId);
static void tryLaunchHistoryTask(void* param, void* tmrId);
@ -453,10 +452,6 @@ int32_t initScanHistoryReq(SStreamTask* pTask, SStreamScanHistoryReq* pReq, int8
return 0;
}
int32_t getNextRetryInterval(int32_t waitInterval) {
return waitInterval * RETRY_LAUNCH_INTERVAL_INC_RATE;
}
int32_t streamTaskPutTranstateIntoInputQ(SStreamTask* pTask) {
SStreamDataBlock* pTranstate = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, sizeof(SSDataBlock));
if (pTranstate == NULL) {
@ -602,43 +597,6 @@ static void checkFillhistoryTaskStatus(SStreamTask* pTask, SStreamTask* pHTask)
doCheckDownstreamStatus(pHTask);
}
static bool doLaunchHistoryTask(SStreamTask* pTask, SLaunchHTaskInfo* pInfo) {
SStreamMeta* pMeta = pTask->pMeta;
streamTaskSetRetryInfoForLaunch(&pTask->hTaskInfo);
stDebug("s-task:%s try launch related fill-history task in timer, retry:%d", pTask->id.idStr,
pTask->hTaskInfo.retryTimes);
ASSERT(pTask->status.timerActive >= 1);
// abort the timer if intend to stop task
SStreamTask* pHTask = streamMetaAcquireTask(pMeta, pTask->hTaskInfo.id.streamId, pTask->hTaskInfo.id.taskId);
if (pHTask == NULL && (!streamTaskShouldStop(&pTask->status))) {
const char* p = streamGetTaskStatusStr(pTask->status.taskStatus);
stWarn(
"s-task:%s vgId:%d status:%s failed to launch history task:0x%x, since it may not be built, or may have "
"been destroyed, or should stop",
pTask->id.idStr, pMeta->vgId, streamGetTaskStatusStr(pTask->status.taskStatus), (int32_t)pTask->hTaskInfo.id.taskId);
taosTmrReset(tryLaunchHistoryTask, LAUNCH_HTASK_INTERVAL, pInfo, streamEnv.timer, &pTask->hTaskInfo.pTimer);
streamMetaReleaseTask(pMeta, pTask);
return true;
}
if (pHTask != NULL) {
checkFillhistoryTaskStatus(pTask, pHTask);
streamMetaReleaseTask(pMeta, pHTask);
}
// not in timer anymore
int8_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
stDebug("s-task:0x%x fill-history task launch completed, retry times:%d, ref:%d", (int32_t)pInfo->id.taskId,
pTask->hTaskInfo.retryTimes, ref);
streamMetaReleaseTask(pMeta, pTask);
return false;
}
static void tryLaunchHistoryTask(void* param, void* tmrId) {
SLaunchHTaskInfo* pInfo = param;
SStreamMeta* pMeta = pInfo->pMeta;
@ -678,8 +636,8 @@ static void tryLaunchHistoryTask(void* param, void* tmrId) {
int8_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
streamMetaReleaseTask(pMeta, pTask);
stError("s-task:%s max retry:%d reached, quit from retrying launch related fill-history task timer, ref:%d",
pTask->id.idStr, MAX_RETRY_LAUNCH_HISTORY_TASK, ref);
stError("s-task:%s max retry:%d reached, quit from retrying launch related fill-history task:0x%x, ref:%d",
pTask->id.idStr, MAX_RETRY_LAUNCH_HISTORY_TASK, (int32_t)pHTaskInfo->id.taskId, ref);
pHTaskInfo->id.taskId = 0;
pHTaskInfo->id.streamId = 0;