fix(stream): check return value.

This commit is contained in:
Haojun Liao 2024-08-05 14:45:54 +08:00
parent 1ba3139f9f
commit ade444b690
8 changed files with 180 additions and 29 deletions

View File

@ -133,6 +133,9 @@ void streamTaskSendCheckMsg(SStreamTask* pTask) {
for (int32_t i = 0; i < numOfVgs; i++) {
SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, i);
if (pVgInfo == NULL) {
continue;
}
setCheckDownstreamReqInfo(&req, tGenIdPI64(), pVgInfo->taskId, pVgInfo->vgId);
streamTaskAddReqInfo(&pTask->taskCheckInfo, req.reqId, pVgInfo->taskId, pVgInfo->vgId, idstr);
@ -370,6 +373,10 @@ void addIntoNodeUpdateList(SStreamTask* pTask, int32_t nodeId) {
bool existed = false;
for (int i = 0; i < num; ++i) {
SDownstreamTaskEpset* p = taosArrayGet(pTask->outputInfo.pNodeEpsetUpdateList, i);
if (p == NULL) {
continue;
}
if (p->nodeId == nodeId) {
existed = true;
break;
@ -412,6 +419,10 @@ void findCheckRspStatus(STaskCheckInfo* pInfo, int32_t taskId, SDownstreamStatus
*pStatusInfo = NULL;
for (int32_t j = 0; j < taosArrayGetSize(pInfo->pList); ++j) {
SDownstreamStatusInfo* p = taosArrayGet(pInfo->pList, j);
if (p == NULL) {
continue;
}
if (p->taskId == taskId) {
*pStatusInfo = p;
}
@ -546,6 +557,9 @@ void doSendCheckMsg(SStreamTask* pTask, SDownstreamStatusInfo* p) {
for (int32_t i = 0; i < numOfVgs; i++) {
SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, i);
if (pVgInfo == NULL) {
continue;
}
if (p->taskId == pVgInfo->taskId) {
setCheckDownstreamReqInfo(&req, p->reqId, pVgInfo->taskId, pVgInfo->vgId);
@ -566,6 +580,10 @@ void getCheckRspStatus(STaskCheckInfo* pInfo, int64_t el, int32_t* numOfReady, i
int32_t* numOfNotRsp, SArray* pTimeoutList, SArray* pNotReadyList, const char* id) {
for (int32_t i = 0; i < taosArrayGetSize(pInfo->pList); ++i) {
SDownstreamStatusInfo* p = taosArrayGet(pInfo->pList, i);
if (p == NULL) {
continue;
}
if (p->status == TASK_DOWNSTREAM_READY) {
(*numOfReady) += 1;
} else if (p->status == TASK_UPSTREAM_NEW_STAGE || p->status == TASK_DOWNSTREAM_NOT_LEADER) {
@ -603,8 +621,12 @@ void handleTimeoutDownstreamTasks(SStreamTask* pTask, SArray* pTimeoutList) {
pInfo->timeoutStartTs = taosGetTimestampMs();
for (int32_t i = 0; i < numOfTimeout; ++i) {
int32_t taskId = *(int32_t*)taosArrayGet(pTimeoutList, i);
int32_t* px = taosArrayGet(pTimeoutList, i);
if (px == NULL) {
continue;
}
int32_t taskId = *px;
SDownstreamStatusInfo* p = NULL;
findCheckRspStatus(pInfo, taskId, &p);
if (p != NULL) {
@ -620,9 +642,13 @@ void handleTimeoutDownstreamTasks(SStreamTask* pTask, SArray* pTimeoutList) {
pInfo->timeoutRetryCount = 0;
for (int32_t i = 0; i < numOfTimeout; ++i) {
int32_t taskId = *(int32_t*)taosArrayGet(pTimeoutList, i);
int32_t* pTaskId = taosArrayGet(pTimeoutList, i);
if (pTaskId == NULL) {
continue;
}
SDownstreamStatusInfo* p = NULL;
findCheckRspStatus(pInfo, taskId, &p);
findCheckRspStatus(pInfo, *pTaskId, &p);
if (p != NULL) {
addIntoNodeUpdateList(pTask, p->vgId);
stDebug("s-task:%s vgId:%d downstream task:0x%x (vgId:%d) timeout more than 100sec, add into nodeUpate list",
@ -647,10 +673,13 @@ void handleNotReadyDownstreamTask(SStreamTask* pTask, SArray* pNotReadyList) {
// reset the info, and send the check msg to failure downstream again
for (int32_t i = 0; i < numOfNotReady; ++i) {
int32_t taskId = *(int32_t*)taosArrayGet(pNotReadyList, i);
int32_t* pTaskId = taosArrayGet(pNotReadyList, i);
if (pTaskId == NULL) {
continue;
}
SDownstreamStatusInfo* p = NULL;
findCheckRspStatus(pInfo, taskId, &p);
findCheckRspStatus(pInfo, *pTaskId, &p);
if (p != NULL) {
p->rspTs = 0;
p->status = -1;

View File

@ -37,7 +37,7 @@ int32_t createStreamBlockFromDispatchMsg(const SStreamDispatchReq* pReq, int32_t
for (int32_t i = 0; i < blockNum; i++) {
SRetrieveTableRsp* pRetrieve = (SRetrieveTableRsp*)taosArrayGetP(pReq->data, i);
SSDataBlock* pDataBlock = taosArrayGet(pArray, i);
if (pDataBlock == NULL) {
if (pDataBlock == NULL || pRetrieve == NULL) {
return terrno;
}

View File

@ -268,6 +268,10 @@ static SStreamDispatchReq* createDispatchDataReq(SStreamTask* pTask, const SStre
for (int32_t i = 0; i < numOfVgroups; i++) {
SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, i);
if (pVgInfo == NULL) {
continue;
}
code = tInitStreamDispatchReq(&pReqs[i], pTask, pData->srcVgId, 0, pVgInfo->taskId, pData->type);
if (code != TSDB_CODE_SUCCESS) {
destroyDispatchMsg(pReqs, numOfVgroups);
@ -300,6 +304,10 @@ static int32_t doBuildDispatchMsg(SStreamTask* pTask, const SStreamDataBlock* pD
if (pData->type == STREAM_INPUT__CHECKPOINT_TRIGGER) {
SSDataBlock* p = taosArrayGet(pData->blocks, 0);
if (p == NULL) {
return terrno;
}
pTask->msgInfo.checkpointId = p->info.version;
pTask->msgInfo.transId = p->info.window.ekey;
}
@ -313,6 +321,11 @@ static int32_t doBuildDispatchMsg(SStreamTask* pTask, const SStreamDataBlock* pD
if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) {
for (int32_t i = 0; i < numOfBlocks; i++) {
SSDataBlock* pDataBlock = taosArrayGet(pData->blocks, i);
if (pDataBlock == NULL) {
destroyDispatchMsg(pReqs, 1);
return terrno;
}
code = streamAddBlockIntoDispatchMsg(pDataBlock, pReqs);
if (code != TSDB_CODE_SUCCESS) {
destroyDispatchMsg(pReqs, 1);
@ -328,6 +341,10 @@ static int32_t doBuildDispatchMsg(SStreamTask* pTask, const SStreamDataBlock* pD
for (int32_t i = 0; i < numOfBlocks; i++) {
SSDataBlock* pDataBlock = taosArrayGet(pData->blocks, i);
if (pDataBlock == NULL) {
destroyDispatchMsg(pReqs, numOfVgroups);
return terrno;
}
// TODO: do not use broadcast
if (pDataBlock->info.type == STREAM_DELETE_RESULT || pDataBlock->info.type == STREAM_CHECKPOINT ||
@ -342,6 +359,10 @@ static int32_t doBuildDispatchMsg(SStreamTask* pTask, const SStreamDataBlock* pD
// it's a new vnode to receive dispatch msg, so add one
if (pReqs[j].blockNum == 0) {
SVgroupInfo* pDstVgroupInfo = taosArrayGet(vgInfo, j);
if (pDstVgroupInfo == NULL) {
destroyDispatchMsg(pReqs, numOfVgroups);
return terrno;
}
addDispatchEntry(&pTask->msgInfo, pDstVgroupInfo->vgId, now, true);
}
@ -399,6 +420,11 @@ static int32_t sendDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pDispatch
for (int32_t i = 0; i < numOfVgroups; i++) {
if (pDispatchMsg[i].blockNum > 0) {
SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, i);
if (pVgInfo == NULL) {
code = terrno;
break;
}
stDebug("s-task:%s (child taskId:%d) shuffle-dispatch blocks:%d to vgId:%d", id, pTask->info.selfChildId,
pDispatchMsg[i].blockNum, pVgInfo->vgId);
@ -457,6 +483,10 @@ static void doSendFailedDispatch(SStreamTask* pTask, SDispatchEntry* pEntry, int
setResendInfo(pEntry, now);
for (int32_t j = 0; j < numOfVgroups; ++j) {
SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, j);
if (pVgInfo == NULL) {
continue;
}
if (pVgInfo->vgId == pEntry->nodeId) {
int32_t code = doSendDispatchMsg(pTask, &pReq[j], pVgInfo->vgId, &pVgInfo->epSet);
stDebug("s-task:%s (child taskId:%d) shuffle-dispatch blocks:%d to vgId:%d for %s, msgId:%d, code:%s",
@ -521,6 +551,10 @@ static void doMonitorDispatchData(void* param, void* tmrId) {
int32_t numOfRetry = 0;
for (int32_t i = 0; i < taosArrayGetSize(pTask->msgInfo.pSendInfo); ++i) {
SDispatchEntry* pEntry = taosArrayGet(pTask->msgInfo.pSendInfo, i);
if (pEntry == NULL) {
continue;
}
if (pEntry->status == TSDB_CODE_SUCCESS && pEntry->rspTs > 0) {
continue;
}
@ -553,14 +587,17 @@ static void doMonitorDispatchData(void* param, void* tmrId) {
SEpSet* pEpSet = &pTask->outputInfo.fixedDispatcher.epSet;
int32_t downstreamTaskId = pTask->outputInfo.fixedDispatcher.taskId;
ASSERT(taosArrayGetSize(pTask->msgInfo.pSendInfo) == 1);
int32_t s = taosArrayGetSize(pTask->msgInfo.pSendInfo);
SDispatchEntry* pEntry = taosArrayGet(pTask->msgInfo.pSendInfo, 0);
if (pEntry != NULL) {
setResendInfo(pEntry, now);
code = doSendDispatchMsg(pTask, pReq, dstVgId, pEpSet);
stDebug("s-task:%s (child taskId:%d) fix-dispatch %d block(s) to s-task:0x%x (vgId:%d), msgId:%d, code:%s", id,
pTask->info.selfChildId, 1, downstreamTaskId, dstVgId, msgId, tstrerror(code));
} else {
stError("s-task:%s invalid index 0, size:%d", id, s);
}
}
}
@ -637,6 +674,9 @@ int32_t streamSearchAndAddBlock(SStreamTask* pTask, SStreamDispatchReq* pReqs, S
for (int32_t j = 0; j < numOfVgroups; j++) {
SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, j);
if (pVgInfo == NULL) {
continue;
}
if (hashValue >= pVgInfo->hashBegin && hashValue <= pVgInfo->hashEnd) {
if (streamAddBlockIntoDispatchMsg(pDataBlock, &pReqs[j]) < 0) {
@ -646,8 +686,10 @@ int32_t streamSearchAndAddBlock(SStreamTask* pTask, SStreamDispatchReq* pReqs, S
if (pReqs[j].blockNum == 0) {
SVgroupInfo* pDstVgroupInfo = taosArrayGet(vgInfo, j);
if (pDstVgroupInfo != NULL) {
addDispatchEntry(&pTask->msgInfo, pDstVgroupInfo->vgId, now, false);
}
}
pReqs[j].blockNum++;
found = true;
@ -832,6 +874,10 @@ static void checkpointReadyMsgSendMonitorFn(void* param, void* tmrId) {
for (int32_t i = 0; i < num; ++i) {
STaskCheckpointReadyInfo* pInfo = taosArrayGet(pList, i);
if (pInfo == NULL) {
continue;
}
if (pInfo->sendCompleted == 1) {
continue;
}
@ -846,11 +892,18 @@ static void checkpointReadyMsgSendMonitorFn(void* param, void* tmrId) {
int32_t notRsp = taosArrayGetSize(pNotRspList);
if (notRsp > 0) { // send checkpoint-ready msg again
for (int32_t i = 0; i < taosArrayGetSize(pNotRspList); ++i) {
int32_t taskId = *(int32_t*)taosArrayGet(pNotRspList, i);
int32_t* pTaskId = taosArrayGet(pNotRspList, i);
if (pTaskId == NULL) {
continue;
}
for (int32_t j = 0; j < num; ++j) {
STaskCheckpointReadyInfo* pReadyInfo = taosArrayGet(pList, j);
if (taskId == pReadyInfo->upstreamTaskId) { // send msg again
if (pReadyInfo == NULL) {
continue;
}
if (*pTaskId == pReadyInfo->upstreamTaskId) { // send msg again
SRpcMsg msg = {0};
int32_t code = initCheckpointReadyMsg(pTask, pReadyInfo->upstreamNodeId, pReadyInfo->upstreamTaskId, pReadyInfo->childId,
@ -902,6 +955,9 @@ int32_t streamTaskSendCheckpointReadyMsg(SStreamTask* pTask) {
for (int32_t i = 0; i < num; ++i) {
STaskCheckpointReadyInfo* pInfo = taosArrayGet(pList, i);
if (pInfo == NULL) {
continue;
}
SRpcMsg msg = {0};
int32_t code = initCheckpointReadyMsg(pTask, pInfo->upstreamNodeId, pInfo->upstreamTaskId, pInfo->childId,
@ -945,11 +1001,14 @@ int32_t streamTaskSendCheckpointSourceRsp(SStreamTask* pTask) {
streamMutexLock(&pTask->chkInfo.pActiveInfo->lock);
if (taosArrayGetSize(pList) == 1) {
STaskCheckpointReadyInfo* pInfo = taosArrayGet(pList, 0);
if (pInfo != NULL) {
tmsgSendRsp(&pInfo->msg);
taosArrayClear(pList);
stDebug("s-task:%s level:%d checkpoint-source rsp completed msg sent to mnode", pTask->id.idStr,
pTask->info.taskLevel);
} else {
// todo
}
} else {
stDebug("s-task:%s level:%d already send checkpoint-source rsp success to mnode", pTask->id.idStr,
pTask->info.taskLevel);
@ -1097,6 +1156,10 @@ int32_t streamAddCheckpointSourceRspMsg(SStreamCheckpointSourceReq* pReq, SRpcHa
ASSERT(size == 1);
STaskCheckpointReadyInfo* pReady = taosArrayGet(pActiveInfo->pReadyMsgList, 0);
if (pReady == NULL) {
return terrno;
}
if (pReady->transId == pReq->transId) {
stWarn("s-task:%s repeatly recv checkpoint source msg from mnode, checkpointId:%" PRId64 ", ignore",
pTask->id.idStr, pReq->checkpointId);
@ -1104,7 +1167,6 @@ int32_t streamAddCheckpointSourceRspMsg(SStreamCheckpointSourceReq* pReq, SRpcHa
stError("s-task:%s checkpointId:%" PRId64 " transId:%d not completed, new transId:%d checkpointId:%" PRId64
" recv from mnode",
pTask->id.idStr, pReady->checkpointId, pReady->transId, pReq->transId, pReq->checkpointId);
ASSERT(0); // failed to handle it
}
} else {
(void) taosArrayPush(pActiveInfo->pReadyMsgList, &info);
@ -1168,8 +1230,10 @@ void streamClearChkptReadyMsg(SActiveCheckpointInfo* pActiveInfo) {
for (int i = 0; i < taosArrayGetSize(pActiveInfo->pReadyMsgList); i++) {
STaskCheckpointReadyInfo* pInfo = taosArrayGet(pActiveInfo->pReadyMsgList, i);
if (pInfo != NULL) {
rpcFreeCont(pInfo->msg.pCont);
}
}
taosArrayClear(pActiveInfo->pReadyMsgList);
}
@ -1215,6 +1279,10 @@ static bool setDispatchRspInfo(SDispatchMsgInfo* pMsgInfo, int32_t vgId, int32_t
for(int32_t i = 0; i < numOfDispatchBranch; ++i) {
SDispatchEntry* pEntry = taosArrayGet(pMsgInfo->pSendInfo, i);
if (pEntry == NULL) {
continue;
}
if (pEntry->rspTs != -1) {
numOfRsp += 1;
}
@ -1222,6 +1290,10 @@ static bool setDispatchRspInfo(SDispatchMsgInfo* pMsgInfo, int32_t vgId, int32_t
for (int32_t j = 0; j < numOfDispatchBranch; ++j) {
SDispatchEntry* pEntry = taosArrayGet(pMsgInfo->pSendInfo, j);
if (pEntry == NULL) {
continue;
}
if (pEntry->nodeId == vgId) {
ASSERT(!alreadySet);
pEntry->rspTs = now;
@ -1254,6 +1326,10 @@ int32_t getFailedDispatchInfo(SDispatchMsgInfo* pMsgInfo, int64_t now) {
for (int32_t j = 0; j < taosArrayGetSize(pMsgInfo->pSendInfo); ++j) {
SDispatchEntry* pEntry = taosArrayGet(pMsgInfo->pSendInfo, j);
if (pEntry == NULL) {
continue;
}
if (pEntry->status != TSDB_CODE_SUCCESS || isDispatchRspTimeout(pEntry, now)) {
numOfFailed += 1;
}

View File

@ -87,6 +87,10 @@ int32_t tEncodeStreamTaskUpdateMsg(SEncoder* pEncoder, const SStreamTaskNodeUpda
for (int32_t i = 0; i < size; ++i) {
SNodeUpdateInfo* pInfo = taosArrayGet(pMsg->pNodeList, i);
if (pInfo == NULL) {
return terrno;
}
if (tEncodeI32(pEncoder, pInfo->nodeId) < 0) return -1;
if (tEncodeSEpSet(pEncoder, &pInfo->prevEp) < 0) return -1;
if (tEncodeSEpSet(pEncoder, &pInfo->newEp) < 0) return -1;
@ -228,10 +232,14 @@ int32_t tEncodeStreamDispatchReq(SEncoder* pEncoder, const SStreamDispatchReq* p
ASSERT(taosArrayGetSize(pReq->data) == pReq->blockNum);
ASSERT(taosArrayGetSize(pReq->dataLen) == pReq->blockNum);
for (int32_t i = 0; i < pReq->blockNum; i++) {
int32_t len = *(int32_t*)taosArrayGet(pReq->dataLen, i);
int32_t* pLen = taosArrayGet(pReq->dataLen, i);
void* data = taosArrayGetP(pReq->data, i);
if (tEncodeI32(pEncoder, len) < 0) return -1;
if (tEncodeBinary(pEncoder, data, len) < 0) return -1;
if (data == NULL || pLen == NULL) {
return terrno;
}
if (tEncodeI32(pEncoder, *pLen) < 0) return -1;
if (tEncodeBinary(pEncoder, data, *pLen) < 0) return -1;
}
tEndEncode(pEncoder);
return pEncoder->pos;
@ -341,6 +349,10 @@ int32_t tEncodeStreamHbMsg(SEncoder* pEncoder, const SStreamHbMsg* pReq) {
for (int32_t i = 0; i < pReq->numOfTasks; ++i) {
STaskStatusEntry* ps = taosArrayGet(pReq->pTaskStatus, i);
if (ps == NULL) {
return terrno;
}
if (tEncodeI64(pEncoder, ps->id.streamId) < 0) return -1;
if (tEncodeI32(pEncoder, ps->id.taskId) < 0) return -1;
if (tEncodeI32(pEncoder, ps->status) < 0) return -1;
@ -378,6 +390,10 @@ int32_t tEncodeStreamHbMsg(SEncoder* pEncoder, const SStreamHbMsg* pReq) {
for (int j = 0; j < numOfVgs; ++j) {
int32_t* pVgId = taosArrayGet(pReq->pUpdateNodes, j);
if (pVgId == NULL) {
return terrno;
}
if (tEncodeI32(pEncoder, *pVgId) < 0) return -1;
}

View File

@ -64,8 +64,12 @@ int32_t binarySearch(void* keyList, int num, const void* key, __session_compare_
int64_t getSessionWindowEndkey(void* data, int32_t index) {
SArray* pWinInfos = (SArray*)data;
SRowBuffPos** ppos = taosArrayGet(pWinInfos, index);
if (ppos != NULL) {
SSessionKey* pWin = (SSessionKey*)((*ppos)->pKey);
return pWin->win.ekey;
} else {
return 0;
}
}
bool inSessionWindow(SSessionKey* pKey, TSKEY ts, int64_t gap) {

View File

@ -399,6 +399,10 @@ void snapFileDestroy(SBackendSnapFile2* pSnap) {
// unite read/write snap file
for (int32_t i = 0; i < taosArrayGetSize(pSnap->pFileList); i++) {
SBackendFileItem* pItem = taosArrayGet(pSnap->pFileList, i);
if (pItem == NULL) {
continue;
}
if (pItem->ref == 0) {
taosMemoryFree(pItem->name);
}

View File

@ -555,6 +555,9 @@ void streamTaskUpdateDownstreamInfo(SStreamTask* pTask, int32_t nodeId, const SE
for (int32_t i = 0; i < taosArrayGetSize(pVgs); i++) {
SVgroupInfo* pVgInfo = taosArrayGet(pVgs, i);
if (pVgInfo == NULL) {
continue;
}
if (pVgInfo->vgId == nodeId) {
bool isEqual = isEpsetEqual(&pVgInfo->epSet, pEpSet);
@ -636,6 +639,10 @@ bool streamTaskUpdateEpsetInfo(SStreamTask* pTask, SArray* pNodeList) {
bool updated = false;
for (int32_t i = 0; i < taosArrayGetSize(pNodeList); ++i) {
SNodeUpdateInfo* pInfo = taosArrayGet(pNodeList, i);
if (pInfo == NULL) {
continue;
}
int32_t code = doUpdateTaskEpset(pTask, pInfo->nodeId, &pInfo->newEp, &updated);
if (code) {
stError("s-task:0x%x failed to update the task nodeEp epset, code:%s", pTask->id.taskId, tstrerror(code));
@ -1013,6 +1020,10 @@ SEpSet* streamTaskGetDownstreamEpInfo(SStreamTask* pTask, int32_t taskId) {
SArray* pList = pTask->outputInfo.shuffleDispatcher.dbInfo.pVgroupInfos;
for (int32_t i = 0; i < taosArrayGetSize(pList); ++i) {
SVgroupInfo* pVgInfo = taosArrayGet(pList, i);
if (pVgInfo == NULL) {
continue;
}
if (pVgInfo->taskId == taskId) {
return &pVgInfo->epSet;
}

View File

@ -164,6 +164,10 @@ static STaskStateTrans* streamTaskFindTransform(ETaskStatus state, const EStream
int32_t numOfTrans = taosArrayGetSize(streamTaskSMTrans);
for (int32_t i = 0; i < numOfTrans; ++i) {
STaskStateTrans* pTrans = taosArrayGet(streamTaskSMTrans, i);
if (pTrans == NULL) {
continue;
}
if (pTrans->state.state == state && pTrans->event == event) {
return pTrans;
}
@ -187,6 +191,9 @@ static int32_t doHandleWaitingEvent(SStreamTaskSM* pSM, const char* pEventName,
ASSERT(taosArrayGetSize(pSM->pWaitingEventList) == 1);
SFutureHandleEventInfo* pEvtInfo = taosArrayGet(pSM->pWaitingEventList, 0);
if (pEvtInfo == NULL) {
return terrno;
}
// OK, let's handle the waiting event, since the task has reached the required status now
if (pSM->current.state == pEvtInfo->status) {
@ -227,6 +234,10 @@ static int32_t removeEventInWaitingList(SStreamTask* pTask, EStreamTaskEvent eve
int32_t num = taosArrayGetSize(pSM->pWaitingEventList);
for (int32_t i = 0; i < num; ++i) {
SFutureHandleEventInfo* pInfo = taosArrayGet(pSM->pWaitingEventList, i);
if (pInfo == NULL) {
continue;
}
if (pInfo->event == event) {
taosArrayRemove(pSM->pWaitingEventList, i);
stDebug("s-task:%s %s event in waiting list not be handled yet, remove it from waiting list, remaining events:%d",