refactor: optimize the fill-history task launch policy, and do some other internal refactor.

This commit is contained in:
Haojun Liao 2023-09-27 14:00:48 +08:00
parent 38164435f9
commit dfe8641c0c
9 changed files with 235 additions and 137 deletions

View File

@ -270,13 +270,13 @@ typedef struct SCheckpointInfo {
} SCheckpointInfo;
typedef struct SStreamStatus {
int8_t taskStatus;
int8_t downstreamReady; // downstream tasks are all ready now, if this flag is set
int8_t schedStatus;
int8_t keepTaskStatus;
bool appendTranstateBlock; // has append the transfer state data block already, todo: remove it
int8_t timerActive; // timer is active
int8_t pauseAllowed; // allowed task status to be set to be paused
int8_t taskStatus;
int8_t downstreamReady; // downstream tasks are all ready now, if this flag is set
int8_t schedStatus;
int8_t keepTaskStatus;
bool appendTranstateBlock; // has append the transfer state data block already, todo: remove it
int8_t pauseAllowed; // allowed task status to be set to be paused
int32_t timerActive; // timer is active
} SStreamStatus;
typedef struct SDataRange {
@ -304,6 +304,7 @@ typedef struct SDispatchMsgInfo {
int32_t retryCount; // retry send data count
int64_t startTs; // dispatch start time, record total elapsed time for dispatch
SArray* pRetryList; // current dispatch successfully completed node of downstream
void* pTimer; // used to dispatch data after a given time duration
} SDispatchMsgInfo;
typedef struct STaskOutputInfo {
@ -345,7 +346,14 @@ typedef struct STaskExecStatisInfo {
SSinkRecorder sink;
} STaskExecStatisInfo;
typedef struct STaskTimer STaskTimer;
typedef struct SHistoryTaskInfo {
STaskId id;
void* pTimer;
int32_t tickCount;
int32_t retryTimes;
int32_t waitInterval;
} SHistoryTaskInfo;
typedef struct STokenBucket STokenBucket;
typedef struct SMetaHbInfo SMetaHbInfo;
@ -361,7 +369,7 @@ struct SStreamTask {
SCheckpointInfo chkInfo;
STaskExec exec;
SDataRange dataRange;
STaskId historyTaskId;
SHistoryTaskInfo hTaskInfo;
STaskId streamTaskId;
STaskExecStatisInfo execInfo;
SArray* pReadyMsgList; // SArray<SStreamChkptReadyInfo*>
@ -378,7 +386,6 @@ struct SStreamTask {
};
STokenBucket* pTokenBucket;
STaskTimer* pTimer;
SMsgCb* pMsgCb; // msg handle
SStreamState* pState; // state backend
SArray* pRspMsgList;

View File

@ -296,8 +296,8 @@ static void setHTasksId(SArray* pTaskList, const SArray* pHTaskList) {
SStreamTask** pStreamTask = taosArrayGet(pTaskList, i);
SStreamTask** pHTask = taosArrayGet(pHTaskList, i);
(*pStreamTask)->historyTaskId.taskId = (*pHTask)->id.taskId;
(*pStreamTask)->historyTaskId.streamId = (*pHTask)->id.streamId;
(*pStreamTask)->hTaskInfo.id.taskId = (*pHTask)->id.taskId;
(*pStreamTask)->hTaskInfo.id.streamId = (*pHTask)->id.streamId;
(*pHTask)->streamTaskId.taskId = (*pStreamTask)->id.taskId;
(*pHTask)->streamTaskId.streamId = (*pStreamTask)->id.streamId;

View File

@ -867,7 +867,7 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) {
" child id:%d, level:%d, status:%s fill-history:%d, related fill-task:0x%x trigger:%" PRId64 " ms",
vgId, pTask->id.idStr, pChkInfo->checkpointId, pChkInfo->checkpointVer, pChkInfo->nextProcessVer,
pTask->info.selfChildId, pTask->info.taskLevel, streamGetTaskStatusStr(pTask->status.taskStatus),
pTask->info.fillHistory, (int32_t)pTask->historyTaskId.taskId, pTask->info.triggerParam);
pTask->info.fillHistory, (int32_t)pTask->hTaskInfo.id.taskId, pTask->info.triggerParam);
}
return 0;
@ -1218,7 +1218,7 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
} else {
STimeWindow* pWindow = &pTask->dataRange.window;
if (pTask->historyTaskId.taskId == 0) {
if (pTask->hTaskInfo.id.taskId == 0) {
*pWindow = (STimeWindow){INT64_MIN, INT64_MAX};
tqDebug(
"s-task:%s scan-history in stream time window completed, no related fill-history task, reset the time "
@ -1432,12 +1432,12 @@ int32_t tqProcessTaskPauseReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
streamTaskPause(pTask, pMeta);
SStreamTask* pHistoryTask = NULL;
if (pTask->historyTaskId.taskId != 0) {
pHistoryTask = streamMetaAcquireTask(pMeta, pTask->historyTaskId.streamId, pTask->historyTaskId.taskId);
if (pTask->hTaskInfo.id.taskId != 0) {
pHistoryTask = streamMetaAcquireTask(pMeta, pTask->hTaskInfo.id.streamId, pTask->hTaskInfo.id.taskId);
if (pHistoryTask == NULL) {
tqError("vgId:%d process pause req, failed to acquire fill-history task:0x%" PRIx64
", it may have been dropped already",
pMeta->vgId, pTask->historyTaskId.taskId);
pMeta->vgId, pTask->hTaskInfo.id.taskId);
streamMetaReleaseTask(pMeta, pTask);
// since task is in [STOP|DROPPING] state, it is safe to assume the pause is active
@ -1506,7 +1506,7 @@ int32_t tqProcessTaskResumeReq(STQ* pTq, int64_t sversion, char* msg, int32_t ms
}
SStreamTask* pHistoryTask =
streamMetaAcquireTask(pTq->pStreamMeta, pTask->historyTaskId.streamId, pTask->historyTaskId.taskId);
streamMetaAcquireTask(pTq->pStreamMeta, pTask->hTaskInfo.id.streamId, pTask->hTaskInfo.id.taskId);
if (pHistoryTask) {
code = tqProcessTaskResumeImpl(pTq, pHistoryTask, sversion, pReq->igUntreated);
}
@ -1802,8 +1802,8 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) {
streamSetStatusNormal(pTask);
SStreamTask** ppHTask = NULL;
if (pTask->historyTaskId.taskId != 0) {
ppHTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, &pTask->historyTaskId, sizeof(pTask->historyTaskId));
if (pTask->hTaskInfo.id.taskId != 0) {
ppHTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, &pTask->hTaskInfo.id, sizeof(pTask->hTaskInfo.id));
if (ppHTask == NULL || *ppHTask == NULL) {
tqError("vgId:%d failed to acquire fill-history task:0x%x when handling update, it may have been dropped already",
pMeta->vgId, req.taskId);

View File

@ -26,7 +26,19 @@
extern "C" {
#endif
#define CHECK_DOWNSTREAM_INTERVAL 100
#define CHECK_DOWNSTREAM_INTERVAL 100
#define LAUNCH_HTASK_INTERVAL 100
#define WAIT_FOR_MINIMAL_INTERVAL 100.00
#define MAX_RETRY_LAUNCH_HISTORY_TASK 20
#define RETRY_LAUNCH_INTERVAL_INC_RATE 1.2
#define MAX_BLOCK_NAME_NUM 1024
#define DISPATCH_RETRY_INTERVAL_MS 300
#define MAX_CONTINUE_RETRY_COUNT 5
#define META_HB_CHECK_INTERVAL 200
#define META_HB_SEND_IDLE_COUNTER 25 // send hb every 5 sec
#define STREAM_TASK_KEY_LEN ((sizeof(int64_t)) << 1)
// clang-format off
#define stFatal(...) do { if (stDebugFlag & DEBUG_FATAL) { taosPrintLog("STM FATAL ", DEBUG_FATAL, 255, __VA_ARGS__); }} while(0)
@ -58,11 +70,6 @@ struct STokenBucket {
int64_t fillTimestamp; // fill timestamp
};
struct STaskTimer {
void* hTaskLaunchTimer;
void* dispatchTimer;
};
extern SStreamGlobalEnv streamEnv;
extern int32_t streamBackendId;
extern int32_t streamBackendCfWrapperId;
@ -107,6 +114,9 @@ int32_t streamTaskFillHistoryFinished(SStreamTask* pTask);
int32_t streamTransferStateToStreamTask(SStreamTask* pTask);
int32_t streamTaskInitTokenBucket(STokenBucket* pBucket, int32_t numCap, int32_t numRate, int32_t bytesRate);
STaskId streamTaskExtractKey(const SStreamTask* pTask);
void streamTaskInitForLaunchHTask(SHistoryTaskInfo* pInfo);
void streamTaskSetRetryInfoForLaunch(SHistoryTaskInfo* pInfo);
SStreamQueue* streamQueueOpen(int64_t cap);
void streamQueueClose(SStreamQueue* pQueue, int32_t taskId);
@ -115,7 +125,7 @@ void streamQueueProcessFail(SStreamQueue* queue);
void* streamQueueNextItem(SStreamQueue* pQueue);
void streamFreeQitem(SStreamQueueItem* data);
STaskId extractStreamTaskKey(const SStreamTask* pTask);
#ifdef __cplusplus
}

View File

@ -18,10 +18,6 @@
#include "ttimer.h"
#include "tmisce.h"
#define MAX_BLOCK_NAME_NUM 1024
#define DISPATCH_RETRY_INTERVAL_MS 300
#define MAX_CONTINUE_RETRY_COUNT 5
typedef struct SBlockName {
uint32_t hashValue;
char parTbName[TSDB_TABLE_NAME_LEN];
@ -425,7 +421,7 @@ static void doRetryDispatchData(void* param, void* tmrId) {
int32_t msgId = pTask->execInfo.dispatch;
if (streamTaskShouldStop(&pTask->status)) {
int8_t ref = atomic_sub_fetch_8(&pTask->status.timerActive, 1);
int8_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
stDebug("s-task:%s should stop, abort from timer, ref:%d", pTask->id.idStr, ref);
return;
}
@ -487,26 +483,25 @@ static void doRetryDispatchData(void* param, void* tmrId) {
streamRetryDispatchData(pTask, DISPATCH_RETRY_INTERVAL_MS);
}
} else {
int32_t ref = atomic_sub_fetch_8(&pTask->status.timerActive, 1);
int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
stDebug("s-task:%s should stop, abort from timer, ref:%d", pTask->id.idStr, ref);
}
} else {
int8_t ref = atomic_sub_fetch_8(&pTask->status.timerActive, 1);
int8_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
stDebug("s-task:%s send success, jump out of timer, ref:%d", pTask->id.idStr, ref);
}
}
void streamRetryDispatchData(SStreamTask* pTask, int64_t waitDuration) {
STaskTimer* pTmr = pTask->pTimer;
pTask->msgInfo.retryCount++;
stWarn("s-task:%s retry send dispatch data in %" PRId64 "ms, in timer msgId:%d, retryTimes:%d", pTask->id.idStr,
waitDuration, pTask->execInfo.dispatch, pTask->msgInfo.retryCount);
if (pTmr->dispatchTimer != NULL) {
taosTmrReset(doRetryDispatchData, waitDuration, pTask, streamEnv.timer, &pTmr->dispatchTimer);
if (pTask->msgInfo.pTimer != NULL) {
taosTmrReset(doRetryDispatchData, waitDuration, pTask, streamEnv.timer, &pTask->msgInfo.pTimer);
} else {
pTmr->dispatchTimer = taosTmrStart(doRetryDispatchData, waitDuration, pTask, streamEnv.timer);
pTask->msgInfo.pTimer = taosTmrStart(doRetryDispatchData, waitDuration, pTask, streamEnv.timer);
}
}
@ -636,7 +631,7 @@ int32_t streamDispatchStreamBlock(SStreamTask* pTask) {
}
if (++retryCount > MAX_CONTINUE_RETRY_COUNT) { // add to timer to retry
int8_t ref = atomic_add_fetch_8(&pTask->status.timerActive, 1);
int8_t ref = atomic_add_fetch_32(&pTask->status.timerActive, 1);
stDebug("s-task:%s failed to dispatch msg to downstream for %d times, code:%s, add timer to retry in %dms, ref:%d",
pTask->id.idStr, retryCount, tstrerror(terrno), DISPATCH_RETRY_INTERVAL_MS, ref);
@ -1143,7 +1138,7 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i
stDebug("s-task:%s waiting rsp set to be %d", id, pTask->shuffleDispatcher.waitingRspCnt);
}
int8_t ref = atomic_add_fetch_8(&pTask->status.timerActive, 1);
int8_t ref = atomic_add_fetch_32(&pTask->status.timerActive, 1);
stDebug("s-task:%s failed to dispatch msg to downstream code:%s, add timer to retry in %dms, ref:%d",
pTask->id.idStr, tstrerror(terrno), DISPATCH_RETRY_INTERVAL_MS, ref);

View File

@ -310,7 +310,7 @@ int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask) {
pStreamTask->id.idStr);
}
ASSERT(pStreamTask->historyTaskId.taskId == pTask->id.taskId && pTask->status.appendTranstateBlock == true);
ASSERT(pStreamTask->hTaskInfo.id.taskId == pTask->id.taskId && pTask->status.appendTranstateBlock == true);
STimeWindow* pTimeWindow = &pStreamTask->dataRange.window;
@ -361,8 +361,8 @@ int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask) {
streamBuildAndSendDropTaskMsg(pTask->pMsgCb, pMeta->vgId, &pTask->id);
// 5. clear the link between fill-history task and stream task info
pStreamTask->historyTaskId.taskId = 0;
pStreamTask->historyTaskId.streamId = 0;
pStreamTask->hTaskInfo.id.taskId = 0;
pStreamTask->hTaskInfo.id.streamId = 0;
// 6. save to disk
taosWLockLatch(&pMeta->lock);

View File

@ -21,10 +21,6 @@
#include "tstream.h"
#include "ttimer.h"
#define META_HB_CHECK_INTERVAL 200
#define META_HB_SEND_IDLE_COUNTER 25 // send hb every 5 sec
#define STREAM_TASK_KEY_LEN ((sizeof(int64_t)) << 1)
static TdThreadOnce streamMetaModuleInit = PTHREAD_ONCE_INIT;
int32_t streamBackendId = 0;
@ -547,8 +543,8 @@ int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int64_t streamId, int32_t t
STaskId streamTaskId = {.streamId = (*ppTask)->streamTaskId.streamId, .taskId = (*ppTask)->streamTaskId.taskId};
SStreamTask** ppStreamTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, &streamTaskId, sizeof(streamTaskId));
if (ppStreamTask != NULL) {
(*ppStreamTask)->historyTaskId.taskId = 0;
(*ppStreamTask)->historyTaskId.streamId = 0;
(*ppStreamTask)->hTaskInfo.id.taskId = 0;
(*ppStreamTask)->hTaskInfo.id.streamId = 0;
}
} else {
atomic_sub_fetch_32(&pMeta->numOfStreamTasks, 1);
@ -698,7 +694,7 @@ int32_t streamMetaLoadAllTasks(SStreamMeta* pMeta) {
int32_t taskId = pTask->id.taskId;
tFreeStreamTask(pTask);
STaskId id = extractStreamTaskKey(pTask);
STaskId id = streamTaskExtractKey(pTask);
taosArrayPush(pRecycleList, &id);
int32_t total = taosArrayGetSize(pRecycleList);
@ -807,7 +803,7 @@ int32_t tDecodeStreamHbMsg(SDecoder* pDecoder, SStreamHbMsg* pReq) {
return 0;
}
static bool enoughTimeDuration(SMetaHbInfo* pInfo) {
static bool waitForEnoughDuration(SMetaHbInfo* pInfo) {
if ((++pInfo->tickCounter) >= META_HB_SEND_IDLE_COUNTER) { // reset the counter
pInfo->tickCounter = 0;
return true;
@ -844,7 +840,7 @@ void metaHbToMnode(void* param, void* tmrId) {
pMeta->pHbInfo->hbStart = taosGetTimestampMs();
}
if (!enoughTimeDuration(pMeta->pHbInfo)) {
if (!waitForEnoughDuration(pMeta->pHbInfo)) {
taosTmrReset(metaHbToMnode, META_HB_CHECK_INTERVAL, param, streamEnv.timer, &pMeta->pHbInfo->hbTmr);
taosReleaseRef(streamMetaId, rid);
return;

View File

@ -29,9 +29,12 @@ typedef struct STaskRecheckInfo {
void* checkTimer;
} STaskRecheckInfo;
static int32_t streamSetParamForScanHistory(SStreamTask* pTask);
static void streamTaskSetRangeStreamCalc(SStreamTask* pTask);
static int32_t initScanHistoryReq(SStreamTask* pTask, SStreamScanHistoryReq* pReq, int8_t igUntreated);
static int32_t streamSetParamForScanHistory(SStreamTask* pTask);
static void streamTaskSetRangeStreamCalc(SStreamTask* pTask);
static int32_t initScanHistoryReq(SStreamTask* pTask, SStreamScanHistoryReq* pReq, int8_t igUntreated);
static int32_t getNextRetryInterval(int32_t waitInterval);
static SLaunchHTaskInfo* createHTaskLaunchInfo(SStreamMeta* pMeta, int64_t streamId, int32_t taskId);
static void tryLaunchHistoryTask(void* param, void* tmrId);
static void streamTaskSetReady(SStreamTask* pTask, int32_t numOfReqs) {
SStreamMeta* pMeta = pTask->pMeta;
@ -54,7 +57,7 @@ static void streamTaskSetReady(SStreamTask* pTask, int32_t numOfReqs) {
taosWLockLatch(&pMeta->lock);
STaskId id = extractStreamTaskKey(pTask);
STaskId id = streamTaskExtractKey(pTask);
taosHashPut(pMeta->startInfo.pReadyTaskSet, &id, sizeof(id), NULL, 0);
int32_t numOfTotal = streamMetaGetNumOfTasks(pMeta);
@ -90,20 +93,6 @@ int32_t streamStartScanHistoryAsync(SStreamTask* pTask, int8_t igUntreated) {
return 0;
}
const char* streamGetTaskStatusStr(int32_t status) {
switch(status) {
case TASK_STATUS__NORMAL: return "normal";
case TASK_STATUS__SCAN_HISTORY: return "scan-history";
case TASK_STATUS__HALT: return "halt";
case TASK_STATUS__PAUSE: return "paused";
case TASK_STATUS__CK: return "check-point";
case TASK_STATUS__DROPPING: return "dropping";
case TASK_STATUS__STOP: return "stop";
case TASK_STATUS__UNINIT: return "uninitialized";
default:return "";
}
}
static int32_t doLaunchScanHistoryTask(SStreamTask* pTask) {
SVersionRange* pRange = &pTask->dataRange.range;
if (pTask->info.fillHistory) {
@ -249,7 +238,7 @@ static void recheckDownstreamTasks(void* param, void* tmrId) {
}
destroyRecheckInfo(pInfo);
int8_t ref = atomic_sub_fetch_8(&pTask->status.timerActive, 1);
int8_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
stDebug("s-task:%s complete send check in timer, ref:%d", pTask->id.idStr, ref);
}
@ -302,7 +291,7 @@ static void doProcessDownstreamReadyRsp(SStreamTask* pTask, int32_t numOfReqs) {
if (pTask->info.fillHistory == 1) {
stDebug("s-task:%s fill-history is set normal when start it, try to remove it,set it task to be dropping", id);
pTask->status.taskStatus = TASK_STATUS__DROPPING;
ASSERT(pTask->historyTaskId.taskId == 0);
ASSERT(pTask->hTaskInfo.id.taskId == 0);
} else {
stDebug("s-task:%s downstream tasks are ready, now ready for data from wal, status:%s", id, str);
streamTaskEnablePause(pTask);
@ -375,7 +364,7 @@ int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRs
} else {
STaskRecheckInfo* pInfo = createRecheckInfo(pTask, pRsp);
int8_t ref = atomic_add_fetch_8(&pTask->status.timerActive, 1);
int8_t ref = atomic_add_fetch_32(&pTask->status.timerActive, 1);
stDebug("s-task:%s downstream taskId:0x%x (vgId:%d) not ready, stage:%d, retry in 100ms, ref:%d ", id,
pRsp->downstreamTaskId, pRsp->downstreamNodeId, pRsp->oldStage, ref);
@ -464,6 +453,10 @@ int32_t initScanHistoryReq(SStreamTask* pTask, SStreamScanHistoryReq* pReq, int8
return 0;
}
int32_t getNextRetryInterval(int32_t waitInterval) {
return waitInterval * RETRY_LAUNCH_INTERVAL_INC_RATE;
}
int32_t streamTaskPutTranstateIntoInputQ(SStreamTask* pTask) {
SStreamDataBlock* pTranstate = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, sizeof(SSDataBlock));
if (pTranstate == NULL) {
@ -609,12 +602,47 @@ static void checkFillhistoryTaskStatus(SStreamTask* pTask, SStreamTask* pHTask)
doCheckDownstreamStatus(pHTask);
}
static bool doLaunchHistoryTask(SStreamTask* pTask, SLaunchHTaskInfo* pInfo) {
SStreamMeta* pMeta = pTask->pMeta;
streamTaskSetRetryInfoForLaunch(&pTask->hTaskInfo);
stDebug("s-task:%s try launch related fill-history task in timer, retry:%d", pTask->id.idStr,
pTask->hTaskInfo.retryTimes);
ASSERT(pTask->status.timerActive >= 1);
// abort the timer if intend to stop task
SStreamTask* pHTask = streamMetaAcquireTask(pMeta, pTask->hTaskInfo.id.streamId, pTask->hTaskInfo.id.taskId);
if (pHTask == NULL && (!streamTaskShouldStop(&pTask->status))) {
const char* p = streamGetTaskStatusStr(pTask->status.taskStatus);
stWarn(
"s-task:%s vgId:%d status:%s failed to launch history task:0x%x, since it may not be built, or may have "
"been destroyed, or should stop",
pTask->id.idStr, pMeta->vgId, streamGetTaskStatusStr(pTask->status.taskStatus), (int32_t)pTask->hTaskInfo.id.taskId);
taosTmrReset(tryLaunchHistoryTask, LAUNCH_HTASK_INTERVAL, pInfo, streamEnv.timer, &pTask->hTaskInfo.pTimer);
streamMetaReleaseTask(pMeta, pTask);
return true;
}
if (pHTask != NULL) {
checkFillhistoryTaskStatus(pTask, pHTask);
streamMetaReleaseTask(pMeta, pHTask);
}
// not in timer anymore
int8_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
stDebug("s-task:0x%x fill-history task launch completed, retry times:%d, ref:%d", (int32_t)pInfo->id.taskId,
pTask->hTaskInfo.retryTimes, ref);
streamMetaReleaseTask(pMeta, pTask);
return false;
}
static void tryLaunchHistoryTask(void* param, void* tmrId) {
SLaunchHTaskInfo* pInfo = param;
SStreamMeta* pMeta = pInfo->pMeta;
stDebug("s-task:0x%x in timer to launch related history task", (int32_t) pInfo->id.taskId);
taosWLockLatch(&pMeta->lock);
SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, &pInfo->id, sizeof(pInfo->id));
if (ppTask) {
@ -622,10 +650,12 @@ static void tryLaunchHistoryTask(void* param, void* tmrId) {
if (streamTaskShouldStop(&(*ppTask)->status)) {
const char* pStatus = streamGetTaskStatusStr((*ppTask)->status.taskStatus);
stDebug("s-task:%s status:%s quit timer task", (*ppTask)->id.idStr, pStatus);
int32_t ref = atomic_sub_fetch_32(&(*ppTask)->status.timerActive, 1);
stDebug("s-task:%s status:%s should stop, quit launch fill-history task timer, retry:%d, ref:%d",
(*ppTask)->id.idStr, pStatus, (*ppTask)->hTaskInfo.retryTimes, ref);
taosMemoryFree(pInfo);
atomic_sub_fetch_8(&(*ppTask)->status.timerActive, 1);
taosWUnLockLatch(&pMeta->lock);
return;
}
@ -634,78 +664,120 @@ static void tryLaunchHistoryTask(void* param, void* tmrId) {
SStreamTask* pTask = streamMetaAcquireTask(pMeta, pInfo->id.streamId, pInfo->id.taskId);
if (pTask != NULL) {
ASSERT(pTask->status.timerActive >= 1);
// abort the timer if intend to stop task
SStreamTask* pHTask = streamMetaAcquireTask(pMeta, pTask->historyTaskId.streamId, pTask->historyTaskId.taskId);
if (pHTask == NULL && (!streamTaskShouldStop(&pTask->status))) {
const char* pStatus = streamGetTaskStatusStr(pTask->status.taskStatus);
stWarn(
"s-task:%s vgId:%d status:%s failed to launch history task:0x%x, since it may not be built, or may have been "
"destroyed, or should stop",
pTask->id.idStr, pMeta->vgId, pStatus, (int32_t) pTask->historyTaskId.taskId);
SHistoryTaskInfo* pHTaskInfo = &pTask->hTaskInfo;
taosTmrReset(tryLaunchHistoryTask, 100, pInfo, streamEnv.timer, &pTask->pTimer->hTaskLaunchTimer);
pHTaskInfo->tickCount -= 1;
if (pHTaskInfo->tickCount > 0) {
taosTmrReset(tryLaunchHistoryTask, LAUNCH_HTASK_INTERVAL, pInfo, streamEnv.timer, &pHTaskInfo->pTimer);
streamMetaReleaseTask(pMeta, pTask);
return;
}
if (pHTask != NULL) {
checkFillhistoryTaskStatus(pTask, pHTask);
streamMetaReleaseTask(pMeta, pHTask);
}
if (pHTaskInfo->retryTimes > MAX_RETRY_LAUNCH_HISTORY_TASK) {
int8_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
taosMemoryFree(pInfo);
streamMetaReleaseTask(pMeta, pTask);
// not in timer anymore
atomic_sub_fetch_8(&pTask->status.timerActive, 1);
streamMetaReleaseTask(pMeta, pTask);
stError("s-task:%s max retry:%d reached, quit from retrying launch related fill-history task timer, ref:%d",
pTask->id.idStr, MAX_RETRY_LAUNCH_HISTORY_TASK, ref);
pHTaskInfo->id.taskId = 0;
pHTaskInfo->id.streamId = 0;
} else { // not reach the limitation yet, let's continue retrying launch related fill-history task.
streamTaskSetRetryInfoForLaunch(pHTaskInfo);
ASSERT(pTask->status.timerActive >= 1);
// abort the timer if intend to stop task
SStreamTask* pHTask = streamMetaAcquireTask(pMeta, pHTaskInfo->id.streamId, pHTaskInfo->id.taskId);
if (pHTask == NULL && (!streamTaskShouldStop(&pTask->status))) {
const char* p = streamGetTaskStatusStr(pTask->status.taskStatus);
int32_t hTaskId = pHTaskInfo->id.taskId;
stDebug(
"s-task:%s status:%s failed to launch fill-history task:0x%x, retry launch related fill-history task in "
"timer, retryCount:%d",
pTask->id.idStr, p, pHTaskInfo->retryTimes, hTaskId);
taosTmrReset(tryLaunchHistoryTask, LAUNCH_HTASK_INTERVAL, pInfo, streamEnv.timer, &pHTaskInfo->pTimer);
streamMetaReleaseTask(pMeta, pTask);
return;
}
if (pHTask != NULL) {
checkFillhistoryTaskStatus(pTask, pHTask);
streamMetaReleaseTask(pMeta, pHTask);
}
// not in timer anymore
int8_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
stDebug("s-task:0x%x fill-history task launch completed, retry times:%d, ref:%d", (int32_t)pInfo->id.taskId,
pHTaskInfo->retryTimes, ref);
streamMetaReleaseTask(pMeta, pTask);
}
} else {
stError("s-task:0x%x failed to load task, it may have been destroyed", (int32_t) pInfo->id.taskId);
stError("s-task:0x%x failed to load task, it may have been destroyed, not launch related fill-history task",
(int32_t)pInfo->id.taskId);
}
taosMemoryFree(pInfo);
}
// todo fix the bug: 2. race condition
SLaunchHTaskInfo* createHTaskLaunchInfo(SStreamMeta* pMeta, int64_t streamId, int32_t taskId) {
SLaunchHTaskInfo* pInfo = taosMemoryCalloc(1, sizeof(SLaunchHTaskInfo));
if (pInfo == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
pInfo->id.taskId = taskId;
pInfo->id.streamId = streamId;
pInfo->pMeta = pMeta;
return pInfo;
}
// an fill history task needs to be started.
int32_t streamLaunchFillHistoryTask(SStreamTask* pTask) {
SStreamMeta* pMeta = pTask->pMeta;
int32_t hTaskId = pTask->historyTaskId.taskId;
int32_t hTaskId = pTask->hTaskInfo.id.taskId;
if (hTaskId == 0) {
return TSDB_CODE_SUCCESS;
}
ASSERT(pTask->status.downstreamReady == 1);
stDebug("s-task:%s start to launch related fill-history task:0x%" PRIx64 "-0x%x", pTask->id.idStr,
pTask->historyTaskId.streamId, hTaskId);
pTask->hTaskInfo.id.streamId, hTaskId);
// Set the execute conditions, including the query time window and the version range
SStreamTask** pHTask = taosHashGet(pMeta->pTasksMap, &pTask->historyTaskId, sizeof(pTask->historyTaskId));
SStreamTask** pHTask = taosHashGet(pMeta->pTasksMap, &pTask->hTaskInfo.id, sizeof(pTask->hTaskInfo.id));
if (pHTask == NULL) {
stWarn("s-task:%s vgId:%d failed to launch history task:0x%x, since it is not built yet", pTask->id.idStr,
pMeta->vgId, hTaskId);
stWarn("s-task:%s vgId:%d failed to launch history task:0x%x, since not built yet", pTask->id.idStr, pMeta->vgId,
hTaskId);
SLaunchHTaskInfo* pInfo = taosMemoryCalloc(1, sizeof(SLaunchHTaskInfo));
pInfo->id.taskId = pTask->id.taskId;
pInfo->id.streamId = pTask->id.streamId;
pInfo->pMeta = pTask->pMeta;
SLaunchHTaskInfo* pInfo = createHTaskLaunchInfo(pTask->pMeta, pTask->id.streamId, pTask->id.taskId);
if (pInfo == NULL) {
stError("s-task:%s failed to launch related fill-history task, since Out Of Memory", pTask->id.idStr);
return terrno;
}
if (pTask->pTimer->hTaskLaunchTimer == NULL) {
pTask->pTimer->hTaskLaunchTimer = taosTmrStart(tryLaunchHistoryTask, 100, pInfo, streamEnv.timer);
if (pTask->pTimer->hTaskLaunchTimer == NULL) {
// todo failed to create timer
streamTaskInitForLaunchHTask(&pTask->hTaskInfo);
if (pTask->hTaskInfo.pTimer == NULL) {
int32_t ref = atomic_add_fetch_32(&pTask->status.timerActive, 1);
pTask->hTaskInfo.pTimer = taosTmrStart(tryLaunchHistoryTask, WAIT_FOR_MINIMAL_INTERVAL, pInfo, streamEnv.timer);
if (pTask->hTaskInfo.pTimer == NULL) { // todo failed to create timer
atomic_sub_fetch_32(&pTask->status.timerActive, 1);
stError("s-task:%s failed to start timer, related fill-history task not launched, ref:%d", pTask->id.idStr,
pTask->status.timerActive);
taosMemoryFree(pInfo);
} else {
int32_t ref = atomic_add_fetch_8(&pTask->status.timerActive, 1);// timer is active
ASSERT(ref == 1);
ASSERT(ref >= 1);
stDebug("s-task:%s set timer active flag, ref:%d", pTask->id.idStr, ref);
}
} else { // timer exists
ASSERT(pTask->status.timerActive == 1);
ASSERT(pTask->status.timerActive >= 1);
stDebug("s-task:%s set timer active flag, task timer not null", pTask->id.idStr);
taosTmrReset(tryLaunchHistoryTask, 100, pInfo, streamEnv.timer, &pTask->pTimer->hTaskLaunchTimer);
taosTmrReset(tryLaunchHistoryTask, WAIT_FOR_MINIMAL_INTERVAL, pInfo, streamEnv.timer, &pTask->hTaskInfo.pTimer);
}
// try again in 100ms
return TSDB_CODE_SUCCESS;
}
@ -849,7 +921,7 @@ int32_t tDecodeStreamScanHistoryFinishReq(SDecoder* pDecoder, SStreamScanHistory
}
void streamTaskSetRangeStreamCalc(SStreamTask* pTask) {
if (pTask->historyTaskId.taskId == 0) {
if (pTask->hTaskInfo.id.taskId == 0) {
SDataRange* pRange = &pTask->dataRange;
if (pTask->info.fillHistory == 1) {
stDebug("s-task:%s fill-history task, time window:%" PRId64 "-%" PRId64 ", verRange:%" PRId64

View File

@ -96,8 +96,8 @@ int32_t tEncodeStreamTask(SEncoder* pEncoder, const SStreamTask* pTask) {
if (tEncodeI64(pEncoder, pTask->chkInfo.checkpointVer) < 0) return -1;
if (tEncodeI8(pEncoder, pTask->info.fillHistory) < 0) return -1;
if (tEncodeI64(pEncoder, pTask->historyTaskId.streamId)) return -1;
int32_t taskId = pTask->historyTaskId.taskId;
if (tEncodeI64(pEncoder, pTask->hTaskInfo.id.streamId)) return -1;
int32_t taskId = pTask->hTaskInfo.id.taskId;
if (tEncodeI32(pEncoder, taskId)) return -1;
if (tEncodeI64(pEncoder, pTask->streamTaskId.streamId)) return -1;
@ -169,9 +169,9 @@ int32_t tDecodeStreamTask(SDecoder* pDecoder, SStreamTask* pTask) {
if (tDecodeI64(pDecoder, &pTask->chkInfo.checkpointVer) < 0) return -1;
if (tDecodeI8(pDecoder, &pTask->info.fillHistory) < 0) return -1;
if (tDecodeI64(pDecoder, &pTask->historyTaskId.streamId)) return -1;
if (tDecodeI64(pDecoder, &pTask->hTaskInfo.id.streamId)) return -1;
if (tDecodeI32(pDecoder, &taskId)) return -1;
pTask->historyTaskId.taskId = taskId;
pTask->hTaskInfo.id.taskId = taskId;
if (tDecodeI64(pDecoder, &pTask->streamTaskId.streamId)) return -1;
if (tDecodeI32(pDecoder, &taskId)) return -1;
@ -312,18 +312,14 @@ void tFreeStreamTask(SStreamTask* pTask) {
pTask->schedInfo.pTimer = NULL;
}
if (pTask->pTimer != NULL) {
if (pTask->pTimer->hTaskLaunchTimer != NULL) {
taosTmrStop(pTask->pTimer->hTaskLaunchTimer);
pTask->pTimer->hTaskLaunchTimer = NULL;
}
if (pTask->hTaskInfo.pTimer != NULL) {
taosTmrStop(pTask->hTaskInfo.pTimer);
pTask->hTaskInfo.pTimer = NULL;
}
if (pTask->pTimer->dispatchTimer != NULL) {
taosTmrStop(pTask->pTimer->dispatchTimer);
pTask->pTimer->dispatchTimer = NULL;
}
taosMemoryFreeClear(pTask->pTimer);
if (pTask->msgInfo.pTimer != NULL) {
taosTmrStop(pTask->msgInfo.pTimer);
pTask->msgInfo.pTimer = NULL;
}
int32_t status = atomic_load_8((int8_t*)&(pTask->status.taskStatus));
@ -425,12 +421,6 @@ int32_t streamTaskInit(SStreamTask* pTask, SStreamMeta* pMeta, SMsgCb* pMsgCb, i
return TSDB_CODE_OUT_OF_MEMORY;
}
pTask->pTimer = taosMemoryCalloc(1, sizeof(STaskTimer));
if (pTask->pTimer == NULL) {
stError("s-task:%s failed to prepare the timer, code:%s", pTask->id.idStr, tstrerror(TSDB_CODE_OUT_OF_MEMORY));
return TSDB_CODE_OUT_OF_MEMORY;
}
// 2MiB per second for sink task
// 50 times sink operator per second
streamTaskInitTokenBucket(pTask->pTokenBucket, 50, 50, 2);
@ -689,7 +679,35 @@ int32_t streamBuildAndSendDropTaskMsg(SMsgCb* pMsgCb, int32_t vgId, SStreamTaskI
return code;
}
STaskId extractStreamTaskKey(const SStreamTask* pTask) {
STaskId streamTaskExtractKey(const SStreamTask* pTask) {
STaskId id = {.streamId = pTask->id.streamId, .taskId = pTask->id.taskId};
return id;
}
void streamTaskInitForLaunchHTask(SHistoryTaskInfo* pInfo) {
pInfo->waitInterval = LAUNCH_HTASK_INTERVAL;
pInfo->tickCount = ceil(LAUNCH_HTASK_INTERVAL / WAIT_FOR_MINIMAL_INTERVAL);
pInfo->retryTimes = 0;
}
void streamTaskSetRetryInfoForLaunch(SHistoryTaskInfo* pInfo) {
ASSERT(pInfo->tickCount == 0);
pInfo->waitInterval *= RETRY_LAUNCH_INTERVAL_INC_RATE;
pInfo->tickCount = ceil(pInfo->waitInterval / WAIT_FOR_MINIMAL_INTERVAL);
pInfo->retryTimes += 1;
}
const char* streamGetTaskStatusStr(int32_t status) {
switch(status) {
case TASK_STATUS__NORMAL: return "normal";
case TASK_STATUS__SCAN_HISTORY: return "scan-history";
case TASK_STATUS__HALT: return "halt";
case TASK_STATUS__PAUSE: return "paused";
case TASK_STATUS__CK: return "check-point";
case TASK_STATUS__DROPPING: return "dropping";
case TASK_STATUS__STOP: return "stop";
case TASK_STATUS__UNINIT: return "uninitialized";
default:return "";
}
}