diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 48bf2451a0..929a330e8a 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -179,7 +179,7 @@ SStreamQueueRes streamQueueGetRes(SStreamQueue1* pQueue); #endif typedef struct { - STaosQueue* queue; + STaosQueue* pQueue; STaosQall* qall; void* qItem; int8_t status; @@ -297,6 +297,7 @@ typedef struct SSTaskBasicInfo { int32_t totalLevel; int8_t taskLevel; int8_t fillHistory; // is fill history task or not + int64_t triggerParam; // in msec } SSTaskBasicInfo; typedef struct SDispatchMsgInfo { @@ -306,12 +307,23 @@ typedef struct SDispatchMsgInfo { int64_t blockingTs; // output blocking timestamp } SDispatchMsgInfo; -typedef struct { +typedef struct STaskOutputInfo { int8_t type; int8_t status; SStreamQueue* queue; } STaskOutputInfo; +typedef struct STaskInputInfo { + int8_t status; + SStreamQueue* queue; +} STaskInputInfo; + +typedef struct STaskSchedInfo { + int8_t status; +// int64_t triggerParam; + void* pTimer; +} STaskSchedInfo; + typedef struct { int64_t init; int64_t step1Start; @@ -323,6 +335,8 @@ struct SStreamTask { SStreamTaskId id; SSTaskBasicInfo info; STaskOutputInfo outputInfo; + STaskInputInfo inputInfo; + STaskSchedInfo schedInfo; SDispatchMsgInfo msgInfo; SStreamStatus status; SCheckpointInfo chkInfo; @@ -330,8 +344,6 @@ struct SStreamTask { SHistDataRange dataRange; SStreamTaskId historyTaskId; SStreamTaskId streamTaskId; - int32_t nextCheckId; - SArray* checkpointInfo; // SArray STaskTimestamp tsInfo; SArray* pReadyMsgList; // SArray TdThreadMutex lock; // secure the operation of set task status and puting data into inputQ @@ -346,13 +358,6 @@ struct SStreamTask { STaskSinkFetch fetchSink; }; - int8_t inputStatus; - SStreamQueue* inputQueue; - - // trigger - int8_t triggerStatus; - int64_t triggerParam; - void* schedTimer; void* launchTaskTimer; SMsgCb* pMsgCb; // msg handle SStreamState* pState; // state backend @@ -596,14 +601,6 @@ typedef struct SStreamTaskNodeUpdateMsg { int32_t tEncodeStreamTaskUpdateMsg(SEncoder* pEncoder, const SStreamTaskNodeUpdateMsg* pMsg); int32_t tDecodeStreamTaskUpdateMsg(SDecoder* pDecoder, SStreamTaskNodeUpdateMsg* pMsg); -typedef struct SStreamTaskNodeUpdateRsp { - int64_t streamId; - int32_t taskId; -} SStreamTaskNodeUpdateRsp; - -int32_t tEncodeStreamTaskUpdateRsp(SEncoder* pEncoder, const SStreamTaskNodeUpdateRsp* pMsg); -int32_t tDecodeStreamTaskUpdateRsp(SDecoder* pDecoder, SStreamTaskNodeUpdateRsp* pMsg); - typedef struct { int64_t streamId; int32_t downstreamTaskId; diff --git a/source/dnode/snode/src/snode.c b/source/dnode/snode/src/snode.c index 06ef01a3f1..c1a59416f6 100644 --- a/source/dnode/snode/src/snode.c +++ b/source/dnode/snode/src/snode.c @@ -99,7 +99,7 @@ int32_t sndExpandTask(SSnode *pSnode, SStreamTask *pTask, int64_t ver) { " child id:%d, level:%d, status:%s fill-history:%d, trigger:%" PRId64 " ms", SNODE_HANDLE, pTask->id.idStr, pChkInfo->checkpointId, pChkInfo->checkpointVer, pChkInfo->currentVer, pTask->info.selfChildId, pTask->info.taskLevel, streamGetTaskStatusStr(pTask->status.taskStatus), - pTask->info.fillHistory, pTask->triggerParam); + pTask->info.fillHistory, pTask->info.triggerParam); return 0; } diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 65e6ee4433..5d47baad64 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -915,7 +915,7 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) { " child id:%d, level:%d, status:%s fill-history:%d, trigger:%" PRId64 " ms", vgId, pTask->id.idStr, pChkInfo->checkpointId, pChkInfo->checkpointVer, pChkInfo->currentVer, pTask->info.selfChildId, pTask->info.taskLevel, streamGetTaskStatusStr(pTask->status.taskStatus), - pTask->info.fillHistory, pTask->triggerParam); + pTask->info.fillHistory, pTask->info.triggerParam); return 0; } @@ -1504,7 +1504,7 @@ int32_t tqProcessTaskResumeImpl(STQ* pTq, SStreamTask* pTask, int64_t sversion, if (level == TASK_LEVEL__SOURCE && pTask->info.fillHistory && pTask->status.taskStatus == TASK_STATUS__SCAN_HISTORY) { streamStartScanHistoryAsync(pTask, igUntreated); - } else if (level == TASK_LEVEL__SOURCE && (taosQueueItemSize(pTask->inputQueue->queue) == 0)) { + } else if (level == TASK_LEVEL__SOURCE && (taosQueueItemSize(pTask->inputInfo.queue->pQueue) == 0)) { tqScanWalAsync(pTq, false); } else { streamSchedExec(pTask); @@ -1832,44 +1832,3 @@ _end: return rsp.code; } -int32_t tqProcessTaskStopReq(STQ* pTq, SRpcMsg* pMsg) { - int32_t vgId = TD_VID(pTq->pVnode); - char* msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); - SRpcMsg rsp = {.info = pMsg->info, .code = TSDB_CODE_SUCCESS}; - - SVPauseStreamTaskReq* pReq = (SVPauseStreamTaskReq*)msg; - - SStreamMeta* pMeta = pTq->pStreamMeta; - SStreamTask* pTask = streamMetaAcquireTask(pMeta, pReq->streamId, pReq->taskId); - if (pTask == NULL) { - tqError("vgId:%d process stop req, failed to acquire task:0x%x, it may have been dropped already", vgId, - pReq->taskId); - // since task is in [STOP|DROPPING] state, it is safe to assume the pause is active - return TSDB_CODE_SUCCESS; - } - - tqDebug("s-task:%s receive stop msg from mnode", pTask->id.idStr); - streamTaskStop(pTask); - - SStreamTask* pHistoryTask = NULL; - if (pTask->historyTaskId.taskId != 0) { - pHistoryTask = streamMetaAcquireTask(pMeta, pTask->historyTaskId.streamId, pTask->historyTaskId.taskId); - if (pHistoryTask == NULL) { - tqError("vgId:%d process pause req, failed to acquire fill-history task:0x%x, it may have been dropped already", - pMeta->vgId, pTask->historyTaskId.taskId); - streamMetaReleaseTask(pMeta, pTask); - - // since task is in [STOP|DROPPING] state, it is safe to assume the pause is active - return TSDB_CODE_SUCCESS; - } - - tqDebug("s-task:%s fill-history task handle paused along with related stream task", pHistoryTask->id.idStr); - - streamTaskStop(pHistoryTask); - streamMetaReleaseTask(pMeta, pHistoryTask); - } - - streamMetaReleaseTask(pMeta, pTask); - tmsgSendRsp(&rsp); - return 0; -} diff --git a/source/dnode/vnode/src/tq/tqRestore.c b/source/dnode/vnode/src/tq/tqRestore.c index 37763a690d..5efccc8f3c 100644 --- a/source/dnode/vnode/src/tq/tqRestore.c +++ b/source/dnode/vnode/src/tq/tqRestore.c @@ -340,14 +340,14 @@ int32_t createStreamTaskRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle) { continue; } - if (streamQueueIsFull(pTask->inputQueue->queue)) { + if (streamQueueIsFull(pTask->inputInfo.queue->pQueue)) { tqTrace("s-task:%s input queue is full, do nothing", pTask->id.idStr); streamMetaReleaseTask(pStreamMeta, pTask); continue; } // downstream task has blocked the output, stopped for a while - if (pTask->inputStatus == TASK_INPUT_STATUS__BLOCKED) { + if (pTask->inputInfo.status == TASK_INPUT_STATUS__BLOCKED) { tqDebug("s-task:%s inputQ is blocked, do nothing", pTask->id.idStr); streamMetaReleaseTask(pStreamMeta, pTask); continue; diff --git a/source/libs/stream/inc/streamInt.h b/source/libs/stream/inc/streamInt.h index a485e94824..bb81582a2d 100644 --- a/source/libs/stream/inc/streamInt.h +++ b/source/libs/stream/inc/streamInt.h @@ -77,9 +77,6 @@ int32_t streamNotifyUpstreamContinue(SStreamTask* pTask); int32_t streamTaskFillHistoryFinished(SStreamTask* pTask); int32_t streamTransferStateToStreamTask(SStreamTask* pTask); -extern int32_t streamBackendId; -extern int32_t streamBackendCfWrapperId; - #ifdef __cplusplus } #endif diff --git a/source/libs/stream/src/stream.c b/source/libs/stream/src/stream.c index 3b045a8ad7..1f93498557 100644 --- a/source/libs/stream/src/stream.c +++ b/source/libs/stream/src/stream.c @@ -59,8 +59,8 @@ char* createStreamTaskIdStr(int64_t streamId, int32_t taskId) { static void streamSchedByTimer(void* param, void* tmrId) { SStreamTask* pTask = (void*)param; - int8_t status = atomic_load_8(&pTask->triggerStatus); - qDebug("s-task:%s in scheduler, trigger status:%d, next:%dms", pTask->id.idStr, status, (int32_t)pTask->triggerParam); + int8_t status = atomic_load_8(&pTask->schedInfo.status); + qDebug("s-task:%s in scheduler, trigger status:%d, next:%dms", pTask->id.idStr, status, (int32_t)pTask->info.triggerParam); if (streamTaskShouldStop(&pTask->status) || streamTaskShouldPause(&pTask->status)) { qDebug("s-task:%s jump out of schedTimer", pTask->id.idStr); @@ -80,29 +80,29 @@ static void streamSchedByTimer(void* param, void* tmrId) { return; } - atomic_store_8(&pTask->triggerStatus, TASK_TRIGGER_STATUS__INACTIVE); + atomic_store_8(&pTask->schedInfo.status, TASK_TRIGGER_STATUS__INACTIVE); pTrigger->pBlock->info.type = STREAM_GET_ALL; if (streamTaskPutDataIntoInputQ(pTask, (SStreamQueueItem*)pTrigger) < 0) { taosFreeQitem(pTrigger); - taosTmrReset(streamSchedByTimer, (int32_t)pTask->triggerParam, pTask, streamEnv.timer, &pTask->schedTimer); + taosTmrReset(streamSchedByTimer, (int32_t)pTask->info.triggerParam, pTask, streamEnv.timer, &pTask->schedInfo.pTimer); return; } streamSchedExec(pTask); } - taosTmrReset(streamSchedByTimer, (int32_t)pTask->triggerParam, pTask, streamEnv.timer, &pTask->schedTimer); + taosTmrReset(streamSchedByTimer, (int32_t)pTask->info.triggerParam, pTask, streamEnv.timer, &pTask->schedInfo.pTimer); } int32_t streamSetupScheduleTrigger(SStreamTask* pTask) { - if (pTask->triggerParam != 0 && pTask->info.fillHistory == 0) { + if (pTask->info.triggerParam != 0 && pTask->info.fillHistory == 0) { int32_t ref = atomic_add_fetch_32(&pTask->refCnt, 1); - ASSERT(ref == 2 && pTask->schedTimer == NULL); + ASSERT(ref == 2 && pTask->schedInfo.pTimer == NULL); - qDebug("s-task:%s setup scheduler trigger, delay:%" PRId64 " ms", pTask->id.idStr, pTask->triggerParam); + qDebug("s-task:%s setup scheduler trigger, delay:%" PRId64 " ms", pTask->id.idStr, pTask->info.triggerParam); - pTask->schedTimer = taosTmrStart(streamSchedByTimer, (int32_t)pTask->triggerParam, pTask, streamEnv.timer); - pTask->triggerStatus = TASK_TRIGGER_STATUS__INACTIVE; + pTask->schedInfo.pTimer = taosTmrStart(streamSchedByTimer, (int32_t)pTask->info.triggerParam, pTask, streamEnv.timer); + pTask->schedInfo.status = TASK_TRIGGER_STATUS__INACTIVE; } return 0; @@ -224,7 +224,7 @@ int32_t streamTaskOutputResultBlock(SStreamTask* pTask, SStreamDataBlock* pBlock destroyStreamDataBlock(pBlock); } else { ASSERT(type == TASK_OUTPUT__FIXED_DISPATCH || type == TASK_OUTPUT__SHUFFLE_DISPATCH); - code = taosWriteQitem(pTask->outputInfo.queue->queue, pBlock); + code = taosWriteQitem(pTask->outputInfo.queue->pQueue, pBlock); if (code != 0) { qError("s-task:%s failed to put res into outputQ", pTask->id.idStr); } @@ -299,7 +299,7 @@ int32_t streamProcessRetrieveReq(SStreamTask* pTask, SStreamRetrieveReq* pReq, S return 0; } -void streamTaskInputFail(SStreamTask* pTask) { atomic_store_8(&pTask->inputStatus, TASK_INPUT_STATUS__FAILED); } +void streamTaskInputFail(SStreamTask* pTask) { atomic_store_8(&pTask->inputInfo.status, TASK_INPUT_STATUS__FAILED); } void streamTaskOpenAllUpstreamInput(SStreamTask* pTask) { int32_t num = taosArrayGetSize(pTask->pUpstreamInfoList); diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index 54f8b9b697..2a0940c4d0 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -155,7 +155,7 @@ static int32_t continueDispatchCheckpointBlock(SStreamDataBlock* pBlock, SStream pBlock->srcTaskId = pTask->id.taskId; pBlock->srcVgId = pTask->pMeta->vgId; - int32_t code = taosWriteQitem(pTask->outputInfo.queue->queue, pBlock); + int32_t code = taosWriteQitem(pTask->outputInfo.queue->pQueue, pBlock); if (code == 0) { streamDispatchStreamBlock(pTask); } else { diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index 64628f8c7f..cf04bcc1b8 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -493,7 +493,7 @@ int32_t streamDispatchStreamBlock(SStreamTask* pTask) { ASSERT((pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH || pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH)); const char* id = pTask->id.idStr; - int32_t numOfElems = taosQueueItemSize(pTask->outputInfo.queue->queue); + int32_t numOfElems = taosQueueItemSize(pTask->outputInfo.queue->pQueue); if (numOfElems > 0) { qDebug("s-task:%s try to dispatch intermediate block to downstream, elem in outputQ:%d", id, numOfElems); } @@ -995,7 +995,7 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i // the input queue of the (down stream) task that receive the output data is full, // so the TASK_INPUT_STATUS_BLOCKED is rsp if (pRsp->inputStatus == TASK_INPUT_STATUS__BLOCKED) { - pTask->inputStatus = TASK_INPUT_STATUS__BLOCKED; // block the input of current task, to push pressure to upstream + pTask->inputInfo.status = TASK_INPUT_STATUS__BLOCKED; // block the input of current task, to push pressure to upstream pTask->msgInfo.blockingTs = taosGetTimestampMs(); // record the blocking start time qError("s-task:%s inputQ of downstream task:0x%x is full, time:%" PRId64 " wait for %dms and retry dispatch data", id, pRsp->downstreamTaskId, pTask->msgInfo.blockingTs, DISPATCH_RETRY_INTERVAL_MS); @@ -1012,7 +1012,7 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i pTask->msgInfo.blockingTs = 0; // put data into inputQ of current task is also allowed - pTask->inputStatus = TASK_INPUT_STATUS__NORMAL; + pTask->inputInfo.status = TASK_INPUT_STATUS__NORMAL; } // now ready for next data output @@ -1062,19 +1062,3 @@ int32_t tDecodeStreamTaskUpdateMsg(SDecoder* pDecoder, SStreamTaskNodeUpdateMsg* tEndDecode(pDecoder); return 0; } - -int32_t tEncodeStreamTaskUpdateRsp(SEncoder* pEncoder, const SStreamTaskNodeUpdateRsp* pMsg) { - if (tStartEncode(pEncoder) < 0) return -1; - if (tEncodeI64(pEncoder, pMsg->streamId) < 0) return -1; - if (tEncodeI32(pEncoder, pMsg->taskId) < 0) return -1; - tEndEncode(pEncoder); - return pEncoder->pos; -} - -int32_t tDecodeStreamTaskUpdateRsp(SDecoder* pDecoder, SStreamTaskNodeUpdateRsp* pMsg) { - if (tStartDecode(pDecoder) < 0) return -1; - if (tDecodeI64(pDecoder, &pMsg->streamId) < 0) return -1; - if (tDecodeI32(pDecoder, &pMsg->taskId) < 0) return -1; - tEndDecode(pDecoder); - return 0; -} \ No newline at end of file diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index a3cbc1fb4e..e6b112d050 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -83,7 +83,7 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, i return 0; } - if (pTask->inputStatus == TASK_INPUT_STATUS__BLOCKED) { + if (pTask->inputInfo.status == TASK_INPUT_STATUS__BLOCKED) { qWarn("s-task:%s downstream task inputQ blocked, idle for 1sec and retry", pTask->id.idStr); taosMsleep(1000); continue; @@ -192,7 +192,7 @@ int32_t streamScanExec(SStreamTask* pTask, int32_t batchSize) { return 0; } - if (pTask->inputStatus == TASK_INPUT_STATUS__BLOCKED) { + if (pTask->inputInfo.status == TASK_INPUT_STATUS__BLOCKED) { qDebug("s-task:%s inputQ is blocked, wait for 10sec and retry", pTask->id.idStr); taosMsleep(10000); continue; @@ -249,8 +249,8 @@ int32_t streamScanExec(SStreamTask* pTask, int32_t batchSize) { } int32_t streamTaskGetInputQItems(const SStreamTask* pTask) { - int32_t numOfItems1 = taosQueueItemSize(pTask->inputQueue->queue); - int32_t numOfItems2 = taosQallItemSize(pTask->inputQueue->qall); + int32_t numOfItems1 = taosQueueItemSize(pTask->inputInfo.queue->pQueue); + int32_t numOfItems2 = taosQallItemSize(pTask->inputInfo.queue->qall); return numOfItems1 + numOfItems2; } @@ -360,7 +360,7 @@ int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask) { // 7. pause allowed. streamTaskEnablePause(pStreamTask); - if (taosQueueEmpty(pStreamTask->inputQueue->queue)) { + if (taosQueueEmpty(pStreamTask->inputInfo.queue->pQueue)) { SStreamRefDataBlock* pItem = taosAllocateQitem(sizeof(SStreamRefDataBlock), DEF_QITEM, 0); SSDataBlock* pDelBlock = createSpecialDataBlock(STREAM_DELETE_DATA); @@ -472,7 +472,7 @@ int32_t streamProcessTranstateBlock(SStreamTask* pTask, SStreamDataBlock* pBlock // agg task should dispatch trans-state msg to sink task, to flush all data to sink task. if (level == TASK_LEVEL__AGG || level == TASK_LEVEL__SOURCE) { pBlock->srcVgId = pTask->pMeta->vgId; - code = taosWriteQitem(pTask->outputInfo.queue->queue, pBlock); + code = taosWriteQitem(pTask->outputInfo.queue->pQueue, pBlock); if (code == 0) { streamDispatchStreamBlock(pTask); } else { @@ -615,7 +615,7 @@ int32_t streamTryExec(SStreamTask* pTask) { qDebug("s-task:%s exec completed, status:%s, sched-status:%d", id, streamGetTaskStatusStr(pTask->status.taskStatus), pTask->status.schedStatus); - if (!(taosQueueEmpty(pTask->inputQueue->queue) || streamTaskShouldStop(&pTask->status) || + if (!(taosQueueEmpty(pTask->inputInfo.queue->pQueue) || streamTaskShouldStop(&pTask->status) || streamTaskShouldPause(&pTask->status))) { streamSchedExec(pTask); } diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 471804b6d6..ff5e9adaee 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -261,10 +261,10 @@ void streamMetaClear(SStreamMeta* pMeta) { SStreamTask* p = *(SStreamTask**)pIter; // release the ref by timer - if (p->triggerParam != 0 && p->info.fillHistory == 0) { // one more ref in timer + if (p->info.triggerParam != 0 && p->info.fillHistory == 0) { // one more ref in timer qDebug("s-task:%s stop schedTimer, and (before) desc ref:%d", p->id.idStr, p->refCnt); - taosTmrStop(p->schedTimer); - p->triggerParam = 0; + taosTmrStop(p->schedInfo.pTimer); + p->info.triggerParam = 0; streamMetaReleaseTask(pMeta, p); } diff --git a/source/libs/stream/src/streamQueue.c b/source/libs/stream/src/streamQueue.c index 519551486b..7e6a438e12 100644 --- a/source/libs/stream/src/streamQueue.c +++ b/source/libs/stream/src/streamQueue.c @@ -19,7 +19,6 @@ #define MIN_STREAM_EXEC_BATCH_NUM 4 #define STREAM_TASK_INPUT_QUEUE_CAPACITY 20480 #define STREAM_TASK_INPUT_QUEUE_CAPACITY_IN_SIZE (30) -#define QUEUE_MEM_SIZE_IN_MB(_q) (taosQueueMemorySize(_q) / ONE_MB_F) // todo refactor: // read data from input queue @@ -46,28 +45,28 @@ SStreamQueue* streamQueueOpen(int64_t cap) { return NULL; } - pQueue->queue = taosOpenQueue(); + pQueue->pQueue = taosOpenQueue(); pQueue->qall = taosAllocateQall(); - if (pQueue->queue == NULL || pQueue->qall == NULL) { - if (pQueue->queue) taosCloseQueue(pQueue->queue); + if (pQueue->pQueue == NULL || pQueue->qall == NULL) { + if (pQueue->pQueue) taosCloseQueue(pQueue->pQueue); if (pQueue->qall) taosFreeQall(pQueue->qall); taosMemoryFree(pQueue); return NULL; } pQueue->status = STREAM_QUEUE__SUCESS; - taosSetQueueCapacity(pQueue->queue, cap); - taosSetQueueMemoryCapacity(pQueue->queue, cap * 1024); + taosSetQueueCapacity(pQueue->pQueue, cap); + taosSetQueueMemoryCapacity(pQueue->pQueue, cap * 1024); return pQueue; } void streamQueueClose(SStreamQueue* pQueue, int32_t taskId) { - qDebug("s-task:0x%x free the queue:%p, items in queue:%d", taskId, pQueue->queue, taosQueueItemSize(pQueue->queue)); + qDebug("s-task:0x%x free the queue:%p, items in queue:%d", taskId, pQueue->pQueue, taosQueueItemSize(pQueue->pQueue)); streamQueueCleanup(pQueue); taosFreeQall(pQueue->qall); - taosCloseQueue(pQueue->queue); + taosCloseQueue(pQueue->pQueue); taosMemoryFree(pQueue); } @@ -81,7 +80,7 @@ void* streamQueueNextItem(SStreamQueue* pQueue) { pQueue->qItem = NULL; taosGetQitem(pQueue->qall, &pQueue->qItem); if (pQueue->qItem == NULL) { - taosReadAllQitems(pQueue->queue, pQueue->qall); + taosReadAllQitems(pQueue->pQueue, pQueue->qall); taosGetQitem(pQueue->qall, &pQueue->qItem); } @@ -149,7 +148,7 @@ SStreamQueueRes streamQueueGetRes(SStreamQueue1* pQueue) { bool streamQueueIsFull(const STaosQueue* pQueue) { bool isFull = taosQueueItemSize((STaosQueue*) pQueue) >= STREAM_TASK_INPUT_QUEUE_CAPACITY; - double size = QUEUE_MEM_SIZE_IN_MB((STaosQueue*) pQueue); + double size = SIZE_IN_MB(taosQueueMemorySize((STaosQueue*) pQueue)); return (isFull || size >= STREAM_TASK_INPUT_QUEUE_CAPACITY_IN_SIZE); } @@ -165,7 +164,7 @@ int32_t streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInpu return TSDB_CODE_SUCCESS; } - SStreamQueueItem* qItem = streamQueueNextItem(pTask->inputQueue); + SStreamQueueItem* qItem = streamQueueNextItem(pTask->inputInfo.queue); if (qItem == NULL) { qDebug("===stream===break batchSize:%d, %s", *numOfBlocks, id); return TSDB_CODE_SUCCESS; @@ -185,7 +184,7 @@ int32_t streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInpu return TSDB_CODE_SUCCESS; } - SStreamQueueItem* qItem = streamQueueNextItem(pTask->inputQueue); + SStreamQueueItem* qItem = streamQueueNextItem(pTask->inputInfo.queue); if (qItem == NULL) { if (pTask->info.taskLevel == TASK_LEVEL__SOURCE && (++retryTimes) < MAX_RETRY_TIMES) { taosMsleep(10); @@ -211,7 +210,7 @@ int32_t streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInpu } else { // previous existed blocks needs to be handle, before handle the checkpoint msg block qDebug("s-task:%s %s msg extracted, handle previous blocks, numOfBlocks:%d", id, p, *numOfBlocks); - streamQueueProcessFail(pTask->inputQueue); + streamQueueProcessFail(pTask->inputInfo.queue); return TSDB_CODE_SUCCESS; } } else { @@ -227,7 +226,7 @@ int32_t streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInpu tstrerror(terrno)); } - streamQueueProcessFail(pTask->inputQueue); + streamQueueProcessFail(pTask->inputInfo.queue); return TSDB_CODE_SUCCESS; } @@ -235,7 +234,7 @@ int32_t streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInpu } *numOfBlocks += 1; - streamQueueProcessSuccess(pTask->inputQueue); + streamQueueProcessSuccess(pTask->inputInfo.queue); if (*numOfBlocks >= MAX_STREAM_EXEC_BATCH_NUM) { qDebug("s-task:%s batch size limit:%d reached, start to process blocks", id, MAX_STREAM_EXEC_BATCH_NUM); @@ -246,13 +245,14 @@ int32_t streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInpu } int32_t streamTaskPutDataIntoInputQ(SStreamTask* pTask, SStreamQueueItem* pItem) { - int8_t type = pItem->type; - int32_t total = taosQueueItemSize(pTask->inputQueue->queue) + 1; - double size = QUEUE_MEM_SIZE_IN_MB(pTask->inputQueue->queue); + int8_t type = pItem->type; + STaosQueue* pQueue = pTask->inputInfo.queue->pQueue; + int32_t total = taosQueueItemSize(pQueue) + 1; + double size = SIZE_IN_MB(taosQueueMemorySize(pQueue)); if (type == STREAM_INPUT__DATA_SUBMIT) { SStreamDataSubmit* px = (SStreamDataSubmit*)pItem; - if ((pTask->info.taskLevel == TASK_LEVEL__SOURCE) && streamQueueIsFull(pTask->inputQueue->queue)) { + if ((pTask->info.taskLevel == TASK_LEVEL__SOURCE) && streamQueueIsFull(pQueue)) { qError( "s-task:%s inputQ is full, capacity(size:%d num:%dMiB), current(blocks:%d, size:%.2fMiB) stop to push data", pTask->id.idStr, STREAM_TASK_INPUT_QUEUE_CAPACITY, STREAM_TASK_INPUT_QUEUE_CAPACITY_IN_SIZE, total, size); @@ -264,7 +264,7 @@ int32_t streamTaskPutDataIntoInputQ(SStreamTask* pTask, SStreamQueueItem* pItem) int32_t msgLen = px->submit.msgLen; int64_t ver = px->submit.ver; - int32_t code = taosWriteQitem(pTask->inputQueue->queue, pItem); + int32_t code = taosWriteQitem(pQueue, pItem); if (code != TSDB_CODE_SUCCESS) { streamDataSubmitDestroy(px); taosFreeQitem(pItem); @@ -276,7 +276,7 @@ int32_t streamTaskPutDataIntoInputQ(SStreamTask* pTask, SStreamQueueItem* pItem) msgLen, ver, total, size + SIZE_IN_MB(msgLen)); } else if (type == STREAM_INPUT__DATA_BLOCK || type == STREAM_INPUT__DATA_RETRIEVE || type == STREAM_INPUT__REF_DATA_BLOCK) { - if (streamQueueIsFull(pTask->inputQueue->queue)) { + if (streamQueueIsFull(pQueue)) { qError("s-task:%s input queue is full, capacity:%d size:%d MiB, current(blocks:%d, size:%.2fMiB) abort", pTask->id.idStr, STREAM_TASK_INPUT_QUEUE_CAPACITY, STREAM_TASK_INPUT_QUEUE_CAPACITY_IN_SIZE, total, size); destroyStreamDataBlock((SStreamDataBlock*)pItem); @@ -284,27 +284,27 @@ int32_t streamTaskPutDataIntoInputQ(SStreamTask* pTask, SStreamQueueItem* pItem) } qDebug("s-task:%s blockdata enqueue, total in queue:%d, size:%.2fMiB", pTask->id.idStr, total, size); - int32_t code = taosWriteQitem(pTask->inputQueue->queue, pItem); + int32_t code = taosWriteQitem(pQueue, pItem); if (code != TSDB_CODE_SUCCESS) { destroyStreamDataBlock((SStreamDataBlock*)pItem); return code; } } else if (type == STREAM_INPUT__CHECKPOINT || type == STREAM_INPUT__CHECKPOINT_TRIGGER || type == STREAM_INPUT__TRANS_STATE) { - taosWriteQitem(pTask->inputQueue->queue, pItem); + taosWriteQitem(pQueue, pItem); qDebug("s-task:%s level:%d %s blockdata enqueue, total in queue:%d, size:%.2fMiB", pTask->id.idStr, pTask->info.taskLevel, streamGetBlockTypeStr(type), total, size); } else if (type == STREAM_INPUT__GET_RES) { // use the default memory limit, refactor later. - taosWriteQitem(pTask->inputQueue->queue, pItem); + taosWriteQitem(pQueue, pItem); qDebug("s-task:%s data res enqueue, current(blocks:%d, size:%.2fMiB)", pTask->id.idStr, total, size); } else { ASSERT(0); } - if (type != STREAM_INPUT__GET_RES && type != STREAM_INPUT__CHECKPOINT && pTask->triggerParam != 0) { - atomic_val_compare_exchange_8(&pTask->triggerStatus, TASK_TRIGGER_STATUS__INACTIVE, TASK_TRIGGER_STATUS__ACTIVE); - qDebug("s-task:%s new data arrived, active the trigger, triggerStatus:%d", pTask->id.idStr, pTask->triggerStatus); + if (type != STREAM_INPUT__GET_RES && type != STREAM_INPUT__CHECKPOINT && pTask->info.triggerParam != 0) { + atomic_val_compare_exchange_8(&pTask->schedInfo.status, TASK_TRIGGER_STATUS__INACTIVE, TASK_TRIGGER_STATUS__ACTIVE); + qDebug("s-task:%s new data arrived, active the trigger, triggerStatus:%d", pTask->id.idStr, pTask->schedInfo.status); } return 0; diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index 01dcb435c0..bc719699d1 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -39,7 +39,7 @@ SStreamTask* tNewStreamTask(int64_t streamId, int8_t taskLevel, int8_t fillHisto pTask->id.streamId = streamId; pTask->info.taskLevel = taskLevel; pTask->info.fillHistory = fillHistory; - pTask->triggerParam = triggerParam; + pTask->info.triggerParam = triggerParam; char buf[128] = {0}; sprintf(buf, "0x%" PRIx64 "-%d", pTask->id.streamId, pTask->id.taskId); @@ -47,7 +47,7 @@ SStreamTask* tNewStreamTask(int64_t streamId, int8_t taskLevel, int8_t fillHisto pTask->id.idStr = taosStrdup(buf); pTask->status.schedStatus = TASK_SCHED_STATUS__INACTIVE; pTask->status.taskStatus = TASK_STATUS__SCAN_HISTORY; - pTask->inputStatus = TASK_INPUT_STATUS__NORMAL; + pTask->inputInfo.status = TASK_INPUT_STATUS__NORMAL; pTask->outputInfo.status = TASK_OUTPUT_STATUS__NORMAL; addToTaskset(pTaskList, pTask); @@ -133,7 +133,7 @@ int32_t tEncodeStreamTask(SEncoder* pEncoder, const SStreamTask* pTask) { if (tSerializeSUseDbRspImp(pEncoder, &pTask->shuffleDispatcher.dbInfo) < 0) return -1; if (tEncodeCStr(pEncoder, pTask->shuffleDispatcher.stbFullName) < 0) return -1; } - if (tEncodeI64(pEncoder, pTask->triggerParam) < 0) return -1; + if (tEncodeI64(pEncoder, pTask->info.triggerParam) < 0) return -1; tEndEncode(pEncoder); return pEncoder->pos; @@ -244,7 +244,7 @@ int32_t tDecodeStreamTask(SDecoder* pDecoder, SStreamTask* pTask) { if (tDeserializeSUseDbRspImp(pDecoder, &pTask->shuffleDispatcher.dbInfo) < 0) return -1; if (tDecodeCStrTo(pDecoder, pTask->shuffleDispatcher.stbFullName) < 0) return -1; } - if (tDecodeI64(pDecoder, &pTask->triggerParam) < 0) return -1; + if (tDecodeI64(pDecoder, &pTask->info.triggerParam) < 0) return -1; tEndDecode(pDecoder); return 0; @@ -271,9 +271,9 @@ void tFreeStreamTask(SStreamTask* pTask) { taosMsleep(10); } - if (pTask->schedTimer != NULL) { - taosTmrStop(pTask->schedTimer); - pTask->schedTimer = NULL; + if (pTask->schedInfo.pTimer != NULL) { + taosTmrStop(pTask->schedInfo.pTimer); + pTask->schedInfo.pTimer = NULL; } if (pTask->launchTaskTimer != NULL) { @@ -282,8 +282,8 @@ void tFreeStreamTask(SStreamTask* pTask) { } int32_t status = atomic_load_8((int8_t*)&(pTask->status.taskStatus)); - if (pTask->inputQueue) { - streamQueueClose(pTask->inputQueue, pTask->id.taskId); + if (pTask->inputInfo.queue) { + streamQueueClose(pTask->inputInfo.queue, pTask->id.taskId); } if (pTask->outputInfo.queue) { @@ -353,16 +353,16 @@ int32_t streamTaskInit(SStreamTask* pTask, SStreamMeta* pMeta, SMsgCb* pMsgCb, i pTask->refCnt = 1; pTask->status.schedStatus = TASK_SCHED_STATUS__INACTIVE; pTask->status.timerActive = 0; - pTask->inputQueue = streamQueueOpen(512 << 10); + pTask->inputInfo.queue = streamQueueOpen(512 << 10); pTask->outputInfo.queue = streamQueueOpen(512 << 10); - if (pTask->inputQueue == NULL || pTask->outputInfo.queue == NULL) { + if (pTask->inputInfo.queue == NULL || pTask->outputInfo.queue == NULL) { qError("s-task:%s failed to prepare the input/output queue, initialize task failed", pTask->id.idStr); return -1; } pTask->tsInfo.init = taosGetTimestampMs(); - pTask->inputStatus = TASK_INPUT_STATUS__NORMAL; + pTask->inputInfo.status = TASK_INPUT_STATUS__NORMAL; pTask->outputInfo.status = TASK_OUTPUT_STATUS__NORMAL; pTask->pMeta = pMeta;