fix(stream): fix error found by CI.

This commit is contained in:
Haojun Liao 2024-05-28 17:30:14 +08:00
parent c3c6b680fb
commit 8d54d45054
4 changed files with 19 additions and 14 deletions

View File

@ -1088,10 +1088,6 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp)
int32_t len = pMsg->contLen - sizeof(SMsgHead);
int32_t code = 0;
// if (pTq->pStreamMeta->vgId == 2) {
// ASSERT(0);
// }
// disable auto rsp to mnode
pRsp->info.handle = NULL;

View File

@ -864,7 +864,7 @@ int32_t tqStreamTaskProcessTaskResetReq(SStreamMeta* pMeta, SRpcMsg* pMsg) {
}
int32_t tqStreamTaskProcessRetrieveTriggerReq(SStreamMeta* pMeta, SRpcMsg* pMsg) {
SRetrieveChkptTriggerReq* pReq = (SRetrieveChkptTriggerReq*) pMsg;
SRetrieveChkptTriggerReq* pReq = (SRetrieveChkptTriggerReq*) pMsg->pCont;
SStreamTask* pTask = streamMetaAcquireTask(pMeta, pReq->streamId, pReq->upstreamTaskId);
if (pTask == NULL) {

View File

@ -562,11 +562,11 @@ void checkpointTriggerMonitorFn(void* param, void* tmrId) {
int32_t vgId = pTask->pMeta->vgId;
int64_t now = taosGetTimestampMs();
stDebug("s-task:%s vgId:%d checkpoint-trigger monit start, ts:%" PRId64, pTask->id.idStr, vgId, now);
stDebug("s-task:%s vgId:%d checkpoint-trigger monitor start, ts:%" PRId64, pTask->id.idStr, vgId, now);
taosThreadMutexLock(&pTask->lock);
SStreamTaskState* pState = streamTaskGetStatus(pTask);
if (pState->state == TASK_STATUS__CK) {
if (pState->state != TASK_STATUS__CK) {
stDebug("s-task:%s vgId:%d not in checkpoint status, quit from monitor checkpoint-trigger", pTask->id.idStr, vgId);
taosThreadMutexUnlock(&pTask->lock);
return;
@ -599,12 +599,14 @@ void checkpointTriggerMonitorFn(void* param, void* tmrId) {
}
// do send retrieve checkpoint trigger msg to upstream
int32_t size = taosArrayGetSize(pNotSendList);
doSendRetrieveTriggerMsg(pTask, pNotSendList);
taosThreadMutexUnlock(&pActiveInfo->lock);
// check every 100ms
if (taosArrayGetSize(pNotSendList) > 0) {
if (size > 0) {
taosTmrReset(checkpointTriggerMonitorFn, 10000, pTask, streamTimer, &pActiveInfo->pCheckTmr);
stDebug("s-task:%s start monitor trigger in 10sec", pTask->id.idStr);
}
taosArrayDestroy(pNotSendList);
@ -614,8 +616,11 @@ int32_t doSendRetrieveTriggerMsg(SStreamTask* pTask, SArray* pNotSendList) {
int32_t code = 0;
int32_t vgId = pTask->pMeta->vgId;
const char* pId = pTask->id.idStr;
int32_t size = taosArrayGetSize(pNotSendList);
for (int32_t i = 0; i < taosArrayGetSize(pNotSendList); i++) {
stDebug("s-task:%s start to send trigger-retrieve msg to %d upstream(s)", pId, size);
for (int32_t i = 0; i < size; i++) {
SStreamUpstreamEpInfo* pUpstreamTask = taosArrayGet(pNotSendList, i);
SRetrieveChkptTriggerReq* pReq = rpcMallocCont(sizeof(SRetrieveChkptTriggerReq));
@ -633,10 +638,13 @@ int32_t doSendRetrieveTriggerMsg(SStreamTask* pTask, SArray* pNotSendList) {
pReq->upstreamNodeId = pUpstreamTask->nodeId;
pReq->checkpointId = pTask->chkInfo.pActiveInfo->activeId;
SRpcMsg rpcMsg = {0};
initRpcMsg(&rpcMsg, TDMT_STREAM_RETRIEVE, pReq, sizeof(SRetrieveChkptTriggerReq));
initRpcMsg(&rpcMsg, TDMT_STREAM_RETRIEVE_TRIGGER, pReq, sizeof(SRetrieveChkptTriggerReq));
code = tmsgSendReq(&pUpstreamTask->epSet, &rpcMsg);
stDebug("s-task:%s vgId:%d send retrieve msg to 0x%x checkpointId:%" PRId64, pId, vgId, pUpstreamTask->taskId,
pReq->checkpointId);
}
return TSDB_CODE_SUCCESS;

View File

@ -595,8 +595,9 @@ int32_t streamDispatchStreamBlock(SStreamTask* pTask) {
return 0;
}
ASSERT(pBlock->type == STREAM_INPUT__DATA_BLOCK || pBlock->type == STREAM_INPUT__CHECKPOINT_TRIGGER ||
pBlock->type == STREAM_INPUT__TRANS_STATE);
int32_t type = pBlock->type;
ASSERT(type == STREAM_INPUT__DATA_BLOCK || type == STREAM_INPUT__CHECKPOINT_TRIGGER ||
type == STREAM_INPUT__TRANS_STATE);
pTask->execInfo.dispatch += 1;
pTask->msgInfo.startTs = taosGetTimestampMs();
@ -607,7 +608,7 @@ int32_t streamDispatchStreamBlock(SStreamTask* pTask) {
} else { // todo handle build dispatch msg failed
}
if (pBlock->type == STREAM_INPUT__CHECKPOINT_TRIGGER) {
if (type == STREAM_INPUT__CHECKPOINT_TRIGGER) {
streamTaskInitTriggerDispatchInfo(pTask);
}
@ -829,7 +830,7 @@ int32_t streamAddCheckpointSourceRspMsg(SStreamCheckpointSourceReq* pReq, SRpcHa
}
} else {
taosArrayPush(pActiveInfo->pReadyMsgList, &info);
stDebug("s-task:%s add checkpoint source rsp msg, total:%d", pTask->id.idStr, size);
stDebug("s-task:%s add checkpoint source rsp msg, total:%d", pTask->id.idStr, size + 1);
}
taosThreadMutexUnlock(&pActiveInfo->lock);