diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h
index a98ad5a4c2..61d74b5809 100644
--- a/include/libs/stream/tstream.h
+++ b/include/libs/stream/tstream.h
@@ -270,13 +270,13 @@ typedef struct SCheckpointInfo {
 } SCheckpointInfo;
 
 typedef struct SStreamStatus {
-  int8_t taskStatus;
-  int8_t downstreamReady;  // downstream tasks are all ready now, if this flag is set
-  int8_t schedStatus;
-  int8_t keepTaskStatus;
-  bool   appendTranstateBlock;  // has append the transfer state data block already, todo: remove it
-  int8_t timerActive;           // timer is active
-  int8_t pauseAllowed;          // allowed task status to be set to be paused
+  int8_t  taskStatus;
+  int8_t  downstreamReady;  // downstream tasks are all ready now, if this flag is set
+  int8_t  schedStatus;
+  int8_t  keepTaskStatus;
+  bool    appendTranstateBlock;  // has appended the transfer state data block already, todo: remove it
+  int8_t  pauseAllowed;          // allowed task status to be set to be paused
+  int32_t timerActive;           // timer is active
 } SStreamStatus;
 
 typedef struct SDataRange {
@@ -304,6 +304,7 @@ typedef struct SDispatchMsgInfo {
   int32_t retryCount;   // retry send data count
   int64_t startTs;      // dispatch start time, record total elapsed time for dispatch
   SArray* pRetryList;   // current dispatch successfully completed node of downstream
+  void*   pTimer;       // used to dispatch data after a given time duration
 } SDispatchMsgInfo;
 
 typedef struct STaskOutputInfo {
@@ -326,23 +327,33 @@ typedef struct SSinkRecorder {
   int64_t numOfSubmit;
   int64_t numOfBlocks;
   int64_t numOfRows;
-  int64_t bytes;
+  int64_t dataSize;
 } SSinkRecorder;
 
 typedef struct STaskExecStatisInfo {
   int64_t created;
   int64_t init;
+  int64_t start;
   int64_t step1Start;
   int64_t step2Start;
-  int64_t start;
   int32_t updateCount;
-  int32_t dispatch;
   int64_t latestUpdateTs;
+  int32_t processDataBlocks;
+  int64_t processDataSize;
+  int32_t dispatch;
+  int64_t dispatchDataSize;
   int32_t checkpoint;
   SSinkRecorder sink;
 } STaskExecStatisInfo;
 
-typedef struct STaskTimer STaskTimer;
+typedef struct SHistoryTaskInfo {
+  STaskId id;
+  void*   pTimer;
+  int32_t tickCount;
+  int32_t retryTimes;
+  int32_t waitInterval;
+} SHistoryTaskInfo;
+
 typedef struct STokenBucket STokenBucket;
 typedef struct SMetaHbInfo  SMetaHbInfo;
 
@@ -358,7 +369,7 @@ struct SStreamTask {
   SCheckpointInfo     chkInfo;
   STaskExec           exec;
   SDataRange          dataRange;
-  STaskId             historyTaskId;
+  SHistoryTaskInfo    hTaskInfo;
   STaskId             streamTaskId;
   STaskExecStatisInfo execInfo;
   SArray*             pReadyMsgList;  // SArray
@@ -375,7 +386,6 @@ struct SStreamTask {
   };
 
   STokenBucket* pTokenBucket;
-  STaskTimer*   pTimer;
   SMsgCb*       pMsgCb;  // msg handle
   SStreamState* pState;  // state backend
   SArray*       pRspMsgList;
@@ -415,7 +425,6 @@ typedef struct SStreamMeta {
   FTaskExpand*   expandFunc;
   int32_t        vgId;
   int64_t        stage;
-//  bool           leader;
   int32_t        role;
   STaskStartInfo startInfo;
   SRWLatch       lock;
diff --git a/source/dnode/mnode/impl/src/mndScheduler.c b/source/dnode/mnode/impl/src/mndScheduler.c
index 1d7d391acf..62d5ff47e3 100644
--- a/source/dnode/mnode/impl/src/mndScheduler.c
+++ b/source/dnode/mnode/impl/src/mndScheduler.c
@@ -296,8 +296,8 @@ static void setHTasksId(SArray* pTaskList, const SArray* pHTaskList) {
     SStreamTask** pStreamTask = taosArrayGet(pTaskList, i);
     SStreamTask** pHTask = taosArrayGet(pHTaskList, i);
 
-    (*pStreamTask)->historyTaskId.taskId = (*pHTask)->id.taskId;
-    (*pStreamTask)->historyTaskId.streamId = (*pHTask)->id.streamId;
+    (*pStreamTask)->hTaskInfo.id.taskId = (*pHTask)->id.taskId;
+    (*pStreamTask)->hTaskInfo.id.streamId = (*pHTask)->id.streamId;
(*pHTask)->streamTaskId.taskId = (*pStreamTask)->id.taskId; (*pHTask)->streamTaskId.streamId = (*pStreamTask)->id.streamId; diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index dada1695ef..dabb01afe4 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -876,7 +876,7 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) { " child id:%d, level:%d, status:%s fill-history:%d, related fill-task:0x%x trigger:%" PRId64 " ms", vgId, pTask->id.idStr, pChkInfo->checkpointId, pChkInfo->checkpointVer, pChkInfo->nextProcessVer, pTask->info.selfChildId, pTask->info.taskLevel, streamGetTaskStatusStr(pTask->status.taskStatus), - pTask->info.fillHistory, (int32_t)pTask->historyTaskId.taskId, pTask->info.triggerParam); + pTask->info.fillHistory, (int32_t)pTask->hTaskInfo.id.taskId, pTask->info.triggerParam); } return 0; @@ -1227,7 +1227,7 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { } else { STimeWindow* pWindow = &pTask->dataRange.window; - if (pTask->historyTaskId.taskId == 0) { + if (pTask->hTaskInfo.id.taskId == 0) { *pWindow = (STimeWindow){INT64_MIN, INT64_MAX}; tqDebug( "s-task:%s scan-history in stream time window completed, no related fill-history task, reset the time " @@ -1441,12 +1441,12 @@ int32_t tqProcessTaskPauseReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg streamTaskPause(pTask, pMeta); SStreamTask* pHistoryTask = NULL; - if (pTask->historyTaskId.taskId != 0) { - pHistoryTask = streamMetaAcquireTask(pMeta, pTask->historyTaskId.streamId, pTask->historyTaskId.taskId); + if (pTask->hTaskInfo.id.taskId != 0) { + pHistoryTask = streamMetaAcquireTask(pMeta, pTask->hTaskInfo.id.streamId, pTask->hTaskInfo.id.taskId); if (pHistoryTask == NULL) { tqError("vgId:%d process pause req, failed to acquire fill-history task:0x%" PRIx64 ", it may have been dropped already", - pMeta->vgId, pTask->historyTaskId.taskId); + pMeta->vgId, pTask->hTaskInfo.id.taskId); streamMetaReleaseTask(pMeta, pTask); // since task is in [STOP|DROPPING] state, it is safe to assume the pause is active @@ -1515,7 +1515,7 @@ int32_t tqProcessTaskResumeReq(STQ* pTq, int64_t sversion, char* msg, int32_t ms } SStreamTask* pHistoryTask = - streamMetaAcquireTask(pTq->pStreamMeta, pTask->historyTaskId.streamId, pTask->historyTaskId.taskId); + streamMetaAcquireTask(pTq->pStreamMeta, pTask->hTaskInfo.id.streamId, pTask->hTaskInfo.id.taskId); if (pHistoryTask) { code = tqProcessTaskResumeImpl(pTq, pHistoryTask, sversion, pReq->igUntreated); } @@ -1810,8 +1810,8 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) { streamSetStatusNormal(pTask); SStreamTask** ppHTask = NULL; - if (pTask->historyTaskId.taskId != 0) { - ppHTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, &pTask->historyTaskId, sizeof(pTask->historyTaskId)); + if (pTask->hTaskInfo.id.taskId != 0) { + ppHTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, &pTask->hTaskInfo.id, sizeof(pTask->hTaskInfo.id)); if (ppHTask == NULL || *ppHTask == NULL) { tqError("vgId:%d failed to acquire fill-history task:0x%x when handling update, it may have been dropped already", pMeta->vgId, req.taskId); diff --git a/source/dnode/vnode/src/tq/tqSink.c b/source/dnode/vnode/src/tq/tqSink.c index a5958197bd..8009eccb1b 100644 --- a/source/dnode/vnode/src/tq/tqSink.c +++ b/source/dnode/vnode/src/tq/tqSink.c @@ -272,11 +272,12 @@ int32_t doBuildAndSendSubmitMsg(SVnode* pVnode, SStreamTask* pTask, SSubmitReq2* SSinkRecorder* pRec = &pTask->execInfo.sink; pRec->numOfSubmit += 1; - if 
((pRec->numOfSubmit % 5000) == 0) { + if ((pRec->numOfSubmit % 1000) == 0) { double el = (taosGetTimestampMs() - pTask->execInfo.start) / 1000.0; tqInfo("s-task:%s vgId:%d write %" PRId64 " blocks (%" PRId64 " rows) in %" PRId64 " submit into dst table, %.2fMiB duration:%.2f Sec.", - pTask->id.idStr, vgId, pRec->numOfBlocks, pRec->numOfRows, pRec->numOfSubmit, SIZE_IN_MiB(pRec->bytes), el); + pTask->id.idStr, vgId, pRec->numOfBlocks, pRec->numOfRows, pRec->numOfSubmit, SIZE_IN_MiB(pRec->dataSize), + el); } return TSDB_CODE_SUCCESS; diff --git a/source/libs/stream/inc/streamInt.h b/source/libs/stream/inc/streamInt.h index dbe868b54f..1d8fde3a48 100644 --- a/source/libs/stream/inc/streamInt.h +++ b/source/libs/stream/inc/streamInt.h @@ -26,7 +26,19 @@ extern "C" { #endif -#define CHECK_DOWNSTREAM_INTERVAL 100 +#define CHECK_DOWNSTREAM_INTERVAL 100 +#define LAUNCH_HTASK_INTERVAL 100 +#define WAIT_FOR_MINIMAL_INTERVAL 100.00 +#define MAX_RETRY_LAUNCH_HISTORY_TASK 40 +#define RETRY_LAUNCH_INTERVAL_INC_RATE 1.2 + +#define MAX_BLOCK_NAME_NUM 1024 +#define DISPATCH_RETRY_INTERVAL_MS 300 +#define MAX_CONTINUE_RETRY_COUNT 5 + +#define META_HB_CHECK_INTERVAL 200 +#define META_HB_SEND_IDLE_COUNTER 25 // send hb every 5 sec +#define STREAM_TASK_KEY_LEN ((sizeof(int64_t)) << 1) // clang-format off #define stFatal(...) do { if (stDebugFlag & DEBUG_FATAL) { taosPrintLog("STM FATAL ", DEBUG_FATAL, 255, __VA_ARGS__); }} while(0) @@ -49,15 +61,13 @@ typedef struct SStreamContinueExecInfo { } SStreamContinueExecInfo; struct STokenBucket { - int32_t capacity; // total capacity - int64_t fillTimestamp;// fill timestamp - int32_t numOfToken; // total available tokens - int32_t rate; // number of token per second -}; - -struct STaskTimer { - void* hTaskLaunchTimer; - void* dispatchTimer; + int32_t numCapacity; // total capacity, available token per second + int32_t numOfToken; // total available tokens + int32_t numRate; // number of token per second + double bytesCapacity; // available capacity for maximum input size, KiloBytes per Second + double bytesRemain; // not consumed bytes per second + double bytesRate; // number of token per second + int64_t fillTimestamp; // fill timestamp }; extern SStreamGlobalEnv streamEnv; @@ -89,7 +99,7 @@ int32_t streamTaskSendCheckpointReadyMsg(SStreamTask* pTask); int32_t streamTaskSendCheckpointSourceRsp(SStreamTask* pTask); int32_t streamTaskGetNumOfDownstream(const SStreamTask* pTask); -int32_t streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInput, int32_t* numOfBlocks); +int32_t streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInput, int32_t* numOfBlocks, int32_t* blockSize); int32_t streamQueueGetNumOfItemsInQueue(const SStreamQueue* pQueue); int32_t streamQueueItemGetSize(const SStreamQueueItem* pItem); void streamQueueItemIncSize(const SStreamQueueItem* pItem, int32_t size); @@ -103,7 +113,10 @@ int32_t streamNotifyUpstreamContinue(SStreamTask* pTask); int32_t streamTaskFillHistoryFinished(SStreamTask* pTask); int32_t streamTransferStateToStreamTask(SStreamTask* pTask); -int32_t streamTaskInitTokenBucket(STokenBucket* pBucket, int32_t cap, int32_t rate); +int32_t streamTaskInitTokenBucket(STokenBucket* pBucket, int32_t numCap, int32_t numRate, int32_t bytesRate); +STaskId streamTaskExtractKey(const SStreamTask* pTask); +void streamTaskInitForLaunchHTask(SHistoryTaskInfo* pInfo); +void streamTaskSetRetryInfoForLaunch(SHistoryTaskInfo* pInfo); SStreamQueue* streamQueueOpen(int64_t cap); void streamQueueClose(SStreamQueue* pQueue, 
int32_t taskId); @@ -112,7 +125,7 @@ void streamQueueProcessFail(SStreamQueue* queue); void* streamQueueNextItem(SStreamQueue* pQueue); void streamFreeQitem(SStreamQueueItem* data); -STaskId extractStreamTaskKey(const SStreamTask* pTask); + #ifdef __cplusplus } diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index 892930c32b..a419a59996 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -970,6 +970,7 @@ int32_t streamBackendDoCheckpoint(void* arg, uint64_t checkpointId) { SBackendWrapper* pHandle = taosAcquireRef(streamBackendId, backendRid); if (pHandle == NULL || pHandle->db == NULL) { + stError("failed to acquire state-backend handle"); goto _ERROR; } diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index 997fecbba9..a87901eb47 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -321,7 +321,7 @@ int32_t streamTaskBuildCheckpoint(SStreamTask* pTask) { pTask->chkInfo.startTs = 0; // clear the recorded start time if (remain == 0) { // all tasks are ready - stDebug("s-task:%s is ready for checkpoint", pTask->id.idStr); + stDebug("s-task:%s all downstreams are ready, ready for do checkpoint", pTask->id.idStr); streamBackendDoCheckpoint(pMeta, pTask->checkpointingId); streamSaveAllTaskStatus(pMeta, pTask->checkpointingId); stInfo( diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index bd5753cac3..2d701d6bb0 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -18,10 +18,6 @@ #include "ttimer.h" #include "tmisce.h" -#define MAX_BLOCK_NAME_NUM 1024 -#define DISPATCH_RETRY_INTERVAL_MS 300 -#define MAX_CONTINUE_RETRY_COUNT 5 - typedef struct SBlockName { uint32_t hashValue; char parTbName[TSDB_TABLE_NAME_LEN]; @@ -425,7 +421,7 @@ static void doRetryDispatchData(void* param, void* tmrId) { int32_t msgId = pTask->execInfo.dispatch; if (streamTaskShouldStop(&pTask->status)) { - int8_t ref = atomic_sub_fetch_8(&pTask->status.timerActive, 1); + int8_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1); stDebug("s-task:%s should stop, abort from timer, ref:%d", pTask->id.idStr, ref); return; } @@ -487,26 +483,25 @@ static void doRetryDispatchData(void* param, void* tmrId) { streamRetryDispatchData(pTask, DISPATCH_RETRY_INTERVAL_MS); } } else { - int32_t ref = atomic_sub_fetch_8(&pTask->status.timerActive, 1); + int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1); stDebug("s-task:%s should stop, abort from timer, ref:%d", pTask->id.idStr, ref); } } else { - int8_t ref = atomic_sub_fetch_8(&pTask->status.timerActive, 1); + int8_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1); stDebug("s-task:%s send success, jump out of timer, ref:%d", pTask->id.idStr, ref); } } void streamRetryDispatchData(SStreamTask* pTask, int64_t waitDuration) { - STaskTimer* pTmr = pTask->pTimer; pTask->msgInfo.retryCount++; stWarn("s-task:%s retry send dispatch data in %" PRId64 "ms, in timer msgId:%d, retryTimes:%d", pTask->id.idStr, waitDuration, pTask->execInfo.dispatch, pTask->msgInfo.retryCount); - if (pTmr->dispatchTimer != NULL) { - taosTmrReset(doRetryDispatchData, waitDuration, pTask, streamEnv.timer, &pTmr->dispatchTimer); + if (pTask->msgInfo.pTimer != NULL) { + taosTmrReset(doRetryDispatchData, waitDuration, pTask, streamEnv.timer, &pTask->msgInfo.pTimer); } else { - 
pTmr->dispatchTimer = taosTmrStart(doRetryDispatchData, waitDuration, pTask, streamEnv.timer); + pTask->msgInfo.pTimer = taosTmrStart(doRetryDispatchData, waitDuration, pTask, streamEnv.timer); } } @@ -636,7 +631,7 @@ int32_t streamDispatchStreamBlock(SStreamTask* pTask) { } if (++retryCount > MAX_CONTINUE_RETRY_COUNT) { // add to timer to retry - int8_t ref = atomic_add_fetch_8(&pTask->status.timerActive, 1); + int8_t ref = atomic_add_fetch_32(&pTask->status.timerActive, 1); stDebug("s-task:%s failed to dispatch msg to downstream for %d times, code:%s, add timer to retry in %dms, ref:%d", pTask->id.idStr, retryCount, tstrerror(terrno), DISPATCH_RETRY_INTERVAL_MS, ref); @@ -1043,12 +1038,14 @@ static int32_t handleDispatchSuccessRsp(SStreamTask* pTask, int32_t downstreamId pTask->msgInfo.pData = NULL; int64_t el = taosGetTimestampMs() - pTask->msgInfo.startTs; - stDebug("s-task:%s downstream task:0x%x resume to normal from inputQ blocking, blocking time:%" PRId64 "ms", - pTask->id.idStr, downstreamId, el); // put data into inputQ of current task is also allowed if (pTask->inputInfo.status == TASK_INPUT_STATUS__BLOCKED) { pTask->inputInfo.status = TASK_INPUT_STATUS__NORMAL; + stDebug("s-task:%s downstream task:0x%x resume to normal from inputQ blocking, blocking time:%" PRId64 "ms", + pTask->id.idStr, downstreamId, el); + } else { + stDebug("s-task:%s dispatch completed, elapsed time:%"PRId64"ms", pTask->id.idStr, el); } // now ready for next data output @@ -1110,7 +1107,6 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i int32_t leftRsp = 0; if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) { - stDebug("s-task:%s waiting rsp:%d", id, pTask->shuffleDispatcher.waitingRspCnt); leftRsp = atomic_sub_fetch_32(&pTask->shuffleDispatcher.waitingRspCnt, 1); ASSERT(leftRsp >= 0); @@ -1142,7 +1138,7 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i stDebug("s-task:%s waiting rsp set to be %d", id, pTask->shuffleDispatcher.waitingRspCnt); } - int8_t ref = atomic_add_fetch_8(&pTask->status.timerActive, 1); + int8_t ref = atomic_add_fetch_32(&pTask->status.timerActive, 1); stDebug("s-task:%s failed to dispatch msg to downstream code:%s, add timer to retry in %dms, ref:%d", pTask->id.idStr, tstrerror(terrno), DISPATCH_RETRY_INTERVAL_MS, ref); diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index a87bb00972..b5ea82d347 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -192,11 +192,9 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, i int32_t streamScanHistoryData(SStreamTask* pTask) { ASSERT(pTask->info.taskLevel == TASK_LEVEL__SOURCE); - int32_t size = 0; int32_t code = TSDB_CODE_SUCCESS; void* exec = pTask->exec.pExecutor; bool finished = false; - int32_t outputBatchSize = 100; qSetStreamOpOpen(exec); @@ -213,6 +211,7 @@ int32_t streamScanHistoryData(SStreamTask* pTask) { return -1; } + int32_t size = 0; int32_t numOfBlocks = 0; while (1) { if (streamTaskShouldStop(&pTask->status)) { @@ -247,9 +246,10 @@ int32_t streamScanHistoryData(SStreamTask* pTask) { size += blockDataGetSize(output) + sizeof(SSDataBlock) + sizeof(SColumnInfoData) * blockDataGetNumOfCols(&block); - if ((++numOfBlocks) >= outputBatchSize || size >= STREAM_RESULT_DUMP_SIZE_THRESHOLD) { - stDebug("s-task:%s scan exec numOfBlocks:%d, output num-limit:%d, size-limit:%d reached", pTask->id.idStr, numOfBlocks, - outputBatchSize, 
STREAM_RESULT_DUMP_SIZE_THRESHOLD); + if ((++numOfBlocks) >= STREAM_RESULT_DUMP_THRESHOLD || size >= STREAM_RESULT_DUMP_SIZE_THRESHOLD) { + stDebug("s-task:%s scan exec numOfBlocks:%d, size:%.2fKiB output num-limit:%d, size-limit:%.2fKiB reached", + pTask->id.idStr, numOfBlocks, SIZE_IN_KiB(size), STREAM_RESULT_DUMP_THRESHOLD, + SIZE_IN_KiB(STREAM_RESULT_DUMP_SIZE_THRESHOLD)); break; } } @@ -260,8 +260,6 @@ int32_t streamScanHistoryData(SStreamTask* pTask) { if (code != TSDB_CODE_SUCCESS) { return code; } - - size = 0; } else { taosArrayDestroy(pRes); } @@ -312,7 +310,7 @@ int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask) { pStreamTask->id.idStr); } - ASSERT(pStreamTask->historyTaskId.taskId == pTask->id.taskId && pTask->status.appendTranstateBlock == true); + ASSERT(pStreamTask->hTaskInfo.id.taskId == pTask->id.taskId && pTask->status.appendTranstateBlock == true); STimeWindow* pTimeWindow = &pStreamTask->dataRange.window; @@ -363,8 +361,8 @@ int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask) { streamBuildAndSendDropTaskMsg(pTask->pMsgCb, pMeta->vgId, &pTask->id); // 5. clear the link between fill-history task and stream task info - pStreamTask->historyTaskId.taskId = 0; - pStreamTask->historyTaskId.streamId = 0; + pStreamTask->hTaskInfo.id.taskId = 0; + pStreamTask->hTaskInfo.id.streamId = 0; // 6. save to disk taosWLockLatch(&pMeta->lock); @@ -524,6 +522,7 @@ int32_t streamExecForAll(SStreamTask* pTask) { stDebug("s-task:%s start to extract data block from inputQ", id); while (1) { + int32_t blockSize = 0; int32_t numOfBlocks = 0; SStreamQueueItem* pInput = NULL; if (streamTaskShouldStop(&pTask->status)) { @@ -531,7 +530,7 @@ int32_t streamExecForAll(SStreamTask* pTask) { break; } - /*int32_t code = */ streamTaskGetDataFromInputQ(pTask, &pInput, &numOfBlocks); + /*int32_t code = */ streamTaskGetDataFromInputQ(pTask, &pInput, &numOfBlocks, &blockSize); if (pInput == NULL) { ASSERT(numOfBlocks == 0); return 0; @@ -555,9 +554,7 @@ int32_t streamExecForAll(SStreamTask* pTask) { // here only handle the data block sink operation if (type == STREAM_INPUT__DATA_BLOCK) { - int32_t blockSize = streamQueueItemGetSize(pInput); - pTask->execInfo.sink.bytes += blockSize; - + pTask->execInfo.sink.dataSize += blockSize; stDebug("s-task:%s sink task start to sink %d blocks, size:%.2fKiB", id, numOfBlocks, SIZE_IN_KiB(blockSize)); doOutputResultBlockImpl(pTask, (SStreamDataBlock*)pInput); continue; diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index e98a292d37..45c2743ecb 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -21,10 +21,6 @@ #include "tstream.h" #include "ttimer.h" -#define META_HB_CHECK_INTERVAL 200 -#define META_HB_SEND_IDLE_COUNTER 25 // send hb every 5 sec -#define STREAM_TASK_KEY_LEN ((sizeof(int64_t)) << 1) - static TdThreadOnce streamMetaModuleInit = PTHREAD_ONCE_INIT; int32_t streamBackendId = 0; @@ -195,11 +191,10 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF pMeta->chkpId = streamGetLatestCheckpointId(pMeta); pMeta->streamBackend = streamBackendInit(pMeta->path, pMeta->chkpId); while (pMeta->streamBackend == NULL) { - taosMsleep(500); + taosMsleep(100); pMeta->streamBackend = streamBackendInit(pMeta->path, pMeta->chkpId); if (pMeta->streamBackend == NULL) { - stError("vgId:%d failed to init stream backend", pMeta->vgId); - stInfo("vgId:%d retry to init stream backend", pMeta->vgId); + stInfo("vgId:%d failed to init stream backend, retry in 
100ms", pMeta->vgId); } } pMeta->streamBackendRid = taosAddRef(streamBackendId, pMeta->streamBackend); @@ -263,11 +258,10 @@ int32_t streamMetaReopen(SStreamMeta* pMeta) { pMeta->streamBackend = streamBackendInit(pMeta->path, pMeta->chkpId); while (pMeta->streamBackend == NULL) { - taosMsleep(500); + taosMsleep(100); pMeta->streamBackend = streamBackendInit(pMeta->path, pMeta->chkpId); if (pMeta->streamBackend == NULL) { - stError("vgId:%d failed to init stream backend", pMeta->vgId); - stInfo("vgId:%d retry to init stream backend", pMeta->vgId); + stInfo("vgId:%d failed to init stream backend, retry in 100ms", pMeta->vgId); } } @@ -548,9 +542,11 @@ int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int64_t streamId, int32_t t STaskId streamTaskId = {.streamId = (*ppTask)->streamTaskId.streamId, .taskId = (*ppTask)->streamTaskId.taskId}; SStreamTask** ppStreamTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, &streamTaskId, sizeof(streamTaskId)); if (ppStreamTask != NULL) { - (*ppStreamTask)->historyTaskId.taskId = 0; - (*ppStreamTask)->historyTaskId.streamId = 0; + (*ppStreamTask)->hTaskInfo.id.taskId = 0; + (*ppStreamTask)->hTaskInfo.id.streamId = 0; } + } else { + atomic_sub_fetch_32(&pMeta->numOfStreamTasks, 1); } taosHashRemove(pMeta->pTasksMap, &id, sizeof(id)); @@ -697,7 +693,7 @@ int32_t streamMetaLoadAllTasks(SStreamMeta* pMeta) { int32_t taskId = pTask->id.taskId; tFreeStreamTask(pTask); - STaskId id = extractStreamTaskKey(pTask); + STaskId id = streamTaskExtractKey(pTask); taosArrayPush(pRecycleList, &id); int32_t total = taosArrayGetSize(pRecycleList); @@ -806,7 +802,7 @@ int32_t tDecodeStreamHbMsg(SDecoder* pDecoder, SStreamHbMsg* pReq) { return 0; } -static bool enoughTimeDuration(SMetaHbInfo* pInfo) { +static bool waitForEnoughDuration(SMetaHbInfo* pInfo) { if ((++pInfo->tickCounter) >= META_HB_SEND_IDLE_COUNTER) { // reset the counter pInfo->tickCounter = 0; return true; @@ -843,7 +839,7 @@ void metaHbToMnode(void* param, void* tmrId) { pMeta->pHbInfo->hbStart = taosGetTimestampMs(); } - if (!enoughTimeDuration(pMeta->pHbInfo)) { + if (!waitForEnoughDuration(pMeta->pHbInfo)) { taosTmrReset(metaHbToMnode, META_HB_CHECK_INTERVAL, param, streamEnv.timer, &pMeta->pHbInfo->hbTmr); taosReleaseRef(streamMetaId, rid); return; diff --git a/source/libs/stream/src/streamQueue.c b/source/libs/stream/src/streamQueue.c index abf10487de..65400386b1 100644 --- a/source/libs/stream/src/streamQueue.c +++ b/source/libs/stream/src/streamQueue.c @@ -20,6 +20,7 @@ #define STREAM_TASK_QUEUE_CAPACITY 20480 #define STREAM_TASK_INPUT_QUEUE_CAPACITY_IN_SIZE (30) #define STREAM_TASK_OUTPUT_QUEUE_CAPACITY_IN_SIZE (50) +#define MAX_SMOOTH_BURST_RATIO 5 // 20 sec // todo refactor: // read data from input queue @@ -30,7 +31,9 @@ typedef struct SQueueReader { int32_t waitDuration; // maximum wait time to format several block into a batch to process, unit: ms } SQueueReader; -static bool streamTaskHasAvailableToken(STokenBucket* pBucket); +static bool streamTaskExtractAvailableToken(STokenBucket* pBucket); +static void streamTaskPutbackToken(STokenBucket* pBucket); +static void streamTaskConsumeQuota(STokenBucket* pBucket, int32_t bytes); static void streamQueueCleanup(SStreamQueue* pQueue) { void* qItem = NULL; @@ -147,12 +150,22 @@ const char* streamQueueItemGetTypeStr(int32_t type) { } } -int32_t streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInput, int32_t* numOfBlocks) { +int32_t streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInput, int32_t* numOfBlocks, + 
int32_t* blockSize) { int32_t retryTimes = 0; int32_t MAX_RETRY_TIMES = 5; const char* id = pTask->id.idStr; int32_t taskLevel = pTask->info.taskLevel; + + *pInput = NULL; *numOfBlocks = 0; + *blockSize = 0; + + // no available token in bucket for sink task, let's wait for a little bit + if (taskLevel == TASK_LEVEL__SINK && (!streamTaskExtractAvailableToken(pTask->pTokenBucket))) { + stDebug("s-task:%s no available token in bucket for sink data, wait", id); + return TSDB_CODE_SUCCESS; + } while (1) { if (streamTaskShouldPause(&pTask->status) || streamTaskShouldStop(&pTask->status)) { @@ -166,6 +179,17 @@ int32_t streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInpu taosMsleep(10); continue; } + + // restore the token to bucket + if (*numOfBlocks > 0) { + *blockSize = streamQueueItemGetSize(*pInput); + if (taskLevel == TASK_LEVEL__SINK) { + streamTaskConsumeQuota(pTask->pTokenBucket, *blockSize); + } + } else { + streamTaskPutbackToken(pTask->pTokenBucket); + } + return TSDB_CODE_SUCCESS; } @@ -173,17 +197,24 @@ int32_t streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInpu int8_t type = qItem->type; if (type == STREAM_INPUT__CHECKPOINT || type == STREAM_INPUT__CHECKPOINT_TRIGGER || type == STREAM_INPUT__TRANS_STATE) { - const char* p = streamQueueItemGetTypeStr(qItem->type); + const char* p = streamQueueItemGetTypeStr(type); if (*pInput == NULL) { stDebug("s-task:%s %s msg extracted, start to process immediately", id, p); + // restore the token to bucket in case of checkpoint/trans-state msg + streamTaskPutbackToken(pTask->pTokenBucket); + *blockSize = 0; *numOfBlocks = 1; *pInput = qItem; return TSDB_CODE_SUCCESS; - } else { - // previous existed blocks needs to be handle, before handle the checkpoint msg block + } else { // previous existed blocks needs to be handle, before handle the checkpoint msg block stDebug("s-task:%s %s msg extracted, handle previous blocks, numOfBlocks:%d", id, p, *numOfBlocks); + *blockSize = streamQueueItemGetSize(*pInput); + if (taskLevel == TASK_LEVEL__SINK) { + streamTaskConsumeQuota(pTask->pTokenBucket, *blockSize); + } + streamQueueProcessFail(pTask->inputInfo.queue); return TSDB_CODE_SUCCESS; } @@ -192,7 +223,7 @@ int32_t streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInpu ASSERT((*numOfBlocks) == 0); *pInput = qItem; } else { - // todo we need to sort the data block, instead of just appending into the array list. + // merge current block failed, let's handle the already merged blocks. 
void* newRet = streamMergeQueueItem(*pInput, qItem); if (newRet == NULL) { if (terrno != 0) { @@ -200,6 +231,11 @@ int32_t streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInpu tstrerror(terrno)); } + *blockSize = streamQueueItemGetSize(*pInput); + if (taskLevel == TASK_LEVEL__SINK) { + streamTaskConsumeQuota(pTask->pTokenBucket, *blockSize); + } + streamQueueProcessFail(pTask->inputInfo.queue); return TSDB_CODE_SUCCESS; } @@ -212,6 +248,12 @@ int32_t streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInpu if (*numOfBlocks >= MAX_STREAM_EXEC_BATCH_NUM) { stDebug("s-task:%s batch size limit:%d reached, start to process blocks", id, MAX_STREAM_EXEC_BATCH_NUM); + + *blockSize = streamQueueItemGetSize(*pInput); + if (taskLevel == TASK_LEVEL__SINK) { + streamTaskConsumeQuota(pTask->pTokenBucket, *blockSize); + } + return TSDB_CODE_SUCCESS; } } @@ -334,43 +376,68 @@ int32_t streamTaskPutDataIntoOutputQ(SStreamTask* pTask, SStreamDataBlock* pBloc return TSDB_CODE_SUCCESS; } -int32_t streamTaskInitTokenBucket(STokenBucket* pBucket, int32_t cap, int32_t rate) { - if (cap < 50 || rate < 50 || pBucket == NULL) { - stError("failed to init sink task bucket, cap:%d, rate:%d", cap, rate); +int32_t streamTaskInitTokenBucket(STokenBucket* pBucket, int32_t numCap, int32_t numRate, int32_t bytesRate) { + if (numCap < 10 || numRate < 10 || pBucket == NULL) { + stError("failed to init sink task bucket, cap:%d, rate:%d", numCap, numRate); return TSDB_CODE_INVALID_PARA; } - pBucket->capacity = cap; - pBucket->rate = rate; - pBucket->numOfToken = cap; + pBucket->numCapacity = numCap; + pBucket->numOfToken = numCap; + pBucket->numRate = numRate; + + pBucket->bytesRate = bytesRate; + pBucket->bytesCapacity = bytesRate * MAX_SMOOTH_BURST_RATIO; + pBucket->bytesRemain = pBucket->bytesCapacity; + pBucket->fillTimestamp = taosGetTimestampMs(); return TSDB_CODE_SUCCESS; } -static void fillBucket(STokenBucket* pBucket) { +static void fillTokenBucket(STokenBucket* pBucket) { int64_t now = taosGetTimestampMs(); int64_t delta = now - pBucket->fillTimestamp; ASSERT(pBucket->numOfToken >= 0); - int32_t inc = (delta / 1000.0) * pBucket->rate; - if (inc > 0) { - if ((pBucket->numOfToken + inc) < pBucket->capacity) { - pBucket->numOfToken += inc; - } else { - pBucket->numOfToken = pBucket->capacity; - } - + int32_t incNum = (delta / 1000.0) * pBucket->numRate; + if (incNum > 0) { + pBucket->numOfToken = TMIN(pBucket->numOfToken + incNum, pBucket->numCapacity); pBucket->fillTimestamp = now; - stDebug("new token available, current:%d, inc:%d ts:%"PRId64, pBucket->numOfToken, inc, now); + } + + // increase the new available quota as time goes on + double incSize = (delta / 1000.0) * pBucket->bytesRate; + if (incSize > 0) { + pBucket->bytesRemain = TMIN(pBucket->bytesRemain + incSize, pBucket->bytesCapacity); + } + + if (incNum > 0) { + stDebug("new token and capacity available, current token:%d inc:%d, current quota:%.2fMiB inc:%.2fMiB, ts:%" PRId64 + " wait for %.2f Sec", + pBucket->numOfToken, incNum, pBucket->bytesRemain, incSize, now, delta / 1000.0); } } -bool streamTaskHasAvailableToken(STokenBucket* pBucket) { - fillBucket(pBucket); +bool streamTaskExtractAvailableToken(STokenBucket* pBucket) { + fillTokenBucket(pBucket); + if (pBucket->numOfToken > 0) { - --pBucket->numOfToken; - return true; + if (pBucket->bytesRemain > 0) { + pBucket->numOfToken -= 1; + return true; + } else { // no available size quota now + return false; + } } else { return false; } +} + +void 
streamTaskPutbackToken(STokenBucket* pBucket) { + pBucket->numOfToken = TMIN(pBucket->numOfToken + 1, pBucket->numCapacity); +} + +// size in KB +void streamTaskConsumeQuota(STokenBucket* pBucket, int32_t bytes) { + pBucket->bytesRemain -= SIZE_IN_MiB(bytes); } \ No newline at end of file diff --git a/source/libs/stream/src/streamRecover.c b/source/libs/stream/src/streamRecover.c index 3ca81ea90b..03a5233023 100644 --- a/source/libs/stream/src/streamRecover.c +++ b/source/libs/stream/src/streamRecover.c @@ -29,9 +29,11 @@ typedef struct STaskRecheckInfo { void* checkTimer; } STaskRecheckInfo; -static int32_t streamSetParamForScanHistory(SStreamTask* pTask); -static void streamTaskSetRangeStreamCalc(SStreamTask* pTask); -static int32_t initScanHistoryReq(SStreamTask* pTask, SStreamScanHistoryReq* pReq, int8_t igUntreated); +static int32_t streamSetParamForScanHistory(SStreamTask* pTask); +static void streamTaskSetRangeStreamCalc(SStreamTask* pTask); +static int32_t initScanHistoryReq(SStreamTask* pTask, SStreamScanHistoryReq* pReq, int8_t igUntreated); +static SLaunchHTaskInfo* createHTaskLaunchInfo(SStreamMeta* pMeta, int64_t streamId, int32_t taskId); +static void tryLaunchHistoryTask(void* param, void* tmrId); static void streamTaskSetReady(SStreamTask* pTask, int32_t numOfReqs) { SStreamMeta* pMeta = pTask->pMeta; @@ -54,7 +56,7 @@ static void streamTaskSetReady(SStreamTask* pTask, int32_t numOfReqs) { taosWLockLatch(&pMeta->lock); - STaskId id = extractStreamTaskKey(pTask); + STaskId id = streamTaskExtractKey(pTask); taosHashPut(pMeta->startInfo.pReadyTaskSet, &id, sizeof(id), NULL, 0); int32_t numOfTotal = streamMetaGetNumOfTasks(pMeta); @@ -90,20 +92,6 @@ int32_t streamStartScanHistoryAsync(SStreamTask* pTask, int8_t igUntreated) { return 0; } -const char* streamGetTaskStatusStr(int32_t status) { - switch(status) { - case TASK_STATUS__NORMAL: return "normal"; - case TASK_STATUS__SCAN_HISTORY: return "scan-history"; - case TASK_STATUS__HALT: return "halt"; - case TASK_STATUS__PAUSE: return "paused"; - case TASK_STATUS__CK: return "check-point"; - case TASK_STATUS__DROPPING: return "dropping"; - case TASK_STATUS__STOP: return "stop"; - case TASK_STATUS__UNINIT: return "uninitialized"; - default:return ""; - } -} - static int32_t doLaunchScanHistoryTask(SStreamTask* pTask) { SVersionRange* pRange = &pTask->dataRange.range; if (pTask->info.fillHistory) { @@ -249,7 +237,7 @@ static void recheckDownstreamTasks(void* param, void* tmrId) { } destroyRecheckInfo(pInfo); - int8_t ref = atomic_sub_fetch_8(&pTask->status.timerActive, 1); + int8_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1); stDebug("s-task:%s complete send check in timer, ref:%d", pTask->id.idStr, ref); } @@ -302,7 +290,7 @@ static void doProcessDownstreamReadyRsp(SStreamTask* pTask, int32_t numOfReqs) { if (pTask->info.fillHistory == 1) { stDebug("s-task:%s fill-history is set normal when start it, try to remove it,set it task to be dropping", id); pTask->status.taskStatus = TASK_STATUS__DROPPING; - ASSERT(pTask->historyTaskId.taskId == 0); + ASSERT(pTask->hTaskInfo.id.taskId == 0); } else { stDebug("s-task:%s downstream tasks are ready, now ready for data from wal, status:%s", id, str); streamTaskEnablePause(pTask); @@ -375,7 +363,7 @@ int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRs } else { STaskRecheckInfo* pInfo = createRecheckInfo(pTask, pRsp); - int8_t ref = atomic_add_fetch_8(&pTask->status.timerActive, 1); + int8_t ref = atomic_add_fetch_32(&pTask->status.timerActive, 1); 
stDebug("s-task:%s downstream taskId:0x%x (vgId:%d) not ready, stage:%d, retry in 100ms, ref:%d ", id, pRsp->downstreamTaskId, pRsp->downstreamNodeId, pRsp->oldStage, ref); @@ -613,8 +601,6 @@ static void tryLaunchHistoryTask(void* param, void* tmrId) { SLaunchHTaskInfo* pInfo = param; SStreamMeta* pMeta = pInfo->pMeta; - stDebug("s-task:0x%x in timer to launch related history task", (int32_t) pInfo->id.taskId); - taosWLockLatch(&pMeta->lock); SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, &pInfo->id, sizeof(pInfo->id)); if (ppTask) { @@ -622,10 +608,12 @@ static void tryLaunchHistoryTask(void* param, void* tmrId) { if (streamTaskShouldStop(&(*ppTask)->status)) { const char* pStatus = streamGetTaskStatusStr((*ppTask)->status.taskStatus); - stDebug("s-task:%s status:%s quit timer task", (*ppTask)->id.idStr, pStatus); + + int32_t ref = atomic_sub_fetch_32(&(*ppTask)->status.timerActive, 1); + stDebug("s-task:%s status:%s should stop, quit launch fill-history task timer, retry:%d, ref:%d", + (*ppTask)->id.idStr, pStatus, (*ppTask)->hTaskInfo.retryTimes, ref); taosMemoryFree(pInfo); - atomic_sub_fetch_8(&(*ppTask)->status.timerActive, 1); taosWUnLockLatch(&pMeta->lock); return; } @@ -634,78 +622,118 @@ static void tryLaunchHistoryTask(void* param, void* tmrId) { SStreamTask* pTask = streamMetaAcquireTask(pMeta, pInfo->id.streamId, pInfo->id.taskId); if (pTask != NULL) { - ASSERT(pTask->status.timerActive >= 1); - // abort the timer if intend to stop task - SStreamTask* pHTask = streamMetaAcquireTask(pMeta, pTask->historyTaskId.streamId, pTask->historyTaskId.taskId); - if (pHTask == NULL && (!streamTaskShouldStop(&pTask->status))) { - const char* pStatus = streamGetTaskStatusStr(pTask->status.taskStatus); - stWarn( - "s-task:%s vgId:%d status:%s failed to launch history task:0x%x, since it may not be built, or may have been " - "destroyed, or should stop", - pTask->id.idStr, pMeta->vgId, pStatus, (int32_t) pTask->historyTaskId.taskId); + SHistoryTaskInfo* pHTaskInfo = &pTask->hTaskInfo; - taosTmrReset(tryLaunchHistoryTask, 100, pInfo, streamEnv.timer, &pTask->pTimer->hTaskLaunchTimer); + pHTaskInfo->tickCount -= 1; + if (pHTaskInfo->tickCount > 0) { + taosTmrReset(tryLaunchHistoryTask, LAUNCH_HTASK_INTERVAL, pInfo, streamEnv.timer, &pHTaskInfo->pTimer); streamMetaReleaseTask(pMeta, pTask); return; } - if (pHTask != NULL) { - checkFillhistoryTaskStatus(pTask, pHTask); - streamMetaReleaseTask(pMeta, pHTask); - } + if (pHTaskInfo->retryTimes > MAX_RETRY_LAUNCH_HISTORY_TASK) { + int8_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1); + streamMetaReleaseTask(pMeta, pTask); - // not in timer anymore - atomic_sub_fetch_8(&pTask->status.timerActive, 1); - streamMetaReleaseTask(pMeta, pTask); + stError("s-task:%s max retry:%d reached, quit from retrying launch related fill-history task:0x%x, ref:%d", + pTask->id.idStr, MAX_RETRY_LAUNCH_HISTORY_TASK, (int32_t)pHTaskInfo->id.taskId, ref); + + pHTaskInfo->id.taskId = 0; + pHTaskInfo->id.streamId = 0; + } else { // not reach the limitation yet, let's continue retrying launch related fill-history task. 
+ streamTaskSetRetryInfoForLaunch(pHTaskInfo); + ASSERT(pTask->status.timerActive >= 1); + + // abort the timer if intend to stop task + SStreamTask* pHTask = streamMetaAcquireTask(pMeta, pHTaskInfo->id.streamId, pHTaskInfo->id.taskId); + if (pHTask == NULL && (!streamTaskShouldStop(&pTask->status))) { + const char* p = streamGetTaskStatusStr(pTask->status.taskStatus); + int32_t hTaskId = pHTaskInfo->id.taskId; + stDebug( + "s-task:%s status:%s failed to launch fill-history task:0x%x, retry launch:%dms, retryCount:%d", + pTask->id.idStr, p, hTaskId, pHTaskInfo->waitInterval, pHTaskInfo->retryTimes); + + taosTmrReset(tryLaunchHistoryTask, LAUNCH_HTASK_INTERVAL, pInfo, streamEnv.timer, &pHTaskInfo->pTimer); + streamMetaReleaseTask(pMeta, pTask); + return; + } + + if (pHTask != NULL) { + checkFillhistoryTaskStatus(pTask, pHTask); + streamMetaReleaseTask(pMeta, pHTask); + } + + // not in timer anymore + int8_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1); + stDebug("s-task:0x%x fill-history task launch completed, retry times:%d, ref:%d", (int32_t)pInfo->id.taskId, + pHTaskInfo->retryTimes, ref); + streamMetaReleaseTask(pMeta, pTask); + } } else { - stError("s-task:0x%x failed to load task, it may have been destroyed", (int32_t) pInfo->id.taskId); + stError("s-task:0x%x failed to load task, it may have been destroyed, not launch related fill-history task", + (int32_t)pInfo->id.taskId); } taosMemoryFree(pInfo); } -// todo fix the bug: 2. race condition +SLaunchHTaskInfo* createHTaskLaunchInfo(SStreamMeta* pMeta, int64_t streamId, int32_t taskId) { + SLaunchHTaskInfo* pInfo = taosMemoryCalloc(1, sizeof(SLaunchHTaskInfo)); + if (pInfo == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return NULL; + } + + pInfo->id.taskId = taskId; + pInfo->id.streamId = streamId; + pInfo->pMeta = pMeta; + return pInfo; +} + // an fill history task needs to be started. 
int32_t streamLaunchFillHistoryTask(SStreamTask* pTask) { SStreamMeta* pMeta = pTask->pMeta; - int32_t hTaskId = pTask->historyTaskId.taskId; + int32_t hTaskId = pTask->hTaskInfo.id.taskId; if (hTaskId == 0) { return TSDB_CODE_SUCCESS; } ASSERT(pTask->status.downstreamReady == 1); stDebug("s-task:%s start to launch related fill-history task:0x%" PRIx64 "-0x%x", pTask->id.idStr, - pTask->historyTaskId.streamId, hTaskId); + pTask->hTaskInfo.id.streamId, hTaskId); // Set the execute conditions, including the query time window and the version range - SStreamTask** pHTask = taosHashGet(pMeta->pTasksMap, &pTask->historyTaskId, sizeof(pTask->historyTaskId)); + SStreamTask** pHTask = taosHashGet(pMeta->pTasksMap, &pTask->hTaskInfo.id, sizeof(pTask->hTaskInfo.id)); if (pHTask == NULL) { - stWarn("s-task:%s vgId:%d failed to launch history task:0x%x, since it is not built yet", pTask->id.idStr, - pMeta->vgId, hTaskId); + stWarn("s-task:%s vgId:%d failed to launch history task:0x%x, since not built yet", pTask->id.idStr, pMeta->vgId, + hTaskId); - SLaunchHTaskInfo* pInfo = taosMemoryCalloc(1, sizeof(SLaunchHTaskInfo)); - pInfo->id.taskId = pTask->id.taskId; - pInfo->id.streamId = pTask->id.streamId; - pInfo->pMeta = pTask->pMeta; + SLaunchHTaskInfo* pInfo = createHTaskLaunchInfo(pTask->pMeta, pTask->id.streamId, pTask->id.taskId); + if (pInfo == NULL) { + stError("s-task:%s failed to launch related fill-history task, since Out Of Memory", pTask->id.idStr); + return terrno; + } - if (pTask->pTimer->hTaskLaunchTimer == NULL) { - pTask->pTimer->hTaskLaunchTimer = taosTmrStart(tryLaunchHistoryTask, 100, pInfo, streamEnv.timer); - if (pTask->pTimer->hTaskLaunchTimer == NULL) { - // todo failed to create timer + streamTaskInitForLaunchHTask(&pTask->hTaskInfo); + if (pTask->hTaskInfo.pTimer == NULL) { + int32_t ref = atomic_add_fetch_32(&pTask->status.timerActive, 1); + pTask->hTaskInfo.pTimer = taosTmrStart(tryLaunchHistoryTask, WAIT_FOR_MINIMAL_INTERVAL, pInfo, streamEnv.timer); + if (pTask->hTaskInfo.pTimer == NULL) { // todo failed to create timer + atomic_sub_fetch_32(&pTask->status.timerActive, 1); + stError("s-task:%s failed to start timer, related fill-history task not launched, ref:%d", pTask->id.idStr, + pTask->status.timerActive); taosMemoryFree(pInfo); } else { - int32_t ref = atomic_add_fetch_8(&pTask->status.timerActive, 1);// timer is active - ASSERT(ref == 1); + ASSERT(ref >= 1); stDebug("s-task:%s set timer active flag, ref:%d", pTask->id.idStr, ref); } } else { // timer exists - ASSERT(pTask->status.timerActive == 1); + ASSERT(pTask->status.timerActive >= 1); stDebug("s-task:%s set timer active flag, task timer not null", pTask->id.idStr); - taosTmrReset(tryLaunchHistoryTask, 100, pInfo, streamEnv.timer, &pTask->pTimer->hTaskLaunchTimer); + taosTmrReset(tryLaunchHistoryTask, WAIT_FOR_MINIMAL_INTERVAL, pInfo, streamEnv.timer, &pTask->hTaskInfo.pTimer); } - // try again in 100ms return TSDB_CODE_SUCCESS; } @@ -849,7 +877,7 @@ int32_t tDecodeStreamScanHistoryFinishReq(SDecoder* pDecoder, SStreamScanHistory } void streamTaskSetRangeStreamCalc(SStreamTask* pTask) { - if (pTask->historyTaskId.taskId == 0) { + if (pTask->hTaskInfo.id.taskId == 0) { SDataRange* pRange = &pTask->dataRange; if (pTask->info.fillHistory == 1) { stDebug("s-task:%s fill-history task, time window:%" PRId64 "-%" PRId64 ", verRange:%" PRId64 diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index e5088e9c69..e77ab16040 100644 --- a/source/libs/stream/src/streamTask.c +++ 
b/source/libs/stream/src/streamTask.c @@ -96,8 +96,8 @@ int32_t tEncodeStreamTask(SEncoder* pEncoder, const SStreamTask* pTask) { if (tEncodeI64(pEncoder, pTask->chkInfo.checkpointVer) < 0) return -1; if (tEncodeI8(pEncoder, pTask->info.fillHistory) < 0) return -1; - if (tEncodeI64(pEncoder, pTask->historyTaskId.streamId)) return -1; - int32_t taskId = pTask->historyTaskId.taskId; + if (tEncodeI64(pEncoder, pTask->hTaskInfo.id.streamId)) return -1; + int32_t taskId = pTask->hTaskInfo.id.taskId; if (tEncodeI32(pEncoder, taskId)) return -1; if (tEncodeI64(pEncoder, pTask->streamTaskId.streamId)) return -1; @@ -169,9 +169,9 @@ int32_t tDecodeStreamTask(SDecoder* pDecoder, SStreamTask* pTask) { if (tDecodeI64(pDecoder, &pTask->chkInfo.checkpointVer) < 0) return -1; if (tDecodeI8(pDecoder, &pTask->info.fillHistory) < 0) return -1; - if (tDecodeI64(pDecoder, &pTask->historyTaskId.streamId)) return -1; + if (tDecodeI64(pDecoder, &pTask->hTaskInfo.id.streamId)) return -1; if (tDecodeI32(pDecoder, &taskId)) return -1; - pTask->historyTaskId.taskId = taskId; + pTask->hTaskInfo.id.taskId = taskId; if (tDecodeI64(pDecoder, &pTask->streamTaskId.streamId)) return -1; if (tDecodeI32(pDecoder, &taskId)) return -1; @@ -312,18 +312,14 @@ void tFreeStreamTask(SStreamTask* pTask) { pTask->schedInfo.pTimer = NULL; } - if (pTask->pTimer != NULL) { - if (pTask->pTimer->hTaskLaunchTimer != NULL) { - taosTmrStop(pTask->pTimer->hTaskLaunchTimer); - pTask->pTimer->hTaskLaunchTimer = NULL; - } + if (pTask->hTaskInfo.pTimer != NULL) { + taosTmrStop(pTask->hTaskInfo.pTimer); + pTask->hTaskInfo.pTimer = NULL; + } - if (pTask->pTimer->dispatchTimer != NULL) { - taosTmrStop(pTask->pTimer->dispatchTimer); - pTask->pTimer->dispatchTimer = NULL; - } - - taosMemoryFreeClear(pTask->pTimer); + if (pTask->msgInfo.pTimer != NULL) { + taosTmrStop(pTask->msgInfo.pTimer); + pTask->msgInfo.pTimer = NULL; } int32_t status = atomic_load_8((int8_t*)&(pTask->status.taskStatus)); @@ -425,13 +421,9 @@ int32_t streamTaskInit(SStreamTask* pTask, SStreamMeta* pMeta, SMsgCb* pMsgCb, i return TSDB_CODE_OUT_OF_MEMORY; } - pTask->pTimer = taosMemoryCalloc(1, sizeof(STaskTimer)); - if (pTask->pTimer == NULL) { - stError("s-task:%s failed to prepare the timer, code:%s", pTask->id.idStr, tstrerror(TSDB_CODE_OUT_OF_MEMORY)); - return TSDB_CODE_OUT_OF_MEMORY; - } - - streamTaskInitTokenBucket(pTask->pTokenBucket, 50, 50); + // 2MiB per second for sink task + // 50 times sink operator per second + streamTaskInitTokenBucket(pTask->pTokenBucket, 50, 50, 2); TdThreadMutexAttr attr = {0}; int code = taosThreadMutexAttrInit(&attr); @@ -687,7 +679,35 @@ int32_t streamBuildAndSendDropTaskMsg(SMsgCb* pMsgCb, int32_t vgId, SStreamTaskI return code; } -STaskId extractStreamTaskKey(const SStreamTask* pTask) { +STaskId streamTaskExtractKey(const SStreamTask* pTask) { STaskId id = {.streamId = pTask->id.streamId, .taskId = pTask->id.taskId}; return id; +} + +void streamTaskInitForLaunchHTask(SHistoryTaskInfo* pInfo) { + pInfo->waitInterval = LAUNCH_HTASK_INTERVAL; + pInfo->tickCount = ceil(LAUNCH_HTASK_INTERVAL / WAIT_FOR_MINIMAL_INTERVAL); + pInfo->retryTimes = 0; +} + +void streamTaskSetRetryInfoForLaunch(SHistoryTaskInfo* pInfo) { + ASSERT(pInfo->tickCount == 0); + + pInfo->waitInterval *= RETRY_LAUNCH_INTERVAL_INC_RATE; + pInfo->tickCount = ceil(pInfo->waitInterval / WAIT_FOR_MINIMAL_INTERVAL); + pInfo->retryTimes += 1; +} + +const char* streamGetTaskStatusStr(int32_t status) { + switch(status) { + case TASK_STATUS__NORMAL: return "normal"; + case 
TASK_STATUS__SCAN_HISTORY: return "scan-history"; + case TASK_STATUS__HALT: return "halt"; + case TASK_STATUS__PAUSE: return "paused"; + case TASK_STATUS__CK: return "check-point"; + case TASK_STATUS__DROPPING: return "dropping"; + case TASK_STATUS__STOP: return "stop"; + case TASK_STATUS__UNINIT: return "uninitialized"; + default:return ""; + } } \ No newline at end of file
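A minimal standalone sketch of the flow-control scheme that the streamQueue.c hunks above introduce for sink tasks: the bucket is refilled from elapsed time, one call-count token is taken before a batch is pulled from the input queue, the token is returned if no batch is produced, and the byte quota is charged once the batch size is known. The struct and function names, the MiB units, and the explicit nowMs parameter are illustrative stand-ins for this sketch, not the actual TDengine symbols.

/*
 * Sketch of the dual-constraint (call count + byte quota) token bucket,
 * modeled on the streamTaskInitTokenBucket / fillTokenBucket /
 * streamTaskExtractAvailableToken / streamTaskConsumeQuota changes.
 */
#include <stdbool.h>
#include <stdint.h>

typedef struct {
  int32_t numOfToken;     // currently available tokens
  int32_t numCapacity;    // maximum number of tokens
  int32_t numRate;        // tokens refilled per second
  double  bytesRemain;    // remaining byte quota, in MiB
  double  bytesCapacity;  // maximum byte quota (rate * smooth-burst window), in MiB
  double  bytesRate;      // MiB refilled per second
  int64_t fillTimestamp;  // last refill time, in milliseconds
} SketchTokenBucket;

static void sketchFill(SketchTokenBucket* pBucket, int64_t nowMs) {
  double  elapsedSec = (nowMs - pBucket->fillTimestamp) / 1000.0;
  int32_t incNum = (int32_t)(elapsedSec * pBucket->numRate);
  if (incNum <= 0) {
    return;  // refill at whole-token granularity so fractional progress is not lost
  }

  pBucket->numOfToken += incNum;
  if (pBucket->numOfToken > pBucket->numCapacity) {
    pBucket->numOfToken = pBucket->numCapacity;
  }

  pBucket->bytesRemain += elapsedSec * pBucket->bytesRate;
  if (pBucket->bytesRemain > pBucket->bytesCapacity) {
    pBucket->bytesRemain = pBucket->bytesCapacity;
  }

  pBucket->fillTimestamp = nowMs;
}

// Take one token before pulling a batch; fails if either constraint is exhausted.
static bool sketchExtractToken(SketchTokenBucket* pBucket, int64_t nowMs) {
  sketchFill(pBucket, nowMs);
  if (pBucket->numOfToken <= 0 || pBucket->bytesRemain <= 0) {
    return false;
  }
  pBucket->numOfToken -= 1;
  return true;
}

// Return the token when no batch was actually produced (e.g. the queue was empty).
static void sketchPutbackToken(SketchTokenBucket* pBucket) {
  if (pBucket->numOfToken < pBucket->numCapacity) {
    pBucket->numOfToken += 1;
  }
}

// Charge the produced batch against the byte quota once its size is known.
static void sketchConsumeQuota(SketchTokenBucket* pBucket, double sizeMiB) {
  pBucket->bytesRemain -= sizeMiB;
}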