fix(stream): desc the ref for checkpoint-trigger timer.

This commit is contained in:
Haojun Liao 2024-05-29 14:12:14 +08:00
parent 09efbad117
commit 27ac6ca16e
4 changed files with 24 additions and 9 deletions

View File

@ -681,6 +681,7 @@ void streamTaskSetTriggerDispatchConfirmed(SStreamTask* pTask, int32_t vgId);
int32_t streamTaskSendCheckpointTriggerMsg(SStreamTask* pTask, int32_t dstTaskId, SRpcHandleInfo* pRpcInfo);
int32_t streamQueueGetNumOfItems(const SStreamQueue* pQueue);
int32_t streamQueueGetNumOfUnAccessedItems(const SStreamQueue* pQueue);
// common
void streamTaskPause(SStreamTask* pTask);

View File

@ -198,8 +198,9 @@ int32_t streamProcessCheckpointTriggerBlock(SStreamTask* pTask, SStreamDataBlock
int32_t ref = atomic_add_fetch_32(&pTask->status.timerActive, 1);
stDebug("s-task:%s start checkpoint-trigger monitor in 10s, ref:%d ", pTask->id.idStr, ref);
SActiveCheckpointInfo* pActive = pTask->chkInfo.pActiveInfo;
streamMetaAcquireOneTask(pTask);
if (pActive->pCheckTmr == NULL) {
streamMetaAcquireOneTask(pTask);
pActive->pCheckTmr = taosTmrStart(checkpointTriggerMonitorFn, 100, pTask, streamTimer);
} else {
taosTmrReset(checkpointTriggerMonitorFn, 100, pTask, streamTimer, &pActive->pCheckTmr);
@ -605,6 +606,7 @@ void checkpointTriggerMonitorFn(void* param, void* tmrId) {
}
if (++pActiveInfo->checkCounter < 100) {
taosTmrReset(checkpointTriggerMonitorFn, 100, pTask, streamTimer, &pActiveInfo->pCheckTmr);
return;
}
@ -614,7 +616,10 @@ void checkpointTriggerMonitorFn(void* param, void* tmrId) {
taosThreadMutexLock(&pTask->lock);
SStreamTaskState* pState = streamTaskGetStatus(pTask);
if (pState->state != TASK_STATUS__CK) {
stDebug("s-task:%s vgId:%d not in checkpoint status, quit from monitor checkpoint-trigger", pTask->id.idStr, vgId);
int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
stDebug("s-task:%s vgId:%d not in checkpoint status, quit from monitor checkpoint-trigger, ref:%d", pTask->id.idStr,
vgId, ref);
taosThreadMutexUnlock(&pTask->lock);
streamMetaReleaseTask(pTask->pMeta, pTask);
return;
@ -769,7 +774,7 @@ void streamTaskInitTriggerDispatchInfo(SStreamTask* pTask) {
taosThreadMutexLock(&pInfo->lock);
// outputQ should be empty here
ASSERT(streamQueueGetNumOfItems(pTask->outputq.queue) == 1);
ASSERT(streamQueueGetNumOfUnAccessedItems(pTask->outputq.queue) == 0);
pInfo->dispatchTrigger = true;
if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) {

View File

@ -566,9 +566,10 @@ int32_t streamDispatchStreamBlock(SStreamTask* pTask) {
const char* id = pTask->id.idStr;
int32_t numOfElems = streamQueueGetNumOfItems(pTask->outputq.queue);
if (numOfElems > 0) {
double size = SIZE_IN_MiB(taosQueueMemorySize(pTask->outputq.queue->pQueue));
stDebug("s-task:%s start to dispatch intermediate block to downstream, elem in outputQ:%d, size:%.2fMiB", id,
numOfElems, size);
double size = SIZE_IN_MiB(taosQueueMemorySize(pTask->outputq.queue->pQueue));
int32_t numOfUnAccessed = streamQueueGetNumOfUnAccessedItems(pTask->outputq.queue);
stDebug("s-task:%s start to dispatch intermediate block to downstream, elem in outputQ:%d/%d, size:%.2fMiB", id,
numOfUnAccessed, numOfElems, size);
}
// to make sure only one dispatch is running
@ -889,7 +890,7 @@ int32_t streamAddCheckpointReadyMsg(SStreamTask* pTask, int32_t upstreamTaskId,
initRpcMsg(&info.msg, TDMT_STREAM_TASK_CHECKPOINT_READY, buf, tlen + sizeof(SMsgHead));
stDebug("s-task:%s (level:%d) prepare checkpoint ready msg to upstream s-task:0x%" PRIx64
stDebug("s-task:%s (level:%d) prepare checkpoint-ready msg to upstream s-task:0x%" PRIx64
":0x%x (vgId:%d) idx:%d, vgId:%d",
pTask->id.idStr, pTask->info.taskLevel, req.streamId, req.upstreamTaskId, req.upstreamNodeId, index,
req.upstreamNodeId);

View File

@ -117,6 +117,13 @@ int32_t streamQueueGetNumOfItems(const SStreamQueue* pQueue) {
return numOfItems1 + numOfItems2;
}
int32_t streamQueueGetNumOfUnAccessedItems(const SStreamQueue* pQueue) {
int32_t numOfItems1 = taosQueueItemSize(pQueue->pQueue);
int32_t numOfItems2 = taosQallUnAccessedItemSize(pQueue->qall);
return numOfItems1 + numOfItems2;
}
int32_t streamQueueGetItemSize(const SStreamQueue* pQueue) {
return taosQueueMemorySize(pQueue->pQueue) + taosQallUnAccessedMemSize(pQueue->qall);
}
@ -322,9 +329,10 @@ int32_t streamTaskPutDataIntoInputQ(SStreamTask* pTask, SStreamQueueItem* pItem)
ASSERT(0);
}
if (type != STREAM_INPUT__GET_RES && type != STREAM_INPUT__CHECKPOINT && pTask->info.triggerParam != 0) {
if (type != STREAM_INPUT__GET_RES && type != STREAM_INPUT__CHECKPOINT && type != STREAM_INPUT__CHECKPOINT_TRIGGER &&
(pTask->info.triggerParam != 0)) {
atomic_val_compare_exchange_8(&pTask->schedInfo.status, TASK_TRIGGER_STATUS__INACTIVE, TASK_TRIGGER_STATUS__ACTIVE);
stDebug("s-task:%s new data arrived, active the trigger, triggerStatus:%d", pTask->id.idStr, pTask->schedInfo.status);
stDebug("s-task:%s new data arrived, active the sched-trigger, triggerStatus:%d", pTask->id.idStr, pTask->schedInfo.status);
}
return 0;