Merge pull request #29750 from taosdata/enh/streamqueue
refactor(stream): handle more operations in the ctrl queue, and limit the stream input queue size
This commit is contained in:
commit
d6bd52ece7
|
@ -1016,15 +1016,15 @@ SArray *vmGetMsgHandles() {
|
|||
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_RESUME, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_STOP, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_CHECK_POINT_SOURCE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_CHECKPOINT_READY, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_CHECKPOINT_READY_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_STREAM_RETRIEVE_TRIGGER, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_STREAM_RETRIEVE_TRIGGER_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_CHECKPOINT_READY, vmPutMsgToStreamCtrlQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_CHECKPOINT_READY_RSP, vmPutMsgToStreamCtrlQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_STREAM_RETRIEVE_TRIGGER, vmPutMsgToStreamCtrlQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_STREAM_RETRIEVE_TRIGGER_RSP, vmPutMsgToStreamCtrlQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_UPDATE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_RESET, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_HEARTBEAT_RSP, vmPutMsgToStreamCtrlQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_REQ_CHKPT_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_CHKPT_REPORT_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_REQ_CHKPT_RSP, vmPutMsgToStreamCtrlQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_CHKPT_REPORT_RSP, vmPutMsgToStreamCtrlQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_VND_GET_STREAM_PROGRESS, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
|
||||
|
||||
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_UPDATE_CHKPT, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||
|
|
|
@ -1158,51 +1158,22 @@ int32_t extractStreamNodeList(SMnode *pMnode) {
|
|||
}
|
||||
|
||||
static int32_t mndCheckTaskAndNodeStatus(SMnode *pMnode) {
|
||||
bool ready = true;
|
||||
int32_t code = 0;
|
||||
if (mndStreamNodeIsUpdated(pMnode)) {
|
||||
TAOS_RETURN(TSDB_CODE_STREAM_TASK_IVLD_STATUS);
|
||||
return TSDB_CODE_STREAM_TASK_IVLD_STATUS;
|
||||
}
|
||||
|
||||
streamMutexLock(&execInfo.lock);
|
||||
if (taosArrayGetSize(execInfo.pNodeList) == 0) {
|
||||
mDebug("stream task node change checking done, no vgroups exist, do nothing");
|
||||
if (taosArrayGetSize(execInfo.pTaskList) != 0) {
|
||||
streamMutexUnlock(&execInfo.lock);
|
||||
mError("stream task node change checking done, no vgroups exist, but task list is not empty");
|
||||
return TSDB_CODE_FAILED;
|
||||
}
|
||||
}
|
||||
|
||||
for (int32_t i = 0; i < taosArrayGetSize(execInfo.pTaskList); ++i) {
|
||||
STaskId *p = taosArrayGet(execInfo.pTaskList, i);
|
||||
if (p == NULL) {
|
||||
continue;
|
||||
}
|
||||
|
||||
STaskStatusEntry *pEntry = taosHashGet(execInfo.pTaskMap, p, sizeof(*p));
|
||||
if (pEntry == NULL) {
|
||||
continue;
|
||||
}
|
||||
|
||||
if (pEntry->status != TASK_STATUS__READY) {
|
||||
mDebug("s-task:0x%" PRIx64 "-0x%x (nodeId:%d) status:%s, checkpoint not issued", pEntry->id.streamId,
|
||||
(int32_t)pEntry->id.taskId, pEntry->nodeId, streamTaskGetStatusStr(pEntry->status));
|
||||
ready = false;
|
||||
break;
|
||||
}
|
||||
|
||||
if (pEntry->hTaskId != 0) {
|
||||
mDebug("s-task:0x%" PRIx64 "-0x%x (nodeId:%d) status:%s related fill-history task:0x%" PRIx64
|
||||
" exists, checkpoint not issued",
|
||||
pEntry->id.streamId, (int32_t)pEntry->id.taskId, pEntry->nodeId, streamTaskGetStatusStr(pEntry->status),
|
||||
pEntry->hTaskId);
|
||||
ready = false;
|
||||
break;
|
||||
code = TSDB_CODE_STREAM_TASK_IVLD_STATUS;
|
||||
}
|
||||
}
|
||||
|
||||
streamMutexUnlock(&execInfo.lock);
|
||||
return ready ? 0 : -1;
|
||||
return code;
|
||||
}
|
||||
|
||||
int64_t getStreamTaskLastReadyState(SArray *pTaskList, int64_t streamId) {
|
||||
|
@ -1216,7 +1187,22 @@ int64_t getStreamTaskLastReadyState(SArray *pTaskList, int64_t streamId) {
|
|||
continue;
|
||||
}
|
||||
|
||||
if (pEntry->status == TASK_STATUS__READY && ts < pEntry->startTime) {
|
||||
// -1 denote not ready now or never ready till now
|
||||
if (pEntry->hTaskId != 0) {
|
||||
mInfo("s-task:0x%" PRIx64 "-0x%x (nodeId:%d) status:%s related fill-history task:0x%" PRIx64
|
||||
" exists, checkpoint not issued",
|
||||
pEntry->id.streamId, (int32_t)pEntry->id.taskId, pEntry->nodeId, streamTaskGetStatusStr(pEntry->status),
|
||||
pEntry->hTaskId);
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (pEntry->status != TASK_STATUS__READY) {
|
||||
mInfo("s-task:0x%" PRIx64 "-0x%x (nodeId:%d) status:%s, not ready for checkpoint", pEntry->id.streamId,
|
||||
(int32_t)pEntry->id.taskId, pEntry->nodeId, streamTaskGetStatusStr(pEntry->status));
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (ts < pEntry->startTime) {
|
||||
ts = pEntry->startTime;
|
||||
taskId = pEntry->id.taskId;
|
||||
}
|
||||
|
@ -1249,11 +1235,11 @@ static bool isStreamReadyHelp(int64_t now, SStreamObj* pStream) {
|
|||
|
||||
int64_t lastReadyTs = getStreamTaskLastReadyState(execInfo.pTaskList, pStream->uid);
|
||||
if ((lastReadyTs == -1) || ((lastReadyTs != -1) && ((now - lastReadyTs) < tsStreamCheckpointInterval * 1000))) {
|
||||
|
||||
if (lastReadyTs != -1) {
|
||||
mInfo("not start checkpoint, stream:0x%"PRIx64" last ready ts:%"PRId64" ready duration:%"PRId64" less than threshold",
|
||||
pStream->uid, lastReadyTs, now - lastReadyTs);
|
||||
} else {
|
||||
mInfo("not start checkpoint, stream:0x%"PRIx64" not ready now", pStream->uid);
|
||||
mInfo("not start checkpoint, stream:0x%" PRIx64 " last ready ts:%" PRId64 " ready duration:%" PRId64
|
||||
"ms less than threshold",
|
||||
pStream->uid, lastReadyTs, (now - lastReadyTs));
|
||||
}
|
||||
|
||||
ready = false;
|
||||
|
@ -1274,7 +1260,7 @@ static int32_t mndProcessStreamCheckpoint(SRpcMsg *pReq) {
|
|||
int32_t numOfCheckpointTrans = 0;
|
||||
|
||||
if ((code = mndCheckTaskAndNodeStatus(pMnode)) != 0) {
|
||||
TAOS_RETURN(TSDB_CODE_STREAM_TASK_IVLD_STATUS);
|
||||
return TSDB_CODE_STREAM_TASK_IVLD_STATUS;
|
||||
}
|
||||
|
||||
SArray *pList = taosArrayInit(4, sizeof(SCheckpointInterval));
|
||||
|
@ -1326,7 +1312,7 @@ static int32_t mndProcessStreamCheckpoint(SRpcMsg *pReq) {
|
|||
}
|
||||
|
||||
int32_t numOfQual = taosArrayGetSize(pList);
|
||||
if (numOfCheckpointTrans > tsMaxConcurrentCheckpoint) {
|
||||
if (numOfCheckpointTrans >= tsMaxConcurrentCheckpoint) {
|
||||
mDebug(
|
||||
"%d stream(s) checkpoint interval longer than %ds, ongoing checkpoint trans:%d reach maximum allowed:%d, new "
|
||||
"checkpoint trans are not allowed, wait for 30s",
|
||||
|
@ -2601,20 +2587,51 @@ static void doSendQuickRsp(SRpcHandleInfo *pInfo, int32_t msgSize, int32_t vgId,
|
|||
}
|
||||
}
|
||||
|
||||
// Remove from pInfo->pTaskList the consensus entries whose task ids are listed in pList.
//
// For every id stored in pList, the first entry of pInfo->pTaskList whose req.taskId
// equals that id is removed; an id without a matching entry is skipped silently.
//
// Returns the size of pList as reported by taosArrayGetSize(), independent of how many
// entries were actually removed from pInfo->pTaskList.
static int32_t doCleanReqList(SArray* pList, SCheckpointConsensusInfo* pInfo) {
  int32_t numOfSent = taosArrayGetSize(pList);

  for (int32_t idx = 0; idx < numOfSent; ++idx) {
    int32_t *pTaskId = taosArrayGet(pList, idx);
    if (pTaskId != NULL) {
      int32_t pos = 0;

      // the list size is re-read on every pass, as the list may shrink below
      while (pos < taosArrayGetSize(pInfo->pTaskList)) {
        SCheckpointConsensusEntry *pEntry = taosArrayGet(pInfo->pTaskList, pos);
        if (pEntry != NULL && pEntry->req.taskId == *pTaskId) {
          // at most one entry is removed per task id
          taosArrayRemove(pInfo->pTaskList, pos);
          break;
        }

        pos += 1;
      }
    }
  }

  return numOfSent;
}
|
||||
|
||||
int32_t mndProcessConsensusInTmr(SRpcMsg *pMsg) {
|
||||
SMnode *pMnode = pMsg->info.node;
|
||||
int64_t now = taosGetTimestampMs();
|
||||
bool allReady = true;
|
||||
SArray *pNodeSnapshot = NULL;
|
||||
int32_t maxAllowedTrans = 50;
|
||||
int32_t numOfTrans = 0;
|
||||
int32_t code = 0;
|
||||
void *pIter = NULL;
|
||||
|
||||
SArray *pList = taosArrayInit(4, sizeof(int32_t));
|
||||
if (pList == NULL) {
|
||||
return terrno;
|
||||
}
|
||||
|
||||
SArray *pStreamList = taosArrayInit(4, sizeof(int64_t));
|
||||
if (pStreamList == NULL) {
|
||||
taosArrayDestroy(pList);
|
||||
return terrno;
|
||||
}
|
||||
|
||||
mDebug("start to process consensus-checkpointId in tmr");
|
||||
|
||||
bool allReady = true;
|
||||
SArray *pNodeSnapshot = NULL;
|
||||
|
||||
int32_t code = mndTakeVgroupSnapshot(pMnode, &allReady, &pNodeSnapshot);
|
||||
code = mndTakeVgroupSnapshot(pMnode, &allReady, &pNodeSnapshot);
|
||||
taosArrayDestroy(pNodeSnapshot);
|
||||
if (code) {
|
||||
mError("failed to get the vgroup snapshot, ignore it and continue");
|
||||
|
@ -2623,28 +2640,30 @@ int32_t mndProcessConsensusInTmr(SRpcMsg *pMsg) {
|
|||
if (!allReady) {
|
||||
mWarn("not all vnodes are ready, end to process the consensus-checkpointId in tmr process");
|
||||
taosArrayDestroy(pStreamList);
|
||||
taosArrayDestroy(pList);
|
||||
return 0;
|
||||
}
|
||||
|
||||
streamMutexLock(&execInfo.lock);
|
||||
|
||||
void *pIter = NULL;
|
||||
while ((pIter = taosHashIterate(execInfo.pStreamConsensus, pIter)) != NULL) {
|
||||
SCheckpointConsensusInfo *pInfo = (SCheckpointConsensusInfo *)pIter;
|
||||
|
||||
int64_t streamId = -1;
|
||||
int32_t num = taosArrayGetSize(pInfo->pTaskList);
|
||||
SArray *pList = taosArrayInit(4, sizeof(int32_t));
|
||||
if (pList == NULL) {
|
||||
continue;
|
||||
}
|
||||
taosArrayClear(pList);
|
||||
|
||||
int64_t streamId = -1;
|
||||
int32_t num = taosArrayGetSize(pInfo->pTaskList);
|
||||
SStreamObj *pStream = NULL;
|
||||
|
||||
code = mndGetStreamObj(pMnode, pInfo->streamId, &pStream);
|
||||
if (pStream == NULL || code != 0) { // stream has been dropped already
|
||||
mDebug("stream:0x%" PRIx64 " dropped already, continue", pInfo->streamId);
|
||||
void *p = taosArrayPush(pStreamList, &pInfo->streamId);
|
||||
taosArrayDestroy(pList);
|
||||
if (p == NULL) {
|
||||
mError("failed to record the missing stream id in concensus-stream list, streamId:%" PRId64
|
||||
" code:%s, continue",
|
||||
pInfo->streamId, tstrerror(terrno));
|
||||
}
|
||||
continue;
|
||||
}
|
||||
|
||||
|
@ -2654,7 +2673,9 @@ int32_t mndProcessConsensusInTmr(SRpcMsg *pMsg) {
|
|||
continue;
|
||||
}
|
||||
|
||||
streamId = pe->req.streamId;
|
||||
if (streamId == -1) {
|
||||
streamId = pe->req.streamId;
|
||||
}
|
||||
|
||||
int32_t existed = 0;
|
||||
bool allSame = true;
|
||||
|
@ -2665,7 +2686,7 @@ int32_t mndProcessConsensusInTmr(SRpcMsg *pMsg) {
|
|||
break;
|
||||
}
|
||||
|
||||
if (((now - pe->ts) >= 10 * 1000) || allSame) {
|
||||
if (((now - pe->ts) >= 10 * 1000) && allSame) {
|
||||
mDebug("s-task:0x%x sendTs:%" PRId64 " wait %.2fs and all tasks have same checkpointId", pe->req.taskId,
|
||||
pe->req.startTs, (now - pe->ts) / 1000.0);
|
||||
if (chkId > pe->req.checkpointId) {
|
||||
|
@ -2673,8 +2694,12 @@ int32_t mndProcessConsensusInTmr(SRpcMsg *pMsg) {
|
|||
taosArrayDestroy(pStreamList);
|
||||
mError("s-task:0x%x checkpointId:%" PRId64 " is updated to %" PRId64 ", update it", pe->req.taskId,
|
||||
pe->req.checkpointId, chkId);
|
||||
|
||||
mndReleaseStream(pMnode, pStream);
|
||||
taosHashCancelIterate(execInfo.pStreamConsensus, pIter);
|
||||
return TSDB_CODE_FAILED;
|
||||
}
|
||||
|
||||
code = mndCreateSetConsensusChkptIdTrans(pMnode, pStream, pe->req.taskId, chkId, pe->req.startTs);
|
||||
if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
|
||||
mError("failed to create consensus-checkpoint trans, stream:0x%" PRIx64, pStream->uid);
|
||||
|
@ -2684,7 +2709,6 @@ int32_t mndProcessConsensusInTmr(SRpcMsg *pMsg) {
|
|||
if (p == NULL) {
|
||||
mError("failed to put into task list, taskId:0x%x", pe->req.taskId);
|
||||
}
|
||||
streamId = pe->req.streamId;
|
||||
} else {
|
||||
mDebug("s-task:0x%x sendTs:%" PRId64 " wait %.2fs already, wait for next round to check", pe->req.taskId,
|
||||
pe->req.startTs, (now - pe->ts) / 1000.0);
|
||||
|
@ -2693,38 +2717,27 @@ int32_t mndProcessConsensusInTmr(SRpcMsg *pMsg) {
|
|||
|
||||
mndReleaseStream(pMnode, pStream);
|
||||
|
||||
if (taosArrayGetSize(pList) > 0) {
|
||||
for (int32_t i = 0; i < taosArrayGetSize(pList); ++i) {
|
||||
int32_t *taskId = taosArrayGet(pList, i);
|
||||
if (taskId == NULL) {
|
||||
continue;
|
||||
}
|
||||
|
||||
for (int32_t k = 0; k < taosArrayGetSize(pInfo->pTaskList); ++k) {
|
||||
SCheckpointConsensusEntry *pe = taosArrayGet(pInfo->pTaskList, k);
|
||||
if ((pe != NULL) && (pe->req.taskId == *taskId)) {
|
||||
taosArrayRemove(pInfo->pTaskList, k);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
taosArrayDestroy(pList);
|
||||
int32_t alreadySend = doCleanReqList(pList, pInfo);
|
||||
|
||||
// clear request stream item with empty task list
|
||||
if (taosArrayGetSize(pInfo->pTaskList) == 0) {
|
||||
mndClearConsensusRspEntry(pInfo);
|
||||
if (streamId == -1) {
|
||||
streamMutexUnlock(&execInfo.lock);
|
||||
taosArrayDestroy(pStreamList);
|
||||
mError("streamId is -1, streamId:%" PRIx64, pInfo->streamId);
|
||||
return TSDB_CODE_FAILED;
|
||||
mError("streamId is -1, streamId:%" PRIx64" in consensus-checkpointId hashMap, cont", pInfo->streamId);
|
||||
}
|
||||
|
||||
void *p = taosArrayPush(pStreamList, &streamId);
|
||||
if (p == NULL) {
|
||||
mError("failed to put into stream list, stream:0x%" PRIx64, streamId);
|
||||
mError("failed to put into stream list, stream:0x%" PRIx64 " not remove it in consensus-chkpt list", streamId);
|
||||
}
|
||||
}
|
||||
|
||||
numOfTrans += alreadySend;
|
||||
if (numOfTrans > maxAllowedTrans) {
|
||||
mInfo("already send consensus-checkpointId trans:%d, try next time", alreadySend);
|
||||
taosHashCancelIterate(execInfo.pStreamConsensus, pIter);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
for (int32_t i = 0; i < taosArrayGetSize(pStreamList); ++i) {
|
||||
|
@ -2739,7 +2752,9 @@ int32_t mndProcessConsensusInTmr(SRpcMsg *pMsg) {
|
|||
streamMutexUnlock(&execInfo.lock);
|
||||
|
||||
taosArrayDestroy(pStreamList);
|
||||
mDebug("end to process consensus-checkpointId in tmr");
|
||||
taosArrayDestroy(pList);
|
||||
|
||||
mDebug("end to process consensus-checkpointId in tmr, send consensus-checkpoint trans:%d", numOfTrans);
|
||||
return code;
|
||||
}
|
||||
|
||||
|
|
|
@ -814,17 +814,18 @@ int32_t mndScanCheckpointReportInfo(SRpcMsg *pReq) {
|
|||
|
||||
int32_t mndCreateSetConsensusChkptIdTrans(SMnode *pMnode, SStreamObj *pStream, int32_t taskId, int64_t checkpointId,
|
||||
int64_t ts) {
|
||||
char msg[128] = {0};
|
||||
char msg[128] = {0};
|
||||
STrans *pTrans = NULL;
|
||||
SStreamTask *pTask = NULL;
|
||||
|
||||
snprintf(msg, tListLen(msg), "set consen-chkpt-id for task:0x%x", taskId);
|
||||
|
||||
STrans *pTrans = NULL;
|
||||
int32_t code = doCreateTrans(pMnode, pStream, NULL, TRN_CONFLICT_NOTHING, MND_STREAM_CHKPT_CONSEN_NAME, msg, &pTrans);
|
||||
if (pTrans == NULL || code != 0) {
|
||||
return terrno;
|
||||
}
|
||||
|
||||
STaskId id = {.streamId = pStream->uid, .taskId = taskId};
|
||||
SStreamTask *pTask = NULL;
|
||||
STaskId id = {.streamId = pStream->uid, .taskId = taskId};
|
||||
code = mndGetStreamTask(&id, pStream, &pTask);
|
||||
if (code) {
|
||||
mError("failed to get task:0x%x in stream:%s, failed to create consensus-checkpointId", taskId, pStream->name);
|
||||
|
|
|
@ -947,20 +947,8 @@ int32_t vnodeProcessStreamMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo)
|
|||
return tqProcessTaskRetrieveRsp(pVnode->pTq, pMsg);
|
||||
case TDMT_VND_STREAM_SCAN_HISTORY:
|
||||
return tqProcessTaskScanHistory(pVnode->pTq, pMsg);
|
||||
case TDMT_STREAM_TASK_CHECKPOINT_READY:
|
||||
return tqProcessTaskCheckpointReadyMsg(pVnode->pTq, pMsg);
|
||||
case TDMT_STREAM_TASK_CHECKPOINT_READY_RSP:
|
||||
return tqProcessTaskCheckpointReadyRsp(pVnode->pTq, pMsg);
|
||||
case TDMT_STREAM_RETRIEVE_TRIGGER:
|
||||
return tqProcessTaskRetrieveTriggerReq(pVnode->pTq, pMsg);
|
||||
case TDMT_STREAM_RETRIEVE_TRIGGER_RSP:
|
||||
return tqProcessTaskRetrieveTriggerRsp(pVnode->pTq, pMsg);
|
||||
case TDMT_MND_STREAM_REQ_CHKPT_RSP:
|
||||
return tqProcessStreamReqCheckpointRsp(pVnode->pTq, pMsg);
|
||||
case TDMT_VND_GET_STREAM_PROGRESS:
|
||||
return tqStreamProgressRetrieveReq(pVnode->pTq, pMsg);
|
||||
case TDMT_MND_STREAM_CHKPT_REPORT_RSP:
|
||||
return tqProcessTaskChkptReportRsp(pVnode->pTq, pMsg);
|
||||
default:
|
||||
vError("unknown msg type:%d in stream queue", pMsg->msgType);
|
||||
return TSDB_CODE_APP_ERROR;
|
||||
|
@ -987,6 +975,18 @@ int32_t vnodeProcessStreamCtrlMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pIn
|
|||
return tqProcessTaskCheckReq(pVnode->pTq, pMsg);
|
||||
case TDMT_VND_STREAM_TASK_CHECK_RSP:
|
||||
return tqProcessTaskCheckRsp(pVnode->pTq, pMsg);
|
||||
case TDMT_STREAM_TASK_CHECKPOINT_READY:
|
||||
return tqProcessTaskCheckpointReadyMsg(pVnode->pTq, pMsg);
|
||||
case TDMT_STREAM_TASK_CHECKPOINT_READY_RSP:
|
||||
return tqProcessTaskCheckpointReadyRsp(pVnode->pTq, pMsg);
|
||||
case TDMT_STREAM_RETRIEVE_TRIGGER:
|
||||
return tqProcessTaskRetrieveTriggerReq(pVnode->pTq, pMsg);
|
||||
case TDMT_STREAM_RETRIEVE_TRIGGER_RSP:
|
||||
return tqProcessTaskRetrieveTriggerRsp(pVnode->pTq, pMsg);
|
||||
case TDMT_MND_STREAM_REQ_CHKPT_RSP:
|
||||
return tqProcessStreamReqCheckpointRsp(pVnode->pTq, pMsg);
|
||||
case TDMT_MND_STREAM_CHKPT_REPORT_RSP:
|
||||
return tqProcessTaskChkptReportRsp(pVnode->pTq, pMsg);
|
||||
default:
|
||||
vError("unknown msg type:%d in stream ctrl queue", pMsg->msgType);
|
||||
return TSDB_CODE_APP_ERROR;
|
||||
|
|
|
@ -38,7 +38,7 @@ extern "C" {
|
|||
#define META_HB_SEND_IDLE_COUNTER 25 // send hb every 5 sec
|
||||
#define STREAM_TASK_KEY_LEN ((sizeof(int64_t)) << 1)
|
||||
#define STREAM_TASK_QUEUE_CAPACITY 5120
|
||||
#define STREAM_TASK_QUEUE_CAPACITY_IN_SIZE (30)
|
||||
#define STREAM_TASK_QUEUE_CAPACITY_IN_SIZE (10)
|
||||
|
||||
// clang-format off
|
||||
#define stFatal(...) do { if (stDebugFlag & DEBUG_FATAL) { taosPrintLog("STM FATAL ", DEBUG_FATAL, 255, __VA_ARGS__); }} while(0)
|
||||
|
|
|
@ -131,12 +131,12 @@ int32_t streamTaskBroadcastRetrieveReq(SStreamTask* pTask, SStreamRetrieveReq* r
|
|||
|
||||
code = tmsgSendReq(&pEpInfo->epSet, &rpcMsg);
|
||||
if (code != 0) {
|
||||
rpcFreeCont(buf);
|
||||
return code;
|
||||
stError("s-task:%s (child %d) failed to send retrieve req to task:0x%x (vgId:%d) QID:0x%" PRIx64 " code:%s",
|
||||
pTask->id.idStr, pTask->info.selfChildId, pEpInfo->taskId, pEpInfo->nodeId, req->reqId, tstrerror(code));
|
||||
} else {
|
||||
stDebug("s-task:%s (child %d) send retrieve req to task:0x%x (vgId:%d),QID:0x%" PRIx64, pTask->id.idStr,
|
||||
pTask->info.selfChildId, pEpInfo->taskId, pEpInfo->nodeId, req->reqId);
|
||||
}
|
||||
|
||||
stDebug("s-task:%s (child %d) send retrieve req to task:0x%x (vgId:%d),QID:0x%" PRIx64, pTask->id.idStr,
|
||||
pTask->info.selfChildId, pEpInfo->taskId, pEpInfo->nodeId, req->reqId);
|
||||
}
|
||||
|
||||
return code;
|
||||
|
|
|
@ -807,6 +807,8 @@ static int32_t doStreamExecTask(SStreamTask* pTask) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
int64_t st = taosGetTimestampMs();
|
||||
|
||||
EExtractDataCode ret = streamTaskGetDataFromInputQ(pTask, &pInput, &numOfBlocks, &blockSize);
|
||||
if (ret == EXEC_AFTER_IDLE) {
|
||||
streamTaskSetIdleInfo(pTask, MIN_INVOKE_INTERVAL);
|
||||
|
@ -841,8 +843,6 @@ static int32_t doStreamExecTask(SStreamTask* pTask) {
|
|||
continue;
|
||||
}
|
||||
|
||||
int64_t st = taosGetTimestampMs();
|
||||
|
||||
// here only handle the data block sink operation
|
||||
if (type == STREAM_INPUT__DATA_BLOCK) {
|
||||
pTask->execInfo.sink.dataSize += blockSize;
|
||||
|
@ -873,6 +873,13 @@ static int32_t doStreamExecTask(SStreamTask* pTask) {
|
|||
if (code) {
|
||||
return code;
|
||||
}
|
||||
|
||||
double el = (taosGetTimestampMs() - st) / 1000.0;
|
||||
if (el > 5.0) { // elapsed more than 5 sec, not occupy the CPU anymore
|
||||
stDebug("s-task:%s occupy more than 5.0s, release the exec threads and idle for 500ms", id);
|
||||
streamTaskSetIdleInfo(pTask, 500);
|
||||
return code;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -190,7 +190,7 @@ system sh/exec.sh -n dnode1 -s start
|
|||
sql insert into t1 values(1648791223004,5,2,3,1.1);
|
||||
|
||||
loop4:
|
||||
sleep 1000
|
||||
run tsim/stream/checkTaskStatus.sim
|
||||
|
||||
sql select * from streamt;
|
||||
|
||||
|
|
Loading…
Reference in New Issue