Merge pull request #23102 from taosdata/fix/liaohj

fix(stream): fix several bugs and do some internal refactor for stream processing.
This commit is contained in:
Haojun Liao 2023-09-28 09:12:39 +08:00 committed by GitHub
commit cc9b9d1054
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 325 additions and 197 deletions

View File

@ -270,13 +270,13 @@ typedef struct SCheckpointInfo {
} SCheckpointInfo; } SCheckpointInfo;
typedef struct SStreamStatus { typedef struct SStreamStatus {
int8_t taskStatus; int8_t taskStatus;
int8_t downstreamReady; // downstream tasks are all ready now, if this flag is set int8_t downstreamReady; // downstream tasks are all ready now, if this flag is set
int8_t schedStatus; int8_t schedStatus;
int8_t keepTaskStatus; int8_t keepTaskStatus;
bool appendTranstateBlock; // has append the transfer state data block already, todo: remove it bool appendTranstateBlock; // has append the transfer state data block already, todo: remove it
int8_t timerActive; // timer is active int8_t pauseAllowed; // allowed task status to be set to be paused
int8_t pauseAllowed; // allowed task status to be set to be paused int32_t timerActive; // timer is active
} SStreamStatus; } SStreamStatus;
typedef struct SDataRange { typedef struct SDataRange {
@ -304,6 +304,7 @@ typedef struct SDispatchMsgInfo {
int32_t retryCount; // retry send data count int32_t retryCount; // retry send data count
int64_t startTs; // dispatch start time, record total elapsed time for dispatch int64_t startTs; // dispatch start time, record total elapsed time for dispatch
SArray* pRetryList; // current dispatch successfully completed node of downstream SArray* pRetryList; // current dispatch successfully completed node of downstream
void* pTimer; // used to dispatch data after a given time duration
} SDispatchMsgInfo; } SDispatchMsgInfo;
typedef struct STaskOutputInfo { typedef struct STaskOutputInfo {
@ -326,23 +327,33 @@ typedef struct SSinkRecorder {
int64_t numOfSubmit; int64_t numOfSubmit;
int64_t numOfBlocks; int64_t numOfBlocks;
int64_t numOfRows; int64_t numOfRows;
int64_t bytes; int64_t dataSize;
} SSinkRecorder; } SSinkRecorder;
typedef struct STaskExecStatisInfo { typedef struct STaskExecStatisInfo {
int64_t created; int64_t created;
int64_t init; int64_t init;
int64_t start;
int64_t step1Start; int64_t step1Start;
int64_t step2Start; int64_t step2Start;
int64_t start;
int32_t updateCount; int32_t updateCount;
int32_t dispatch;
int64_t latestUpdateTs; int64_t latestUpdateTs;
int32_t processDataBlocks;
int64_t processDataSize;
int32_t dispatch;
int64_t dispatchDataSize;
int32_t checkpoint; int32_t checkpoint;
SSinkRecorder sink; SSinkRecorder sink;
} STaskExecStatisInfo; } STaskExecStatisInfo;
typedef struct STaskTimer STaskTimer; typedef struct SHistoryTaskInfo {
STaskId id;
void* pTimer;
int32_t tickCount;
int32_t retryTimes;
int32_t waitInterval;
} SHistoryTaskInfo;
typedef struct STokenBucket STokenBucket; typedef struct STokenBucket STokenBucket;
typedef struct SMetaHbInfo SMetaHbInfo; typedef struct SMetaHbInfo SMetaHbInfo;
@ -358,7 +369,7 @@ struct SStreamTask {
SCheckpointInfo chkInfo; SCheckpointInfo chkInfo;
STaskExec exec; STaskExec exec;
SDataRange dataRange; SDataRange dataRange;
STaskId historyTaskId; SHistoryTaskInfo hTaskInfo;
STaskId streamTaskId; STaskId streamTaskId;
STaskExecStatisInfo execInfo; STaskExecStatisInfo execInfo;
SArray* pReadyMsgList; // SArray<SStreamChkptReadyInfo*> SArray* pReadyMsgList; // SArray<SStreamChkptReadyInfo*>
@ -375,7 +386,6 @@ struct SStreamTask {
}; };
STokenBucket* pTokenBucket; STokenBucket* pTokenBucket;
STaskTimer* pTimer;
SMsgCb* pMsgCb; // msg handle SMsgCb* pMsgCb; // msg handle
SStreamState* pState; // state backend SStreamState* pState; // state backend
SArray* pRspMsgList; SArray* pRspMsgList;
@ -415,7 +425,6 @@ typedef struct SStreamMeta {
FTaskExpand* expandFunc; FTaskExpand* expandFunc;
int32_t vgId; int32_t vgId;
int64_t stage; int64_t stage;
// bool leader;
int32_t role; int32_t role;
STaskStartInfo startInfo; STaskStartInfo startInfo;
SRWLatch lock; SRWLatch lock;

View File

@ -296,8 +296,8 @@ static void setHTasksId(SArray* pTaskList, const SArray* pHTaskList) {
SStreamTask** pStreamTask = taosArrayGet(pTaskList, i); SStreamTask** pStreamTask = taosArrayGet(pTaskList, i);
SStreamTask** pHTask = taosArrayGet(pHTaskList, i); SStreamTask** pHTask = taosArrayGet(pHTaskList, i);
(*pStreamTask)->historyTaskId.taskId = (*pHTask)->id.taskId; (*pStreamTask)->hTaskInfo.id.taskId = (*pHTask)->id.taskId;
(*pStreamTask)->historyTaskId.streamId = (*pHTask)->id.streamId; (*pStreamTask)->hTaskInfo.id.streamId = (*pHTask)->id.streamId;
(*pHTask)->streamTaskId.taskId = (*pStreamTask)->id.taskId; (*pHTask)->streamTaskId.taskId = (*pStreamTask)->id.taskId;
(*pHTask)->streamTaskId.streamId = (*pStreamTask)->id.streamId; (*pHTask)->streamTaskId.streamId = (*pStreamTask)->id.streamId;

View File

@ -876,7 +876,7 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) {
" child id:%d, level:%d, status:%s fill-history:%d, related fill-task:0x%x trigger:%" PRId64 " ms", " child id:%d, level:%d, status:%s fill-history:%d, related fill-task:0x%x trigger:%" PRId64 " ms",
vgId, pTask->id.idStr, pChkInfo->checkpointId, pChkInfo->checkpointVer, pChkInfo->nextProcessVer, vgId, pTask->id.idStr, pChkInfo->checkpointId, pChkInfo->checkpointVer, pChkInfo->nextProcessVer,
pTask->info.selfChildId, pTask->info.taskLevel, streamGetTaskStatusStr(pTask->status.taskStatus), pTask->info.selfChildId, pTask->info.taskLevel, streamGetTaskStatusStr(pTask->status.taskStatus),
pTask->info.fillHistory, (int32_t)pTask->historyTaskId.taskId, pTask->info.triggerParam); pTask->info.fillHistory, (int32_t)pTask->hTaskInfo.id.taskId, pTask->info.triggerParam);
} }
return 0; return 0;
@ -1227,7 +1227,7 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
} else { } else {
STimeWindow* pWindow = &pTask->dataRange.window; STimeWindow* pWindow = &pTask->dataRange.window;
if (pTask->historyTaskId.taskId == 0) { if (pTask->hTaskInfo.id.taskId == 0) {
*pWindow = (STimeWindow){INT64_MIN, INT64_MAX}; *pWindow = (STimeWindow){INT64_MIN, INT64_MAX};
tqDebug( tqDebug(
"s-task:%s scan-history in stream time window completed, no related fill-history task, reset the time " "s-task:%s scan-history in stream time window completed, no related fill-history task, reset the time "
@ -1441,12 +1441,12 @@ int32_t tqProcessTaskPauseReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
streamTaskPause(pTask, pMeta); streamTaskPause(pTask, pMeta);
SStreamTask* pHistoryTask = NULL; SStreamTask* pHistoryTask = NULL;
if (pTask->historyTaskId.taskId != 0) { if (pTask->hTaskInfo.id.taskId != 0) {
pHistoryTask = streamMetaAcquireTask(pMeta, pTask->historyTaskId.streamId, pTask->historyTaskId.taskId); pHistoryTask = streamMetaAcquireTask(pMeta, pTask->hTaskInfo.id.streamId, pTask->hTaskInfo.id.taskId);
if (pHistoryTask == NULL) { if (pHistoryTask == NULL) {
tqError("vgId:%d process pause req, failed to acquire fill-history task:0x%" PRIx64 tqError("vgId:%d process pause req, failed to acquire fill-history task:0x%" PRIx64
", it may have been dropped already", ", it may have been dropped already",
pMeta->vgId, pTask->historyTaskId.taskId); pMeta->vgId, pTask->hTaskInfo.id.taskId);
streamMetaReleaseTask(pMeta, pTask); streamMetaReleaseTask(pMeta, pTask);
// since task is in [STOP|DROPPING] state, it is safe to assume the pause is active // since task is in [STOP|DROPPING] state, it is safe to assume the pause is active
@ -1515,7 +1515,7 @@ int32_t tqProcessTaskResumeReq(STQ* pTq, int64_t sversion, char* msg, int32_t ms
} }
SStreamTask* pHistoryTask = SStreamTask* pHistoryTask =
streamMetaAcquireTask(pTq->pStreamMeta, pTask->historyTaskId.streamId, pTask->historyTaskId.taskId); streamMetaAcquireTask(pTq->pStreamMeta, pTask->hTaskInfo.id.streamId, pTask->hTaskInfo.id.taskId);
if (pHistoryTask) { if (pHistoryTask) {
code = tqProcessTaskResumeImpl(pTq, pHistoryTask, sversion, pReq->igUntreated); code = tqProcessTaskResumeImpl(pTq, pHistoryTask, sversion, pReq->igUntreated);
} }
@ -1810,8 +1810,8 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) {
streamSetStatusNormal(pTask); streamSetStatusNormal(pTask);
SStreamTask** ppHTask = NULL; SStreamTask** ppHTask = NULL;
if (pTask->historyTaskId.taskId != 0) { if (pTask->hTaskInfo.id.taskId != 0) {
ppHTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, &pTask->historyTaskId, sizeof(pTask->historyTaskId)); ppHTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, &pTask->hTaskInfo.id, sizeof(pTask->hTaskInfo.id));
if (ppHTask == NULL || *ppHTask == NULL) { if (ppHTask == NULL || *ppHTask == NULL) {
tqError("vgId:%d failed to acquire fill-history task:0x%x when handling update, it may have been dropped already", tqError("vgId:%d failed to acquire fill-history task:0x%x when handling update, it may have been dropped already",
pMeta->vgId, req.taskId); pMeta->vgId, req.taskId);

View File

@ -272,11 +272,12 @@ int32_t doBuildAndSendSubmitMsg(SVnode* pVnode, SStreamTask* pTask, SSubmitReq2*
SSinkRecorder* pRec = &pTask->execInfo.sink; SSinkRecorder* pRec = &pTask->execInfo.sink;
pRec->numOfSubmit += 1; pRec->numOfSubmit += 1;
if ((pRec->numOfSubmit % 5000) == 0) { if ((pRec->numOfSubmit % 1000) == 0) {
double el = (taosGetTimestampMs() - pTask->execInfo.start) / 1000.0; double el = (taosGetTimestampMs() - pTask->execInfo.start) / 1000.0;
tqInfo("s-task:%s vgId:%d write %" PRId64 " blocks (%" PRId64 " rows) in %" PRId64 tqInfo("s-task:%s vgId:%d write %" PRId64 " blocks (%" PRId64 " rows) in %" PRId64
" submit into dst table, %.2fMiB duration:%.2f Sec.", " submit into dst table, %.2fMiB duration:%.2f Sec.",
pTask->id.idStr, vgId, pRec->numOfBlocks, pRec->numOfRows, pRec->numOfSubmit, SIZE_IN_MiB(pRec->bytes), el); pTask->id.idStr, vgId, pRec->numOfBlocks, pRec->numOfRows, pRec->numOfSubmit, SIZE_IN_MiB(pRec->dataSize),
el);
} }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;

View File

@ -26,7 +26,19 @@
extern "C" { extern "C" {
#endif #endif
#define CHECK_DOWNSTREAM_INTERVAL 100 #define CHECK_DOWNSTREAM_INTERVAL 100
#define LAUNCH_HTASK_INTERVAL 100
#define WAIT_FOR_MINIMAL_INTERVAL 100.00
#define MAX_RETRY_LAUNCH_HISTORY_TASK 40
#define RETRY_LAUNCH_INTERVAL_INC_RATE 1.2
#define MAX_BLOCK_NAME_NUM 1024
#define DISPATCH_RETRY_INTERVAL_MS 300
#define MAX_CONTINUE_RETRY_COUNT 5
#define META_HB_CHECK_INTERVAL 200
#define META_HB_SEND_IDLE_COUNTER 25 // send hb every 5 sec
#define STREAM_TASK_KEY_LEN ((sizeof(int64_t)) << 1)
// clang-format off // clang-format off
#define stFatal(...) do { if (stDebugFlag & DEBUG_FATAL) { taosPrintLog("STM FATAL ", DEBUG_FATAL, 255, __VA_ARGS__); }} while(0) #define stFatal(...) do { if (stDebugFlag & DEBUG_FATAL) { taosPrintLog("STM FATAL ", DEBUG_FATAL, 255, __VA_ARGS__); }} while(0)
@ -49,15 +61,13 @@ typedef struct SStreamContinueExecInfo {
} SStreamContinueExecInfo; } SStreamContinueExecInfo;
struct STokenBucket { struct STokenBucket {
int32_t capacity; // total capacity int32_t numCapacity; // total capacity, available token per second
int64_t fillTimestamp;// fill timestamp int32_t numOfToken; // total available tokens
int32_t numOfToken; // total available tokens int32_t numRate; // number of token per second
int32_t rate; // number of token per second double bytesCapacity; // available capacity for maximum input size, KiloBytes per Second
}; double bytesRemain; // not consumed bytes per second
double bytesRate; // number of token per second
struct STaskTimer { int64_t fillTimestamp; // fill timestamp
void* hTaskLaunchTimer;
void* dispatchTimer;
}; };
extern SStreamGlobalEnv streamEnv; extern SStreamGlobalEnv streamEnv;
@ -89,7 +99,7 @@ int32_t streamTaskSendCheckpointReadyMsg(SStreamTask* pTask);
int32_t streamTaskSendCheckpointSourceRsp(SStreamTask* pTask); int32_t streamTaskSendCheckpointSourceRsp(SStreamTask* pTask);
int32_t streamTaskGetNumOfDownstream(const SStreamTask* pTask); int32_t streamTaskGetNumOfDownstream(const SStreamTask* pTask);
int32_t streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInput, int32_t* numOfBlocks); int32_t streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInput, int32_t* numOfBlocks, int32_t* blockSize);
int32_t streamQueueGetNumOfItemsInQueue(const SStreamQueue* pQueue); int32_t streamQueueGetNumOfItemsInQueue(const SStreamQueue* pQueue);
int32_t streamQueueItemGetSize(const SStreamQueueItem* pItem); int32_t streamQueueItemGetSize(const SStreamQueueItem* pItem);
void streamQueueItemIncSize(const SStreamQueueItem* pItem, int32_t size); void streamQueueItemIncSize(const SStreamQueueItem* pItem, int32_t size);
@ -103,7 +113,10 @@ int32_t streamNotifyUpstreamContinue(SStreamTask* pTask);
int32_t streamTaskFillHistoryFinished(SStreamTask* pTask); int32_t streamTaskFillHistoryFinished(SStreamTask* pTask);
int32_t streamTransferStateToStreamTask(SStreamTask* pTask); int32_t streamTransferStateToStreamTask(SStreamTask* pTask);
int32_t streamTaskInitTokenBucket(STokenBucket* pBucket, int32_t cap, int32_t rate); int32_t streamTaskInitTokenBucket(STokenBucket* pBucket, int32_t numCap, int32_t numRate, int32_t bytesRate);
STaskId streamTaskExtractKey(const SStreamTask* pTask);
void streamTaskInitForLaunchHTask(SHistoryTaskInfo* pInfo);
void streamTaskSetRetryInfoForLaunch(SHistoryTaskInfo* pInfo);
SStreamQueue* streamQueueOpen(int64_t cap); SStreamQueue* streamQueueOpen(int64_t cap);
void streamQueueClose(SStreamQueue* pQueue, int32_t taskId); void streamQueueClose(SStreamQueue* pQueue, int32_t taskId);
@ -112,7 +125,7 @@ void streamQueueProcessFail(SStreamQueue* queue);
void* streamQueueNextItem(SStreamQueue* pQueue); void* streamQueueNextItem(SStreamQueue* pQueue);
void streamFreeQitem(SStreamQueueItem* data); void streamFreeQitem(SStreamQueueItem* data);
STaskId extractStreamTaskKey(const SStreamTask* pTask);
#ifdef __cplusplus #ifdef __cplusplus
} }

View File

@ -970,6 +970,7 @@ int32_t streamBackendDoCheckpoint(void* arg, uint64_t checkpointId) {
SBackendWrapper* pHandle = taosAcquireRef(streamBackendId, backendRid); SBackendWrapper* pHandle = taosAcquireRef(streamBackendId, backendRid);
if (pHandle == NULL || pHandle->db == NULL) { if (pHandle == NULL || pHandle->db == NULL) {
stError("failed to acquire state-backend handle");
goto _ERROR; goto _ERROR;
} }

View File

@ -321,7 +321,7 @@ int32_t streamTaskBuildCheckpoint(SStreamTask* pTask) {
pTask->chkInfo.startTs = 0; // clear the recorded start time pTask->chkInfo.startTs = 0; // clear the recorded start time
if (remain == 0) { // all tasks are ready if (remain == 0) { // all tasks are ready
stDebug("s-task:%s is ready for checkpoint", pTask->id.idStr); stDebug("s-task:%s all downstreams are ready, ready for do checkpoint", pTask->id.idStr);
streamBackendDoCheckpoint(pMeta, pTask->checkpointingId); streamBackendDoCheckpoint(pMeta, pTask->checkpointingId);
streamSaveAllTaskStatus(pMeta, pTask->checkpointingId); streamSaveAllTaskStatus(pMeta, pTask->checkpointingId);
stInfo( stInfo(

View File

@ -18,10 +18,6 @@
#include "ttimer.h" #include "ttimer.h"
#include "tmisce.h" #include "tmisce.h"
#define MAX_BLOCK_NAME_NUM 1024
#define DISPATCH_RETRY_INTERVAL_MS 300
#define MAX_CONTINUE_RETRY_COUNT 5
typedef struct SBlockName { typedef struct SBlockName {
uint32_t hashValue; uint32_t hashValue;
char parTbName[TSDB_TABLE_NAME_LEN]; char parTbName[TSDB_TABLE_NAME_LEN];
@ -425,7 +421,7 @@ static void doRetryDispatchData(void* param, void* tmrId) {
int32_t msgId = pTask->execInfo.dispatch; int32_t msgId = pTask->execInfo.dispatch;
if (streamTaskShouldStop(&pTask->status)) { if (streamTaskShouldStop(&pTask->status)) {
int8_t ref = atomic_sub_fetch_8(&pTask->status.timerActive, 1); int8_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
stDebug("s-task:%s should stop, abort from timer, ref:%d", pTask->id.idStr, ref); stDebug("s-task:%s should stop, abort from timer, ref:%d", pTask->id.idStr, ref);
return; return;
} }
@ -487,26 +483,25 @@ static void doRetryDispatchData(void* param, void* tmrId) {
streamRetryDispatchData(pTask, DISPATCH_RETRY_INTERVAL_MS); streamRetryDispatchData(pTask, DISPATCH_RETRY_INTERVAL_MS);
} }
} else { } else {
int32_t ref = atomic_sub_fetch_8(&pTask->status.timerActive, 1); int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
stDebug("s-task:%s should stop, abort from timer, ref:%d", pTask->id.idStr, ref); stDebug("s-task:%s should stop, abort from timer, ref:%d", pTask->id.idStr, ref);
} }
} else { } else {
int8_t ref = atomic_sub_fetch_8(&pTask->status.timerActive, 1); int8_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
stDebug("s-task:%s send success, jump out of timer, ref:%d", pTask->id.idStr, ref); stDebug("s-task:%s send success, jump out of timer, ref:%d", pTask->id.idStr, ref);
} }
} }
void streamRetryDispatchData(SStreamTask* pTask, int64_t waitDuration) { void streamRetryDispatchData(SStreamTask* pTask, int64_t waitDuration) {
STaskTimer* pTmr = pTask->pTimer;
pTask->msgInfo.retryCount++; pTask->msgInfo.retryCount++;
stWarn("s-task:%s retry send dispatch data in %" PRId64 "ms, in timer msgId:%d, retryTimes:%d", pTask->id.idStr, stWarn("s-task:%s retry send dispatch data in %" PRId64 "ms, in timer msgId:%d, retryTimes:%d", pTask->id.idStr,
waitDuration, pTask->execInfo.dispatch, pTask->msgInfo.retryCount); waitDuration, pTask->execInfo.dispatch, pTask->msgInfo.retryCount);
if (pTmr->dispatchTimer != NULL) { if (pTask->msgInfo.pTimer != NULL) {
taosTmrReset(doRetryDispatchData, waitDuration, pTask, streamEnv.timer, &pTmr->dispatchTimer); taosTmrReset(doRetryDispatchData, waitDuration, pTask, streamEnv.timer, &pTask->msgInfo.pTimer);
} else { } else {
pTmr->dispatchTimer = taosTmrStart(doRetryDispatchData, waitDuration, pTask, streamEnv.timer); pTask->msgInfo.pTimer = taosTmrStart(doRetryDispatchData, waitDuration, pTask, streamEnv.timer);
} }
} }
@ -636,7 +631,7 @@ int32_t streamDispatchStreamBlock(SStreamTask* pTask) {
} }
if (++retryCount > MAX_CONTINUE_RETRY_COUNT) { // add to timer to retry if (++retryCount > MAX_CONTINUE_RETRY_COUNT) { // add to timer to retry
int8_t ref = atomic_add_fetch_8(&pTask->status.timerActive, 1); int8_t ref = atomic_add_fetch_32(&pTask->status.timerActive, 1);
stDebug("s-task:%s failed to dispatch msg to downstream for %d times, code:%s, add timer to retry in %dms, ref:%d", stDebug("s-task:%s failed to dispatch msg to downstream for %d times, code:%s, add timer to retry in %dms, ref:%d",
pTask->id.idStr, retryCount, tstrerror(terrno), DISPATCH_RETRY_INTERVAL_MS, ref); pTask->id.idStr, retryCount, tstrerror(terrno), DISPATCH_RETRY_INTERVAL_MS, ref);
@ -1043,12 +1038,14 @@ static int32_t handleDispatchSuccessRsp(SStreamTask* pTask, int32_t downstreamId
pTask->msgInfo.pData = NULL; pTask->msgInfo.pData = NULL;
int64_t el = taosGetTimestampMs() - pTask->msgInfo.startTs; int64_t el = taosGetTimestampMs() - pTask->msgInfo.startTs;
stDebug("s-task:%s downstream task:0x%x resume to normal from inputQ blocking, blocking time:%" PRId64 "ms",
pTask->id.idStr, downstreamId, el);
// put data into inputQ of current task is also allowed // put data into inputQ of current task is also allowed
if (pTask->inputInfo.status == TASK_INPUT_STATUS__BLOCKED) { if (pTask->inputInfo.status == TASK_INPUT_STATUS__BLOCKED) {
pTask->inputInfo.status = TASK_INPUT_STATUS__NORMAL; pTask->inputInfo.status = TASK_INPUT_STATUS__NORMAL;
stDebug("s-task:%s downstream task:0x%x resume to normal from inputQ blocking, blocking time:%" PRId64 "ms",
pTask->id.idStr, downstreamId, el);
} else {
stDebug("s-task:%s dispatch completed, elapsed time:%"PRId64"ms", pTask->id.idStr, el);
} }
// now ready for next data output // now ready for next data output
@ -1110,7 +1107,6 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i
int32_t leftRsp = 0; int32_t leftRsp = 0;
if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) { if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
stDebug("s-task:%s waiting rsp:%d", id, pTask->shuffleDispatcher.waitingRspCnt);
leftRsp = atomic_sub_fetch_32(&pTask->shuffleDispatcher.waitingRspCnt, 1); leftRsp = atomic_sub_fetch_32(&pTask->shuffleDispatcher.waitingRspCnt, 1);
ASSERT(leftRsp >= 0); ASSERT(leftRsp >= 0);
@ -1142,7 +1138,7 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i
stDebug("s-task:%s waiting rsp set to be %d", id, pTask->shuffleDispatcher.waitingRspCnt); stDebug("s-task:%s waiting rsp set to be %d", id, pTask->shuffleDispatcher.waitingRspCnt);
} }
int8_t ref = atomic_add_fetch_8(&pTask->status.timerActive, 1); int8_t ref = atomic_add_fetch_32(&pTask->status.timerActive, 1);
stDebug("s-task:%s failed to dispatch msg to downstream code:%s, add timer to retry in %dms, ref:%d", stDebug("s-task:%s failed to dispatch msg to downstream code:%s, add timer to retry in %dms, ref:%d",
pTask->id.idStr, tstrerror(terrno), DISPATCH_RETRY_INTERVAL_MS, ref); pTask->id.idStr, tstrerror(terrno), DISPATCH_RETRY_INTERVAL_MS, ref);

View File

@ -192,11 +192,9 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, i
int32_t streamScanHistoryData(SStreamTask* pTask) { int32_t streamScanHistoryData(SStreamTask* pTask) {
ASSERT(pTask->info.taskLevel == TASK_LEVEL__SOURCE); ASSERT(pTask->info.taskLevel == TASK_LEVEL__SOURCE);
int32_t size = 0;
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
void* exec = pTask->exec.pExecutor; void* exec = pTask->exec.pExecutor;
bool finished = false; bool finished = false;
int32_t outputBatchSize = 100;
qSetStreamOpOpen(exec); qSetStreamOpOpen(exec);
@ -213,6 +211,7 @@ int32_t streamScanHistoryData(SStreamTask* pTask) {
return -1; return -1;
} }
int32_t size = 0;
int32_t numOfBlocks = 0; int32_t numOfBlocks = 0;
while (1) { while (1) {
if (streamTaskShouldStop(&pTask->status)) { if (streamTaskShouldStop(&pTask->status)) {
@ -247,9 +246,10 @@ int32_t streamScanHistoryData(SStreamTask* pTask) {
size += blockDataGetSize(output) + sizeof(SSDataBlock) + sizeof(SColumnInfoData) * blockDataGetNumOfCols(&block); size += blockDataGetSize(output) + sizeof(SSDataBlock) + sizeof(SColumnInfoData) * blockDataGetNumOfCols(&block);
if ((++numOfBlocks) >= outputBatchSize || size >= STREAM_RESULT_DUMP_SIZE_THRESHOLD) { if ((++numOfBlocks) >= STREAM_RESULT_DUMP_THRESHOLD || size >= STREAM_RESULT_DUMP_SIZE_THRESHOLD) {
stDebug("s-task:%s scan exec numOfBlocks:%d, output num-limit:%d, size-limit:%d reached", pTask->id.idStr, numOfBlocks, stDebug("s-task:%s scan exec numOfBlocks:%d, size:%.2fKiB output num-limit:%d, size-limit:%.2fKiB reached",
outputBatchSize, STREAM_RESULT_DUMP_SIZE_THRESHOLD); pTask->id.idStr, numOfBlocks, SIZE_IN_KiB(size), STREAM_RESULT_DUMP_THRESHOLD,
SIZE_IN_KiB(STREAM_RESULT_DUMP_SIZE_THRESHOLD));
break; break;
} }
} }
@ -260,8 +260,6 @@ int32_t streamScanHistoryData(SStreamTask* pTask) {
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return code; return code;
} }
size = 0;
} else { } else {
taosArrayDestroy(pRes); taosArrayDestroy(pRes);
} }
@ -312,7 +310,7 @@ int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask) {
pStreamTask->id.idStr); pStreamTask->id.idStr);
} }
ASSERT(pStreamTask->historyTaskId.taskId == pTask->id.taskId && pTask->status.appendTranstateBlock == true); ASSERT(pStreamTask->hTaskInfo.id.taskId == pTask->id.taskId && pTask->status.appendTranstateBlock == true);
STimeWindow* pTimeWindow = &pStreamTask->dataRange.window; STimeWindow* pTimeWindow = &pStreamTask->dataRange.window;
@ -363,8 +361,8 @@ int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask) {
streamBuildAndSendDropTaskMsg(pTask->pMsgCb, pMeta->vgId, &pTask->id); streamBuildAndSendDropTaskMsg(pTask->pMsgCb, pMeta->vgId, &pTask->id);
// 5. clear the link between fill-history task and stream task info // 5. clear the link between fill-history task and stream task info
pStreamTask->historyTaskId.taskId = 0; pStreamTask->hTaskInfo.id.taskId = 0;
pStreamTask->historyTaskId.streamId = 0; pStreamTask->hTaskInfo.id.streamId = 0;
// 6. save to disk // 6. save to disk
taosWLockLatch(&pMeta->lock); taosWLockLatch(&pMeta->lock);
@ -524,6 +522,7 @@ int32_t streamExecForAll(SStreamTask* pTask) {
stDebug("s-task:%s start to extract data block from inputQ", id); stDebug("s-task:%s start to extract data block from inputQ", id);
while (1) { while (1) {
int32_t blockSize = 0;
int32_t numOfBlocks = 0; int32_t numOfBlocks = 0;
SStreamQueueItem* pInput = NULL; SStreamQueueItem* pInput = NULL;
if (streamTaskShouldStop(&pTask->status)) { if (streamTaskShouldStop(&pTask->status)) {
@ -531,7 +530,7 @@ int32_t streamExecForAll(SStreamTask* pTask) {
break; break;
} }
/*int32_t code = */ streamTaskGetDataFromInputQ(pTask, &pInput, &numOfBlocks); /*int32_t code = */ streamTaskGetDataFromInputQ(pTask, &pInput, &numOfBlocks, &blockSize);
if (pInput == NULL) { if (pInput == NULL) {
ASSERT(numOfBlocks == 0); ASSERT(numOfBlocks == 0);
return 0; return 0;
@ -555,9 +554,7 @@ int32_t streamExecForAll(SStreamTask* pTask) {
// here only handle the data block sink operation // here only handle the data block sink operation
if (type == STREAM_INPUT__DATA_BLOCK) { if (type == STREAM_INPUT__DATA_BLOCK) {
int32_t blockSize = streamQueueItemGetSize(pInput); pTask->execInfo.sink.dataSize += blockSize;
pTask->execInfo.sink.bytes += blockSize;
stDebug("s-task:%s sink task start to sink %d blocks, size:%.2fKiB", id, numOfBlocks, SIZE_IN_KiB(blockSize)); stDebug("s-task:%s sink task start to sink %d blocks, size:%.2fKiB", id, numOfBlocks, SIZE_IN_KiB(blockSize));
doOutputResultBlockImpl(pTask, (SStreamDataBlock*)pInput); doOutputResultBlockImpl(pTask, (SStreamDataBlock*)pInput);
continue; continue;

View File

@ -21,10 +21,6 @@
#include "tstream.h" #include "tstream.h"
#include "ttimer.h" #include "ttimer.h"
#define META_HB_CHECK_INTERVAL 200
#define META_HB_SEND_IDLE_COUNTER 25 // send hb every 5 sec
#define STREAM_TASK_KEY_LEN ((sizeof(int64_t)) << 1)
static TdThreadOnce streamMetaModuleInit = PTHREAD_ONCE_INIT; static TdThreadOnce streamMetaModuleInit = PTHREAD_ONCE_INIT;
int32_t streamBackendId = 0; int32_t streamBackendId = 0;
@ -195,11 +191,10 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF
pMeta->chkpId = streamGetLatestCheckpointId(pMeta); pMeta->chkpId = streamGetLatestCheckpointId(pMeta);
pMeta->streamBackend = streamBackendInit(pMeta->path, pMeta->chkpId); pMeta->streamBackend = streamBackendInit(pMeta->path, pMeta->chkpId);
while (pMeta->streamBackend == NULL) { while (pMeta->streamBackend == NULL) {
taosMsleep(500); taosMsleep(100);
pMeta->streamBackend = streamBackendInit(pMeta->path, pMeta->chkpId); pMeta->streamBackend = streamBackendInit(pMeta->path, pMeta->chkpId);
if (pMeta->streamBackend == NULL) { if (pMeta->streamBackend == NULL) {
stError("vgId:%d failed to init stream backend", pMeta->vgId); stInfo("vgId:%d failed to init stream backend, retry in 100ms", pMeta->vgId);
stInfo("vgId:%d retry to init stream backend", pMeta->vgId);
} }
} }
pMeta->streamBackendRid = taosAddRef(streamBackendId, pMeta->streamBackend); pMeta->streamBackendRid = taosAddRef(streamBackendId, pMeta->streamBackend);
@ -263,11 +258,10 @@ int32_t streamMetaReopen(SStreamMeta* pMeta) {
pMeta->streamBackend = streamBackendInit(pMeta->path, pMeta->chkpId); pMeta->streamBackend = streamBackendInit(pMeta->path, pMeta->chkpId);
while (pMeta->streamBackend == NULL) { while (pMeta->streamBackend == NULL) {
taosMsleep(500); taosMsleep(100);
pMeta->streamBackend = streamBackendInit(pMeta->path, pMeta->chkpId); pMeta->streamBackend = streamBackendInit(pMeta->path, pMeta->chkpId);
if (pMeta->streamBackend == NULL) { if (pMeta->streamBackend == NULL) {
stError("vgId:%d failed to init stream backend", pMeta->vgId); stInfo("vgId:%d failed to init stream backend, retry in 100ms", pMeta->vgId);
stInfo("vgId:%d retry to init stream backend", pMeta->vgId);
} }
} }
@ -548,9 +542,11 @@ int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int64_t streamId, int32_t t
STaskId streamTaskId = {.streamId = (*ppTask)->streamTaskId.streamId, .taskId = (*ppTask)->streamTaskId.taskId}; STaskId streamTaskId = {.streamId = (*ppTask)->streamTaskId.streamId, .taskId = (*ppTask)->streamTaskId.taskId};
SStreamTask** ppStreamTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, &streamTaskId, sizeof(streamTaskId)); SStreamTask** ppStreamTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, &streamTaskId, sizeof(streamTaskId));
if (ppStreamTask != NULL) { if (ppStreamTask != NULL) {
(*ppStreamTask)->historyTaskId.taskId = 0; (*ppStreamTask)->hTaskInfo.id.taskId = 0;
(*ppStreamTask)->historyTaskId.streamId = 0; (*ppStreamTask)->hTaskInfo.id.streamId = 0;
} }
} else {
atomic_sub_fetch_32(&pMeta->numOfStreamTasks, 1);
} }
taosHashRemove(pMeta->pTasksMap, &id, sizeof(id)); taosHashRemove(pMeta->pTasksMap, &id, sizeof(id));
@ -697,7 +693,7 @@ int32_t streamMetaLoadAllTasks(SStreamMeta* pMeta) {
int32_t taskId = pTask->id.taskId; int32_t taskId = pTask->id.taskId;
tFreeStreamTask(pTask); tFreeStreamTask(pTask);
STaskId id = extractStreamTaskKey(pTask); STaskId id = streamTaskExtractKey(pTask);
taosArrayPush(pRecycleList, &id); taosArrayPush(pRecycleList, &id);
int32_t total = taosArrayGetSize(pRecycleList); int32_t total = taosArrayGetSize(pRecycleList);
@ -806,7 +802,7 @@ int32_t tDecodeStreamHbMsg(SDecoder* pDecoder, SStreamHbMsg* pReq) {
return 0; return 0;
} }
static bool enoughTimeDuration(SMetaHbInfo* pInfo) { static bool waitForEnoughDuration(SMetaHbInfo* pInfo) {
if ((++pInfo->tickCounter) >= META_HB_SEND_IDLE_COUNTER) { // reset the counter if ((++pInfo->tickCounter) >= META_HB_SEND_IDLE_COUNTER) { // reset the counter
pInfo->tickCounter = 0; pInfo->tickCounter = 0;
return true; return true;
@ -843,7 +839,7 @@ void metaHbToMnode(void* param, void* tmrId) {
pMeta->pHbInfo->hbStart = taosGetTimestampMs(); pMeta->pHbInfo->hbStart = taosGetTimestampMs();
} }
if (!enoughTimeDuration(pMeta->pHbInfo)) { if (!waitForEnoughDuration(pMeta->pHbInfo)) {
taosTmrReset(metaHbToMnode, META_HB_CHECK_INTERVAL, param, streamEnv.timer, &pMeta->pHbInfo->hbTmr); taosTmrReset(metaHbToMnode, META_HB_CHECK_INTERVAL, param, streamEnv.timer, &pMeta->pHbInfo->hbTmr);
taosReleaseRef(streamMetaId, rid); taosReleaseRef(streamMetaId, rid);
return; return;

View File

@ -20,6 +20,7 @@
#define STREAM_TASK_QUEUE_CAPACITY 20480 #define STREAM_TASK_QUEUE_CAPACITY 20480
#define STREAM_TASK_INPUT_QUEUE_CAPACITY_IN_SIZE (30) #define STREAM_TASK_INPUT_QUEUE_CAPACITY_IN_SIZE (30)
#define STREAM_TASK_OUTPUT_QUEUE_CAPACITY_IN_SIZE (50) #define STREAM_TASK_OUTPUT_QUEUE_CAPACITY_IN_SIZE (50)
#define MAX_SMOOTH_BURST_RATIO 5 // 20 sec
// todo refactor: // todo refactor:
// read data from input queue // read data from input queue
@ -30,7 +31,9 @@ typedef struct SQueueReader {
int32_t waitDuration; // maximum wait time to format several block into a batch to process, unit: ms int32_t waitDuration; // maximum wait time to format several block into a batch to process, unit: ms
} SQueueReader; } SQueueReader;
static bool streamTaskHasAvailableToken(STokenBucket* pBucket); static bool streamTaskExtractAvailableToken(STokenBucket* pBucket);
static void streamTaskPutbackToken(STokenBucket* pBucket);
static void streamTaskConsumeQuota(STokenBucket* pBucket, int32_t bytes);
static void streamQueueCleanup(SStreamQueue* pQueue) { static void streamQueueCleanup(SStreamQueue* pQueue) {
void* qItem = NULL; void* qItem = NULL;
@ -147,12 +150,22 @@ const char* streamQueueItemGetTypeStr(int32_t type) {
} }
} }
int32_t streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInput, int32_t* numOfBlocks) { int32_t streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInput, int32_t* numOfBlocks,
int32_t* blockSize) {
int32_t retryTimes = 0; int32_t retryTimes = 0;
int32_t MAX_RETRY_TIMES = 5; int32_t MAX_RETRY_TIMES = 5;
const char* id = pTask->id.idStr; const char* id = pTask->id.idStr;
int32_t taskLevel = pTask->info.taskLevel; int32_t taskLevel = pTask->info.taskLevel;
*pInput = NULL;
*numOfBlocks = 0; *numOfBlocks = 0;
*blockSize = 0;
// no available token in bucket for sink task, let's wait for a little bit
if (taskLevel == TASK_LEVEL__SINK && (!streamTaskExtractAvailableToken(pTask->pTokenBucket))) {
stDebug("s-task:%s no available token in bucket for sink data, wait", id);
return TSDB_CODE_SUCCESS;
}
while (1) { while (1) {
if (streamTaskShouldPause(&pTask->status) || streamTaskShouldStop(&pTask->status)) { if (streamTaskShouldPause(&pTask->status) || streamTaskShouldStop(&pTask->status)) {
@ -166,6 +179,17 @@ int32_t streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInpu
taosMsleep(10); taosMsleep(10);
continue; continue;
} }
// restore the token to bucket
if (*numOfBlocks > 0) {
*blockSize = streamQueueItemGetSize(*pInput);
if (taskLevel == TASK_LEVEL__SINK) {
streamTaskConsumeQuota(pTask->pTokenBucket, *blockSize);
}
} else {
streamTaskPutbackToken(pTask->pTokenBucket);
}
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
@ -173,17 +197,24 @@ int32_t streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInpu
int8_t type = qItem->type; int8_t type = qItem->type;
if (type == STREAM_INPUT__CHECKPOINT || type == STREAM_INPUT__CHECKPOINT_TRIGGER || if (type == STREAM_INPUT__CHECKPOINT || type == STREAM_INPUT__CHECKPOINT_TRIGGER ||
type == STREAM_INPUT__TRANS_STATE) { type == STREAM_INPUT__TRANS_STATE) {
const char* p = streamQueueItemGetTypeStr(qItem->type); const char* p = streamQueueItemGetTypeStr(type);
if (*pInput == NULL) { if (*pInput == NULL) {
stDebug("s-task:%s %s msg extracted, start to process immediately", id, p); stDebug("s-task:%s %s msg extracted, start to process immediately", id, p);
// restore the token to bucket in case of checkpoint/trans-state msg
streamTaskPutbackToken(pTask->pTokenBucket);
*blockSize = 0;
*numOfBlocks = 1; *numOfBlocks = 1;
*pInput = qItem; *pInput = qItem;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} else { } else { // previous existed blocks needs to be handle, before handle the checkpoint msg block
// previous existed blocks needs to be handle, before handle the checkpoint msg block
stDebug("s-task:%s %s msg extracted, handle previous blocks, numOfBlocks:%d", id, p, *numOfBlocks); stDebug("s-task:%s %s msg extracted, handle previous blocks, numOfBlocks:%d", id, p, *numOfBlocks);
*blockSize = streamQueueItemGetSize(*pInput);
if (taskLevel == TASK_LEVEL__SINK) {
streamTaskConsumeQuota(pTask->pTokenBucket, *blockSize);
}
streamQueueProcessFail(pTask->inputInfo.queue); streamQueueProcessFail(pTask->inputInfo.queue);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
@ -192,7 +223,7 @@ int32_t streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInpu
ASSERT((*numOfBlocks) == 0); ASSERT((*numOfBlocks) == 0);
*pInput = qItem; *pInput = qItem;
} else { } else {
// todo we need to sort the data block, instead of just appending into the array list. // merge current block failed, let's handle the already merged blocks.
void* newRet = streamMergeQueueItem(*pInput, qItem); void* newRet = streamMergeQueueItem(*pInput, qItem);
if (newRet == NULL) { if (newRet == NULL) {
if (terrno != 0) { if (terrno != 0) {
@ -200,6 +231,11 @@ int32_t streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInpu
tstrerror(terrno)); tstrerror(terrno));
} }
*blockSize = streamQueueItemGetSize(*pInput);
if (taskLevel == TASK_LEVEL__SINK) {
streamTaskConsumeQuota(pTask->pTokenBucket, *blockSize);
}
streamQueueProcessFail(pTask->inputInfo.queue); streamQueueProcessFail(pTask->inputInfo.queue);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
@ -212,6 +248,12 @@ int32_t streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInpu
if (*numOfBlocks >= MAX_STREAM_EXEC_BATCH_NUM) { if (*numOfBlocks >= MAX_STREAM_EXEC_BATCH_NUM) {
stDebug("s-task:%s batch size limit:%d reached, start to process blocks", id, MAX_STREAM_EXEC_BATCH_NUM); stDebug("s-task:%s batch size limit:%d reached, start to process blocks", id, MAX_STREAM_EXEC_BATCH_NUM);
*blockSize = streamQueueItemGetSize(*pInput);
if (taskLevel == TASK_LEVEL__SINK) {
streamTaskConsumeQuota(pTask->pTokenBucket, *blockSize);
}
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
} }
@ -334,43 +376,68 @@ int32_t streamTaskPutDataIntoOutputQ(SStreamTask* pTask, SStreamDataBlock* pBloc
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t streamTaskInitTokenBucket(STokenBucket* pBucket, int32_t cap, int32_t rate) { int32_t streamTaskInitTokenBucket(STokenBucket* pBucket, int32_t numCap, int32_t numRate, int32_t bytesRate) {
if (cap < 50 || rate < 50 || pBucket == NULL) { if (numCap < 10 || numRate < 10 || pBucket == NULL) {
stError("failed to init sink task bucket, cap:%d, rate:%d", cap, rate); stError("failed to init sink task bucket, cap:%d, rate:%d", numCap, numRate);
return TSDB_CODE_INVALID_PARA; return TSDB_CODE_INVALID_PARA;
} }
pBucket->capacity = cap; pBucket->numCapacity = numCap;
pBucket->rate = rate; pBucket->numOfToken = numCap;
pBucket->numOfToken = cap; pBucket->numRate = numRate;
pBucket->bytesRate = bytesRate;
pBucket->bytesCapacity = bytesRate * MAX_SMOOTH_BURST_RATIO;
pBucket->bytesRemain = pBucket->bytesCapacity;
pBucket->fillTimestamp = taosGetTimestampMs(); pBucket->fillTimestamp = taosGetTimestampMs();
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static void fillBucket(STokenBucket* pBucket) { static void fillTokenBucket(STokenBucket* pBucket) {
int64_t now = taosGetTimestampMs(); int64_t now = taosGetTimestampMs();
int64_t delta = now - pBucket->fillTimestamp; int64_t delta = now - pBucket->fillTimestamp;
ASSERT(pBucket->numOfToken >= 0); ASSERT(pBucket->numOfToken >= 0);
int32_t inc = (delta / 1000.0) * pBucket->rate; int32_t incNum = (delta / 1000.0) * pBucket->numRate;
if (inc > 0) { if (incNum > 0) {
if ((pBucket->numOfToken + inc) < pBucket->capacity) { pBucket->numOfToken = TMIN(pBucket->numOfToken + incNum, pBucket->numCapacity);
pBucket->numOfToken += inc;
} else {
pBucket->numOfToken = pBucket->capacity;
}
pBucket->fillTimestamp = now; pBucket->fillTimestamp = now;
stDebug("new token available, current:%d, inc:%d ts:%"PRId64, pBucket->numOfToken, inc, now); }
// increase the new available quota as time goes on
double incSize = (delta / 1000.0) * pBucket->bytesRate;
if (incSize > 0) {
pBucket->bytesRemain = TMIN(pBucket->bytesRemain + incSize, pBucket->bytesCapacity);
}
if (incNum > 0) {
stDebug("new token and capacity available, current token:%d inc:%d, current quota:%.2fMiB inc:%.2fMiB, ts:%" PRId64
" wait for %.2f Sec",
pBucket->numOfToken, incNum, pBucket->bytesRemain, incSize, now, delta / 1000.0);
} }
} }
bool streamTaskHasAvailableToken(STokenBucket* pBucket) { bool streamTaskExtractAvailableToken(STokenBucket* pBucket) {
fillBucket(pBucket); fillTokenBucket(pBucket);
if (pBucket->numOfToken > 0) { if (pBucket->numOfToken > 0) {
--pBucket->numOfToken; if (pBucket->bytesRemain > 0) {
return true; pBucket->numOfToken -= 1;
return true;
} else { // no available size quota now
return false;
}
} else { } else {
return false; return false;
} }
} }
// Give one token back to the bucket.
// The new count is clamped so that it never exceeds pBucket->numCapacity; the update is
// done in a single assignment so no intermediate (over-capacity) value is ever stored.
void streamTaskPutbackToken(STokenBucket* pBucket) {
  pBucket->numOfToken = TMIN(pBucket->numCapacity, 1 + pBucket->numOfToken);
}
// Deduct an amount of consumed data from the remaining size quota of the bucket.
//   pBucket: bucket whose bytesRemain is reduced.
//   bytes:   amount to deduct; it is passed through SIZE_IN_MiB() before the subtraction,
//            so the quota is kept in the unit produced by that macro
//            (presumably MiB, with the argument given in bytes -- TODO confirm against the macro).
// No lower bound is applied here: the subtraction is stored as-is.
void streamTaskConsumeQuota(STokenBucket* pBucket, int32_t bytes) {
  pBucket->bytesRemain = pBucket->bytesRemain - (SIZE_IN_MiB(bytes));
}

View File

@ -29,9 +29,11 @@ typedef struct STaskRecheckInfo {
void* checkTimer; void* checkTimer;
} STaskRecheckInfo; } STaskRecheckInfo;
static int32_t streamSetParamForScanHistory(SStreamTask* pTask); static int32_t streamSetParamForScanHistory(SStreamTask* pTask);
static void streamTaskSetRangeStreamCalc(SStreamTask* pTask); static void streamTaskSetRangeStreamCalc(SStreamTask* pTask);
static int32_t initScanHistoryReq(SStreamTask* pTask, SStreamScanHistoryReq* pReq, int8_t igUntreated); static int32_t initScanHistoryReq(SStreamTask* pTask, SStreamScanHistoryReq* pReq, int8_t igUntreated);
static SLaunchHTaskInfo* createHTaskLaunchInfo(SStreamMeta* pMeta, int64_t streamId, int32_t taskId);
static void tryLaunchHistoryTask(void* param, void* tmrId);
static void streamTaskSetReady(SStreamTask* pTask, int32_t numOfReqs) { static void streamTaskSetReady(SStreamTask* pTask, int32_t numOfReqs) {
SStreamMeta* pMeta = pTask->pMeta; SStreamMeta* pMeta = pTask->pMeta;
@ -54,7 +56,7 @@ static void streamTaskSetReady(SStreamTask* pTask, int32_t numOfReqs) {
taosWLockLatch(&pMeta->lock); taosWLockLatch(&pMeta->lock);
STaskId id = extractStreamTaskKey(pTask); STaskId id = streamTaskExtractKey(pTask);
taosHashPut(pMeta->startInfo.pReadyTaskSet, &id, sizeof(id), NULL, 0); taosHashPut(pMeta->startInfo.pReadyTaskSet, &id, sizeof(id), NULL, 0);
int32_t numOfTotal = streamMetaGetNumOfTasks(pMeta); int32_t numOfTotal = streamMetaGetNumOfTasks(pMeta);
@ -90,20 +92,6 @@ int32_t streamStartScanHistoryAsync(SStreamTask* pTask, int8_t igUntreated) {
return 0; return 0;
} }
// Translate a task status code into its short textual name.
//   status: one of the TASK_STATUS__* values.
//   return: a string literal naming the status; the empty string for any other value.
const char* streamGetTaskStatusStr(int32_t status) {
  const char* name = "";  // result for values that match none of the cases below

  switch (status) {
    case TASK_STATUS__NORMAL:
      name = "normal";
      break;
    case TASK_STATUS__SCAN_HISTORY:
      name = "scan-history";
      break;
    case TASK_STATUS__HALT:
      name = "halt";
      break;
    case TASK_STATUS__PAUSE:
      name = "paused";
      break;
    case TASK_STATUS__CK:
      name = "check-point";
      break;
    case TASK_STATUS__DROPPING:
      name = "dropping";
      break;
    case TASK_STATUS__STOP:
      name = "stop";
      break;
    case TASK_STATUS__UNINIT:
      name = "uninitialized";
      break;
    default:
      break;
  }

  return name;
}
static int32_t doLaunchScanHistoryTask(SStreamTask* pTask) { static int32_t doLaunchScanHistoryTask(SStreamTask* pTask) {
SVersionRange* pRange = &pTask->dataRange.range; SVersionRange* pRange = &pTask->dataRange.range;
if (pTask->info.fillHistory) { if (pTask->info.fillHistory) {
@ -249,7 +237,7 @@ static void recheckDownstreamTasks(void* param, void* tmrId) {
} }
destroyRecheckInfo(pInfo); destroyRecheckInfo(pInfo);
int8_t ref = atomic_sub_fetch_8(&pTask->status.timerActive, 1); int8_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
stDebug("s-task:%s complete send check in timer, ref:%d", pTask->id.idStr, ref); stDebug("s-task:%s complete send check in timer, ref:%d", pTask->id.idStr, ref);
} }
@ -302,7 +290,7 @@ static void doProcessDownstreamReadyRsp(SStreamTask* pTask, int32_t numOfReqs) {
if (pTask->info.fillHistory == 1) { if (pTask->info.fillHistory == 1) {
stDebug("s-task:%s fill-history is set normal when start it, try to remove it,set it task to be dropping", id); stDebug("s-task:%s fill-history is set normal when start it, try to remove it,set it task to be dropping", id);
pTask->status.taskStatus = TASK_STATUS__DROPPING; pTask->status.taskStatus = TASK_STATUS__DROPPING;
ASSERT(pTask->historyTaskId.taskId == 0); ASSERT(pTask->hTaskInfo.id.taskId == 0);
} else { } else {
stDebug("s-task:%s downstream tasks are ready, now ready for data from wal, status:%s", id, str); stDebug("s-task:%s downstream tasks are ready, now ready for data from wal, status:%s", id, str);
streamTaskEnablePause(pTask); streamTaskEnablePause(pTask);
@ -375,7 +363,7 @@ int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRs
} else { } else {
STaskRecheckInfo* pInfo = createRecheckInfo(pTask, pRsp); STaskRecheckInfo* pInfo = createRecheckInfo(pTask, pRsp);
int8_t ref = atomic_add_fetch_8(&pTask->status.timerActive, 1); int8_t ref = atomic_add_fetch_32(&pTask->status.timerActive, 1);
stDebug("s-task:%s downstream taskId:0x%x (vgId:%d) not ready, stage:%d, retry in 100ms, ref:%d ", id, stDebug("s-task:%s downstream taskId:0x%x (vgId:%d) not ready, stage:%d, retry in 100ms, ref:%d ", id,
pRsp->downstreamTaskId, pRsp->downstreamNodeId, pRsp->oldStage, ref); pRsp->downstreamTaskId, pRsp->downstreamNodeId, pRsp->oldStage, ref);
@ -613,8 +601,6 @@ static void tryLaunchHistoryTask(void* param, void* tmrId) {
SLaunchHTaskInfo* pInfo = param; SLaunchHTaskInfo* pInfo = param;
SStreamMeta* pMeta = pInfo->pMeta; SStreamMeta* pMeta = pInfo->pMeta;
stDebug("s-task:0x%x in timer to launch related history task", (int32_t) pInfo->id.taskId);
taosWLockLatch(&pMeta->lock); taosWLockLatch(&pMeta->lock);
SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, &pInfo->id, sizeof(pInfo->id)); SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, &pInfo->id, sizeof(pInfo->id));
if (ppTask) { if (ppTask) {
@ -622,10 +608,12 @@ static void tryLaunchHistoryTask(void* param, void* tmrId) {
if (streamTaskShouldStop(&(*ppTask)->status)) { if (streamTaskShouldStop(&(*ppTask)->status)) {
const char* pStatus = streamGetTaskStatusStr((*ppTask)->status.taskStatus); const char* pStatus = streamGetTaskStatusStr((*ppTask)->status.taskStatus);
stDebug("s-task:%s status:%s quit timer task", (*ppTask)->id.idStr, pStatus);
int32_t ref = atomic_sub_fetch_32(&(*ppTask)->status.timerActive, 1);
stDebug("s-task:%s status:%s should stop, quit launch fill-history task timer, retry:%d, ref:%d",
(*ppTask)->id.idStr, pStatus, (*ppTask)->hTaskInfo.retryTimes, ref);
taosMemoryFree(pInfo); taosMemoryFree(pInfo);
atomic_sub_fetch_8(&(*ppTask)->status.timerActive, 1);
taosWUnLockLatch(&pMeta->lock); taosWUnLockLatch(&pMeta->lock);
return; return;
} }
@ -634,78 +622,118 @@ static void tryLaunchHistoryTask(void* param, void* tmrId) {
SStreamTask* pTask = streamMetaAcquireTask(pMeta, pInfo->id.streamId, pInfo->id.taskId); SStreamTask* pTask = streamMetaAcquireTask(pMeta, pInfo->id.streamId, pInfo->id.taskId);
if (pTask != NULL) { if (pTask != NULL) {
ASSERT(pTask->status.timerActive >= 1);
// abort the timer if intend to stop task SHistoryTaskInfo* pHTaskInfo = &pTask->hTaskInfo;
SStreamTask* pHTask = streamMetaAcquireTask(pMeta, pTask->historyTaskId.streamId, pTask->historyTaskId.taskId);
if (pHTask == NULL && (!streamTaskShouldStop(&pTask->status))) {
const char* pStatus = streamGetTaskStatusStr(pTask->status.taskStatus);
stWarn(
"s-task:%s vgId:%d status:%s failed to launch history task:0x%x, since it may not be built, or may have been "
"destroyed, or should stop",
pTask->id.idStr, pMeta->vgId, pStatus, (int32_t) pTask->historyTaskId.taskId);
taosTmrReset(tryLaunchHistoryTask, 100, pInfo, streamEnv.timer, &pTask->pTimer->hTaskLaunchTimer); pHTaskInfo->tickCount -= 1;
if (pHTaskInfo->tickCount > 0) {
taosTmrReset(tryLaunchHistoryTask, LAUNCH_HTASK_INTERVAL, pInfo, streamEnv.timer, &pHTaskInfo->pTimer);
streamMetaReleaseTask(pMeta, pTask); streamMetaReleaseTask(pMeta, pTask);
return; return;
} }
if (pHTask != NULL) { if (pHTaskInfo->retryTimes > MAX_RETRY_LAUNCH_HISTORY_TASK) {
checkFillhistoryTaskStatus(pTask, pHTask); int8_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
streamMetaReleaseTask(pMeta, pHTask); streamMetaReleaseTask(pMeta, pTask);
}
// not in timer anymore stError("s-task:%s max retry:%d reached, quit from retrying launch related fill-history task:0x%x, ref:%d",
atomic_sub_fetch_8(&pTask->status.timerActive, 1); pTask->id.idStr, MAX_RETRY_LAUNCH_HISTORY_TASK, (int32_t)pHTaskInfo->id.taskId, ref);
streamMetaReleaseTask(pMeta, pTask);
pHTaskInfo->id.taskId = 0;
pHTaskInfo->id.streamId = 0;
} else { // not reach the limitation yet, let's continue retrying launch related fill-history task.
streamTaskSetRetryInfoForLaunch(pHTaskInfo);
ASSERT(pTask->status.timerActive >= 1);
// abort the timer if intend to stop task
SStreamTask* pHTask = streamMetaAcquireTask(pMeta, pHTaskInfo->id.streamId, pHTaskInfo->id.taskId);
if (pHTask == NULL && (!streamTaskShouldStop(&pTask->status))) {
const char* p = streamGetTaskStatusStr(pTask->status.taskStatus);
int32_t hTaskId = pHTaskInfo->id.taskId;
stDebug(
"s-task:%s status:%s failed to launch fill-history task:0x%x, retry launch:%dms, retryCount:%d",
pTask->id.idStr, p, hTaskId, pHTaskInfo->waitInterval, pHTaskInfo->retryTimes);
taosTmrReset(tryLaunchHistoryTask, LAUNCH_HTASK_INTERVAL, pInfo, streamEnv.timer, &pHTaskInfo->pTimer);
streamMetaReleaseTask(pMeta, pTask);
return;
}
if (pHTask != NULL) {
checkFillhistoryTaskStatus(pTask, pHTask);
streamMetaReleaseTask(pMeta, pHTask);
}
// not in timer anymore
int8_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
stDebug("s-task:0x%x fill-history task launch completed, retry times:%d, ref:%d", (int32_t)pInfo->id.taskId,
pHTaskInfo->retryTimes, ref);
streamMetaReleaseTask(pMeta, pTask);
}
} else { } else {
stError("s-task:0x%x failed to load task, it may have been destroyed", (int32_t) pInfo->id.taskId); stError("s-task:0x%x failed to load task, it may have been destroyed, not launch related fill-history task",
(int32_t)pInfo->id.taskId);
} }
taosMemoryFree(pInfo); taosMemoryFree(pInfo);
} }
// todo fix the bug: 2. race condition SLaunchHTaskInfo* createHTaskLaunchInfo(SStreamMeta* pMeta, int64_t streamId, int32_t taskId) {
SLaunchHTaskInfo* pInfo = taosMemoryCalloc(1, sizeof(SLaunchHTaskInfo));
if (pInfo == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
pInfo->id.taskId = taskId;
pInfo->id.streamId = streamId;
pInfo->pMeta = pMeta;
return pInfo;
}
// an fill history task needs to be started. // an fill history task needs to be started.
int32_t streamLaunchFillHistoryTask(SStreamTask* pTask) { int32_t streamLaunchFillHistoryTask(SStreamTask* pTask) {
SStreamMeta* pMeta = pTask->pMeta; SStreamMeta* pMeta = pTask->pMeta;
int32_t hTaskId = pTask->historyTaskId.taskId; int32_t hTaskId = pTask->hTaskInfo.id.taskId;
if (hTaskId == 0) { if (hTaskId == 0) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
ASSERT(pTask->status.downstreamReady == 1); ASSERT(pTask->status.downstreamReady == 1);
stDebug("s-task:%s start to launch related fill-history task:0x%" PRIx64 "-0x%x", pTask->id.idStr, stDebug("s-task:%s start to launch related fill-history task:0x%" PRIx64 "-0x%x", pTask->id.idStr,
pTask->historyTaskId.streamId, hTaskId); pTask->hTaskInfo.id.streamId, hTaskId);
// Set the execute conditions, including the query time window and the version range // Set the execute conditions, including the query time window and the version range
SStreamTask** pHTask = taosHashGet(pMeta->pTasksMap, &pTask->historyTaskId, sizeof(pTask->historyTaskId)); SStreamTask** pHTask = taosHashGet(pMeta->pTasksMap, &pTask->hTaskInfo.id, sizeof(pTask->hTaskInfo.id));
if (pHTask == NULL) { if (pHTask == NULL) {
stWarn("s-task:%s vgId:%d failed to launch history task:0x%x, since it is not built yet", pTask->id.idStr, stWarn("s-task:%s vgId:%d failed to launch history task:0x%x, since not built yet", pTask->id.idStr, pMeta->vgId,
pMeta->vgId, hTaskId); hTaskId);
SLaunchHTaskInfo* pInfo = taosMemoryCalloc(1, sizeof(SLaunchHTaskInfo)); SLaunchHTaskInfo* pInfo = createHTaskLaunchInfo(pTask->pMeta, pTask->id.streamId, pTask->id.taskId);
pInfo->id.taskId = pTask->id.taskId; if (pInfo == NULL) {
pInfo->id.streamId = pTask->id.streamId; stError("s-task:%s failed to launch related fill-history task, since Out Of Memory", pTask->id.idStr);
pInfo->pMeta = pTask->pMeta; return terrno;
}
if (pTask->pTimer->hTaskLaunchTimer == NULL) { streamTaskInitForLaunchHTask(&pTask->hTaskInfo);
pTask->pTimer->hTaskLaunchTimer = taosTmrStart(tryLaunchHistoryTask, 100, pInfo, streamEnv.timer); if (pTask->hTaskInfo.pTimer == NULL) {
if (pTask->pTimer->hTaskLaunchTimer == NULL) { int32_t ref = atomic_add_fetch_32(&pTask->status.timerActive, 1);
// todo failed to create timer pTask->hTaskInfo.pTimer = taosTmrStart(tryLaunchHistoryTask, WAIT_FOR_MINIMAL_INTERVAL, pInfo, streamEnv.timer);
if (pTask->hTaskInfo.pTimer == NULL) { // todo failed to create timer
atomic_sub_fetch_32(&pTask->status.timerActive, 1);
stError("s-task:%s failed to start timer, related fill-history task not launched, ref:%d", pTask->id.idStr,
pTask->status.timerActive);
taosMemoryFree(pInfo); taosMemoryFree(pInfo);
} else { } else {
int32_t ref = atomic_add_fetch_8(&pTask->status.timerActive, 1);// timer is active ASSERT(ref >= 1);
ASSERT(ref == 1);
stDebug("s-task:%s set timer active flag, ref:%d", pTask->id.idStr, ref); stDebug("s-task:%s set timer active flag, ref:%d", pTask->id.idStr, ref);
} }
} else { // timer exists } else { // timer exists
ASSERT(pTask->status.timerActive == 1); ASSERT(pTask->status.timerActive >= 1);
stDebug("s-task:%s set timer active flag, task timer not null", pTask->id.idStr); stDebug("s-task:%s set timer active flag, task timer not null", pTask->id.idStr);
taosTmrReset(tryLaunchHistoryTask, 100, pInfo, streamEnv.timer, &pTask->pTimer->hTaskLaunchTimer); taosTmrReset(tryLaunchHistoryTask, WAIT_FOR_MINIMAL_INTERVAL, pInfo, streamEnv.timer, &pTask->hTaskInfo.pTimer);
} }
// try again in 100ms
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
@ -849,7 +877,7 @@ int32_t tDecodeStreamScanHistoryFinishReq(SDecoder* pDecoder, SStreamScanHistory
} }
void streamTaskSetRangeStreamCalc(SStreamTask* pTask) { void streamTaskSetRangeStreamCalc(SStreamTask* pTask) {
if (pTask->historyTaskId.taskId == 0) { if (pTask->hTaskInfo.id.taskId == 0) {
SDataRange* pRange = &pTask->dataRange; SDataRange* pRange = &pTask->dataRange;
if (pTask->info.fillHistory == 1) { if (pTask->info.fillHistory == 1) {
stDebug("s-task:%s fill-history task, time window:%" PRId64 "-%" PRId64 ", verRange:%" PRId64 stDebug("s-task:%s fill-history task, time window:%" PRId64 "-%" PRId64 ", verRange:%" PRId64

View File

@ -96,8 +96,8 @@ int32_t tEncodeStreamTask(SEncoder* pEncoder, const SStreamTask* pTask) {
if (tEncodeI64(pEncoder, pTask->chkInfo.checkpointVer) < 0) return -1; if (tEncodeI64(pEncoder, pTask->chkInfo.checkpointVer) < 0) return -1;
if (tEncodeI8(pEncoder, pTask->info.fillHistory) < 0) return -1; if (tEncodeI8(pEncoder, pTask->info.fillHistory) < 0) return -1;
if (tEncodeI64(pEncoder, pTask->historyTaskId.streamId)) return -1; if (tEncodeI64(pEncoder, pTask->hTaskInfo.id.streamId)) return -1;
int32_t taskId = pTask->historyTaskId.taskId; int32_t taskId = pTask->hTaskInfo.id.taskId;
if (tEncodeI32(pEncoder, taskId)) return -1; if (tEncodeI32(pEncoder, taskId)) return -1;
if (tEncodeI64(pEncoder, pTask->streamTaskId.streamId)) return -1; if (tEncodeI64(pEncoder, pTask->streamTaskId.streamId)) return -1;
@ -169,9 +169,9 @@ int32_t tDecodeStreamTask(SDecoder* pDecoder, SStreamTask* pTask) {
if (tDecodeI64(pDecoder, &pTask->chkInfo.checkpointVer) < 0) return -1; if (tDecodeI64(pDecoder, &pTask->chkInfo.checkpointVer) < 0) return -1;
if (tDecodeI8(pDecoder, &pTask->info.fillHistory) < 0) return -1; if (tDecodeI8(pDecoder, &pTask->info.fillHistory) < 0) return -1;
if (tDecodeI64(pDecoder, &pTask->historyTaskId.streamId)) return -1; if (tDecodeI64(pDecoder, &pTask->hTaskInfo.id.streamId)) return -1;
if (tDecodeI32(pDecoder, &taskId)) return -1; if (tDecodeI32(pDecoder, &taskId)) return -1;
pTask->historyTaskId.taskId = taskId; pTask->hTaskInfo.id.taskId = taskId;
if (tDecodeI64(pDecoder, &pTask->streamTaskId.streamId)) return -1; if (tDecodeI64(pDecoder, &pTask->streamTaskId.streamId)) return -1;
if (tDecodeI32(pDecoder, &taskId)) return -1; if (tDecodeI32(pDecoder, &taskId)) return -1;
@ -312,18 +312,14 @@ void tFreeStreamTask(SStreamTask* pTask) {
pTask->schedInfo.pTimer = NULL; pTask->schedInfo.pTimer = NULL;
} }
if (pTask->pTimer != NULL) { if (pTask->hTaskInfo.pTimer != NULL) {
if (pTask->pTimer->hTaskLaunchTimer != NULL) { taosTmrStop(pTask->hTaskInfo.pTimer);
taosTmrStop(pTask->pTimer->hTaskLaunchTimer); pTask->hTaskInfo.pTimer = NULL;
pTask->pTimer->hTaskLaunchTimer = NULL; }
}
if (pTask->pTimer->dispatchTimer != NULL) { if (pTask->msgInfo.pTimer != NULL) {
taosTmrStop(pTask->pTimer->dispatchTimer); taosTmrStop(pTask->msgInfo.pTimer);
pTask->pTimer->dispatchTimer = NULL; pTask->msgInfo.pTimer = NULL;
}
taosMemoryFreeClear(pTask->pTimer);
} }
int32_t status = atomic_load_8((int8_t*)&(pTask->status.taskStatus)); int32_t status = atomic_load_8((int8_t*)&(pTask->status.taskStatus));
@ -425,13 +421,9 @@ int32_t streamTaskInit(SStreamTask* pTask, SStreamMeta* pMeta, SMsgCb* pMsgCb, i
return TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY;
} }
pTask->pTimer = taosMemoryCalloc(1, sizeof(STaskTimer)); // 2MiB per second for sink task
if (pTask->pTimer == NULL) { // 50 times sink operator per second
stError("s-task:%s failed to prepare the timer, code:%s", pTask->id.idStr, tstrerror(TSDB_CODE_OUT_OF_MEMORY)); streamTaskInitTokenBucket(pTask->pTokenBucket, 50, 50, 2);
return TSDB_CODE_OUT_OF_MEMORY;
}
streamTaskInitTokenBucket(pTask->pTokenBucket, 50, 50);
TdThreadMutexAttr attr = {0}; TdThreadMutexAttr attr = {0};
int code = taosThreadMutexAttrInit(&attr); int code = taosThreadMutexAttrInit(&attr);
@ -687,7 +679,35 @@ int32_t streamBuildAndSendDropTaskMsg(SMsgCb* pMsgCb, int32_t vgId, SStreamTaskI
return code; return code;
} }
STaskId extractStreamTaskKey(const SStreamTask* pTask) { STaskId streamTaskExtractKey(const SStreamTask* pTask) {
STaskId id = {.streamId = pTask->id.streamId, .taskId = pTask->id.taskId}; STaskId id = {.streamId = pTask->id.streamId, .taskId = pTask->id.taskId};
return id; return id;
} }
// Reset the bookkeeping used while trying to launch the related fill-history task.
//   pInfo: launch info to (re)initialise; on return no retry has been counted, the wait
//          interval is LAUNCH_HTASK_INTERVAL and tickCount holds that interval expressed
//          in units of WAIT_FOR_MINIMAL_INTERVAL.
void streamTaskInitForLaunchHTask(SHistoryTaskInfo* pInfo) {
  // no launch attempt has been retried yet
  pInfo->retryTimes = 0;

  // start from the base interval
  pInfo->waitInterval = LAUNCH_HTASK_INTERVAL;

  // NOTE(review): ceil() only rounds up if the quotient is floating-point; confirm that
  // WAIT_FOR_MINIMAL_INTERVAL is a floating constant, otherwise integer division truncates first.
  pInfo->tickCount = ceil(LAUNCH_HTASK_INTERVAL / WAIT_FOR_MINIMAL_INTERVAL);
}
// Arm the next retry round for launching the related fill-history task.
//   pInfo: launch info to update. The wait interval is scaled by
//          RETRY_LAUNCH_INTERVAL_INC_RATE, tickCount is recomputed from the new interval
//          (in units of WAIT_FOR_MINIMAL_INTERVAL) and the retry counter is incremented.
void streamTaskSetRetryInfoForLaunch(SHistoryTaskInfo* pInfo) {
  // the previous countdown must have fully elapsed before a new round is set up
  ASSERT(pInfo->tickCount == 0);

  // account for this additional attempt
  ++pInfo->retryTimes;

  // lengthen the wait, then convert the new interval into timer ticks
  // NOTE(review): ceil() only rounds up if the quotient is floating-point; confirm that
  // WAIT_FOR_MINIMAL_INTERVAL is a floating constant.
  pInfo->waitInterval = pInfo->waitInterval * (RETRY_LAUNCH_INTERVAL_INC_RATE);
  pInfo->tickCount = ceil(pInfo->waitInterval / WAIT_FOR_MINIMAL_INTERVAL);
}
// Translate a task status code into its short textual name.
//   status: one of the TASK_STATUS__* values.
//   return: a string literal naming the status; the empty string for any other value.
const char* streamGetTaskStatusStr(int32_t status) {
  const char* name = "";  // result for values that match none of the cases below

  switch (status) {
    case TASK_STATUS__NORMAL:
      name = "normal";
      break;
    case TASK_STATUS__SCAN_HISTORY:
      name = "scan-history";
      break;
    case TASK_STATUS__HALT:
      name = "halt";
      break;
    case TASK_STATUS__PAUSE:
      name = "paused";
      break;
    case TASK_STATUS__CK:
      name = "check-point";
      break;
    case TASK_STATUS__DROPPING:
      name = "dropping";
      break;
    case TASK_STATUS__STOP:
      name = "stop";
      break;
    case TASK_STATUS__UNINIT:
      name = "uninitialized";
      break;
    default:
      break;
  }

  return name;
}