Merge pull request #22380 from taosdata/fix/liaohj
fix(stream): use streamId&taskId to identify a stream task
This commit is contained in:
commit
c3d1d47bad
|
@ -2767,6 +2767,7 @@ typedef struct {
|
||||||
typedef struct {
|
typedef struct {
|
||||||
SMsgHead head;
|
SMsgHead head;
|
||||||
int64_t leftForVer;
|
int64_t leftForVer;
|
||||||
|
int64_t streamId;
|
||||||
int32_t taskId;
|
int32_t taskId;
|
||||||
} SVDropStreamTaskReq;
|
} SVDropStreamTaskReq;
|
||||||
|
|
||||||
|
@ -2958,6 +2959,7 @@ int32_t tDecodeMqVgOffset(SDecoder* pDecoder, SMqVgOffset* pOffset);
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
SMsgHead head;
|
SMsgHead head;
|
||||||
|
int64_t streamId;
|
||||||
int32_t taskId;
|
int32_t taskId;
|
||||||
} SVPauseStreamTaskReq;
|
} SVPauseStreamTaskReq;
|
||||||
|
|
||||||
|
@ -2976,6 +2978,7 @@ int32_t tDeserializeSMPauseStreamReq(void* buf, int32_t bufLen, SMPauseStreamReq
|
||||||
typedef struct {
|
typedef struct {
|
||||||
SMsgHead head;
|
SMsgHead head;
|
||||||
int32_t taskId;
|
int32_t taskId;
|
||||||
|
int64_t streamId;
|
||||||
int8_t igUntreated;
|
int8_t igUntreated;
|
||||||
} SVResumeStreamTaskReq;
|
} SVResumeStreamTaskReq;
|
||||||
|
|
||||||
|
|
|
@ -644,9 +644,9 @@ void streamMetaClose(SStreamMeta* streamMeta);
|
||||||
int32_t streamMetaSaveTask(SStreamMeta* pMeta, SStreamTask* pTask);
|
int32_t streamMetaSaveTask(SStreamMeta* pMeta, SStreamTask* pTask);
|
||||||
int32_t streamMetaRemoveTask(SStreamMeta* pMeta, int32_t taskId);
|
int32_t streamMetaRemoveTask(SStreamMeta* pMeta, int32_t taskId);
|
||||||
int32_t streamMetaRegisterTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTask, bool* pAdded);
|
int32_t streamMetaRegisterTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTask, bool* pAdded);
|
||||||
int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int32_t taskId);
|
int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId);
|
||||||
int32_t streamMetaGetNumOfTasks(SStreamMeta* pMeta); // todo remove it
|
int32_t streamMetaGetNumOfTasks(SStreamMeta* pMeta); // todo remove it
|
||||||
SStreamTask* streamMetaAcquireTask(SStreamMeta* pMeta, int32_t taskId);
|
SStreamTask* streamMetaAcquireTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId);
|
||||||
void streamMetaReleaseTask(SStreamMeta* pMeta, SStreamTask* pTask);
|
void streamMetaReleaseTask(SStreamMeta* pMeta, SStreamTask* pTask);
|
||||||
|
|
||||||
int32_t streamMetaBegin(SStreamMeta* pMeta);
|
int32_t streamMetaBegin(SStreamMeta* pMeta);
|
||||||
|
|
|
@ -232,7 +232,8 @@ int32_t mndAddShuffleSinkTasksToStream(SMnode* pMnode, SArray* pTaskList, SStrea
|
||||||
|
|
||||||
int32_t mndAddSinkTaskToStream(SStreamObj* pStream, SArray* pTaskList, SMnode* pMnode, int32_t vgId, SVgObj* pVgroup,
|
int32_t mndAddSinkTaskToStream(SStreamObj* pStream, SArray* pTaskList, SMnode* pMnode, int32_t vgId, SVgObj* pVgroup,
|
||||||
int32_t fillHistory) {
|
int32_t fillHistory) {
|
||||||
SStreamTask* pTask = tNewStreamTask(pStream->uid, TASK_LEVEL__SINK, fillHistory, 0, pTaskList);
|
int64_t uid = (fillHistory == 0)? pStream->uid:pStream->hTaskUid;
|
||||||
|
SStreamTask* pTask = tNewStreamTask(uid, TASK_LEVEL__SINK, fillHistory, 0, pTaskList);
|
||||||
if (pTask == NULL) {
|
if (pTask == NULL) {
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
return -1;
|
return -1;
|
||||||
|
@ -335,8 +336,8 @@ static void setHTasksId(SArray* pTaskList, const SArray* pHTaskList) {
|
||||||
(*pHTask)->streamTaskId.taskId = (*pStreamTask)->id.taskId;
|
(*pHTask)->streamTaskId.taskId = (*pStreamTask)->id.taskId;
|
||||||
(*pHTask)->streamTaskId.streamId = (*pStreamTask)->id.streamId;
|
(*pHTask)->streamTaskId.streamId = (*pStreamTask)->id.streamId;
|
||||||
|
|
||||||
mDebug("s-task:0x%x related history task:0x%x, level:%d", (*pStreamTask)->id.taskId, (*pHTask)->id.taskId,
|
mDebug("s-task:0x%" PRIx64 "-0x%x related history task:0x%" PRIx64 "-0x%x, level:%d", (*pStreamTask)->id.streamId,
|
||||||
(*pHTask)->info.taskLevel);
|
(*pStreamTask)->id.taskId, (*pHTask)->id.streamId, (*pHTask)->id.taskId, (*pHTask)->info.taskLevel);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -649,6 +649,8 @@ static int32_t mndPersistTaskDropReq(STrans *pTrans, SStreamTask *pTask) {
|
||||||
|
|
||||||
pReq->head.vgId = htonl(pTask->info.nodeId);
|
pReq->head.vgId = htonl(pTask->info.nodeId);
|
||||||
pReq->taskId = pTask->id.taskId;
|
pReq->taskId = pTask->id.taskId;
|
||||||
|
pReq->streamId = pTask->id.streamId;
|
||||||
|
|
||||||
STransAction action = {0};
|
STransAction action = {0};
|
||||||
memcpy(&action.epSet, &pTask->info.epSet, sizeof(SEpSet));
|
memcpy(&action.epSet, &pTask->info.epSet, sizeof(SEpSet));
|
||||||
action.pCont = pReq;
|
action.pCont = pReq;
|
||||||
|
@ -1361,6 +1363,8 @@ static int32_t mndPauseStreamTask(STrans *pTrans, SStreamTask *pTask) {
|
||||||
}
|
}
|
||||||
pReq->head.vgId = htonl(pTask->info.nodeId);
|
pReq->head.vgId = htonl(pTask->info.nodeId);
|
||||||
pReq->taskId = pTask->id.taskId;
|
pReq->taskId = pTask->id.taskId;
|
||||||
|
pReq->streamId = pTask->id.streamId;
|
||||||
|
|
||||||
STransAction action = {0};
|
STransAction action = {0};
|
||||||
memcpy(&action.epSet, &pTask->info.epSet, sizeof(SEpSet));
|
memcpy(&action.epSet, &pTask->info.epSet, sizeof(SEpSet));
|
||||||
action.pCont = pReq;
|
action.pCont = pReq;
|
||||||
|
@ -1501,7 +1505,9 @@ static int32_t mndResumeStreamTask(STrans *pTrans, SStreamTask *pTask, int8_t ig
|
||||||
}
|
}
|
||||||
pReq->head.vgId = htonl(pTask->info.nodeId);
|
pReq->head.vgId = htonl(pTask->info.nodeId);
|
||||||
pReq->taskId = pTask->id.taskId;
|
pReq->taskId = pTask->id.taskId;
|
||||||
|
pReq->streamId = pTask->id.streamId;
|
||||||
pReq->igUntreated = igUntreated;
|
pReq->igUntreated = igUntreated;
|
||||||
|
|
||||||
STransAction action = {0};
|
STransAction action = {0};
|
||||||
memcpy(&action.epSet, &pTask->info.epSet, sizeof(SEpSet));
|
memcpy(&action.epSet, &pTask->info.epSet, sizeof(SEpSet));
|
||||||
action.pCont = pReq;
|
action.pCont = pReq;
|
||||||
|
|
|
@ -35,9 +35,7 @@ void sndEnqueueStreamDispatch(SSnode *pSnode, SRpcMsg *pMsg) {
|
||||||
|
|
||||||
tDecoderClear(&decoder);
|
tDecoderClear(&decoder);
|
||||||
|
|
||||||
int32_t taskId = req.taskId;
|
SStreamTask *pTask = streamMetaAcquireTask(pSnode->pMeta, req.streamId, req.taskId);
|
||||||
|
|
||||||
SStreamTask *pTask = streamMetaAcquireTask(pSnode->pMeta, taskId);
|
|
||||||
if (pTask) {
|
if (pTask) {
|
||||||
SRpcMsg rsp = {
|
SRpcMsg rsp = {
|
||||||
.info = pMsg->info,
|
.info = pMsg->info,
|
||||||
|
@ -181,21 +179,21 @@ int32_t sndProcessTaskDropReq(SSnode *pSnode, char *msg, int32_t msgLen) {
|
||||||
SVDropStreamTaskReq *pReq = (SVDropStreamTaskReq *)msg;
|
SVDropStreamTaskReq *pReq = (SVDropStreamTaskReq *)msg;
|
||||||
qDebug("snode:%d receive msg to drop stream task:0x%x", pSnode->pMeta->vgId, pReq->taskId);
|
qDebug("snode:%d receive msg to drop stream task:0x%x", pSnode->pMeta->vgId, pReq->taskId);
|
||||||
|
|
||||||
SStreamTask* pTask = streamMetaAcquireTask(pSnode->pMeta, pReq->taskId);
|
SStreamTask* pTask = streamMetaAcquireTask(pSnode->pMeta, pReq->streamId, pReq->taskId);
|
||||||
if (pTask == NULL) {
|
if (pTask == NULL) {
|
||||||
qError("vgId:%d failed to acquire s-task:0x%x when dropping it", pSnode->pMeta->vgId, pReq->taskId);
|
qError("vgId:%d failed to acquire s-task:0x%x when dropping it", pSnode->pMeta->vgId, pReq->taskId);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
streamMetaUnregisterTask(pSnode->pMeta, pReq->taskId);
|
streamMetaUnregisterTask(pSnode->pMeta, pReq->streamId, pReq->taskId);
|
||||||
streamMetaReleaseTask(pSnode->pMeta, pTask);
|
streamMetaReleaseTask(pSnode->pMeta, pTask);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t sndProcessTaskRunReq(SSnode *pSnode, SRpcMsg *pMsg) {
|
int32_t sndProcessTaskRunReq(SSnode *pSnode, SRpcMsg *pMsg) {
|
||||||
SStreamTaskRunReq *pReq = pMsg->pCont;
|
SStreamTaskRunReq *pReq = pMsg->pCont;
|
||||||
int32_t taskId = pReq->taskId;
|
|
||||||
SStreamTask *pTask = streamMetaAcquireTask(pSnode->pMeta, taskId);
|
SStreamTask *pTask = streamMetaAcquireTask(pSnode->pMeta, pReq->streamId, pReq->taskId);
|
||||||
if (pTask) {
|
if (pTask) {
|
||||||
streamProcessRunReq(pTask);
|
streamProcessRunReq(pTask);
|
||||||
streamMetaReleaseTask(pSnode->pMeta, pTask);
|
streamMetaReleaseTask(pSnode->pMeta, pTask);
|
||||||
|
@ -213,9 +211,8 @@ int32_t sndProcessTaskDispatchReq(SSnode *pSnode, SRpcMsg *pMsg, bool exec) {
|
||||||
SDecoder decoder;
|
SDecoder decoder;
|
||||||
tDecoderInit(&decoder, (uint8_t *)msgBody, msgLen);
|
tDecoderInit(&decoder, (uint8_t *)msgBody, msgLen);
|
||||||
tDecodeStreamDispatchReq(&decoder, &req);
|
tDecodeStreamDispatchReq(&decoder, &req);
|
||||||
int32_t taskId = req.taskId;
|
|
||||||
|
|
||||||
SStreamTask *pTask = streamMetaAcquireTask(pSnode->pMeta, taskId);
|
SStreamTask *pTask = streamMetaAcquireTask(pSnode->pMeta, req.streamId, req.taskId);
|
||||||
if (pTask) {
|
if (pTask) {
|
||||||
SRpcMsg rsp = { .info = pMsg->info, .code = 0 };
|
SRpcMsg rsp = { .info = pMsg->info, .code = 0 };
|
||||||
streamProcessDispatchMsg(pTask, &req, &rsp, exec);
|
streamProcessDispatchMsg(pTask, &req, &rsp, exec);
|
||||||
|
@ -235,8 +232,7 @@ int32_t sndProcessTaskRetrieveReq(SSnode *pSnode, SRpcMsg *pMsg) {
|
||||||
tDecoderInit(&decoder, msgBody, msgLen);
|
tDecoderInit(&decoder, msgBody, msgLen);
|
||||||
tDecodeStreamRetrieveReq(&decoder, &req);
|
tDecodeStreamRetrieveReq(&decoder, &req);
|
||||||
tDecoderClear(&decoder);
|
tDecoderClear(&decoder);
|
||||||
int32_t taskId = req.dstTaskId;
|
SStreamTask *pTask = streamMetaAcquireTask(pSnode->pMeta, req.streamId, req.dstTaskId);
|
||||||
SStreamTask *pTask = streamMetaAcquireTask(pSnode->pMeta, taskId);
|
|
||||||
|
|
||||||
if (pTask) {
|
if (pTask) {
|
||||||
SRpcMsg rsp = { .info = pMsg->info, .code = 0};
|
SRpcMsg rsp = { .info = pMsg->info, .code = 0};
|
||||||
|
@ -251,8 +247,11 @@ int32_t sndProcessTaskRetrieveReq(SSnode *pSnode, SRpcMsg *pMsg) {
|
||||||
|
|
||||||
int32_t sndProcessTaskDispatchRsp(SSnode *pSnode, SRpcMsg *pMsg) {
|
int32_t sndProcessTaskDispatchRsp(SSnode *pSnode, SRpcMsg *pMsg) {
|
||||||
SStreamDispatchRsp *pRsp = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
|
SStreamDispatchRsp *pRsp = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
|
||||||
int32_t taskId = ntohl(pRsp->upstreamTaskId);
|
|
||||||
SStreamTask *pTask = streamMetaAcquireTask(pSnode->pMeta, taskId);
|
int32_t taskId = htonl(pRsp->upstreamTaskId);
|
||||||
|
int64_t streamId = htobe64(pRsp->streamId);
|
||||||
|
|
||||||
|
SStreamTask *pTask = streamMetaAcquireTask(pSnode->pMeta, streamId, taskId);
|
||||||
if (pTask) {
|
if (pTask) {
|
||||||
streamProcessDispatchRsp(pTask, pRsp, pMsg->code);
|
streamProcessDispatchRsp(pTask, pRsp, pMsg->code);
|
||||||
streamMetaReleaseTask(pSnode->pMeta, pTask);
|
streamMetaReleaseTask(pSnode->pMeta, pTask);
|
||||||
|
@ -260,7 +259,6 @@ int32_t sndProcessTaskDispatchRsp(SSnode *pSnode, SRpcMsg *pMsg) {
|
||||||
} else {
|
} else {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
return 0;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t sndProcessTaskRetrieveRsp(SSnode *pSnode, SRpcMsg *pMsg) {
|
int32_t sndProcessTaskRetrieveRsp(SSnode *pSnode, SRpcMsg *pMsg) {
|
||||||
|
@ -297,7 +295,7 @@ int32_t sndProcessStreamTaskScanHistoryFinishReq(SSnode *pSnode, SRpcMsg *pMsg)
|
||||||
tDecoderClear(&decoder);
|
tDecoderClear(&decoder);
|
||||||
|
|
||||||
// find task
|
// find task
|
||||||
SStreamTask *pTask = streamMetaAcquireTask(pSnode->pMeta, req.downstreamTaskId);
|
SStreamTask *pTask = streamMetaAcquireTask(pSnode->pMeta, req.streamId, req.downstreamTaskId);
|
||||||
if (pTask == NULL) {
|
if (pTask == NULL) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
@ -340,7 +338,7 @@ int32_t sndProcessStreamTaskCheckReq(SSnode *pSnode, SRpcMsg *pMsg) {
|
||||||
.upstreamTaskId = req.upstreamTaskId,
|
.upstreamTaskId = req.upstreamTaskId,
|
||||||
};
|
};
|
||||||
|
|
||||||
SStreamTask *pTask = streamMetaAcquireTask(pSnode->pMeta, taskId);
|
SStreamTask *pTask = streamMetaAcquireTask(pSnode->pMeta, req.streamId, taskId);
|
||||||
|
|
||||||
if (pTask != NULL) {
|
if (pTask != NULL) {
|
||||||
rsp.status = streamTaskCheckStatus(pTask);
|
rsp.status = streamTaskCheckStatus(pTask);
|
||||||
|
@ -400,7 +398,7 @@ int32_t sndProcessStreamTaskCheckRsp(SSnode* pSnode, SRpcMsg* pMsg) {
|
||||||
qDebug("tq task:0x%x (vgId:%d) recv check rsp(reqId:0x%" PRIx64 ") from 0x%x (vgId:%d) status %d",
|
qDebug("tq task:0x%x (vgId:%d) recv check rsp(reqId:0x%" PRIx64 ") from 0x%x (vgId:%d) status %d",
|
||||||
rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.reqId, rsp.downstreamTaskId, rsp.downstreamNodeId, rsp.status);
|
rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.reqId, rsp.downstreamTaskId, rsp.downstreamNodeId, rsp.status);
|
||||||
|
|
||||||
SStreamTask* pTask = streamMetaAcquireTask(pSnode->pMeta, rsp.upstreamTaskId);
|
SStreamTask* pTask = streamMetaAcquireTask(pSnode->pMeta, rsp.streamId, rsp.upstreamTaskId);
|
||||||
if (pTask == NULL) {
|
if (pTask == NULL) {
|
||||||
qError("tq failed to locate the stream task:0x%x (vgId:%d), it may have been destroyed", rsp.upstreamTaskId,
|
qError("tq failed to locate the stream task:0x%x (vgId:%d), it may have been destroyed", rsp.upstreamTaskId,
|
||||||
pSnode->pMeta->vgId);
|
pSnode->pMeta->vgId);
|
||||||
|
|
|
@ -1062,7 +1062,7 @@ int32_t tqProcessStreamTaskCheckReq(STQ* pTq, SRpcMsg* pMsg) {
|
||||||
.upstreamTaskId = req.upstreamTaskId,
|
.upstreamTaskId = req.upstreamTaskId,
|
||||||
};
|
};
|
||||||
|
|
||||||
SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, taskId);
|
SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, req.streamId, taskId);
|
||||||
if (pTask != NULL) {
|
if (pTask != NULL) {
|
||||||
rsp.status = streamTaskCheckStatus(pTask);
|
rsp.status = streamTaskCheckStatus(pTask);
|
||||||
streamMetaReleaseTask(pTq->pStreamMeta, pTask);
|
streamMetaReleaseTask(pTq->pStreamMeta, pTask);
|
||||||
|
@ -1072,8 +1072,9 @@ int32_t tqProcessStreamTaskCheckReq(STQ* pTq, SRpcMsg* pMsg) {
|
||||||
pTask->id.idStr, pStatus, rsp.reqId, rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.status);
|
pTask->id.idStr, pStatus, rsp.reqId, rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.status);
|
||||||
} else {
|
} else {
|
||||||
rsp.status = 0;
|
rsp.status = 0;
|
||||||
tqDebug("tq recv task check(taskId:0x%x not built yet) req(reqId:0x%" PRIx64 ") from task:0x%x (vgId:%d), rsp status %d",
|
tqDebug("tq recv task check(taskId:0x%" PRIx64 "-0x%x not built yet) req(reqId:0x%" PRIx64
|
||||||
taskId, rsp.reqId, rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.status);
|
") from task:0x%x (vgId:%d), rsp status %d",
|
||||||
|
req.streamId, taskId, rsp.reqId, rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.status);
|
||||||
}
|
}
|
||||||
|
|
||||||
return streamSendCheckRsp(pTq->pStreamMeta, &req, &rsp, &pMsg->info, taskId);
|
return streamSendCheckRsp(pTq->pStreamMeta, &req, &rsp, &pMsg->info, taskId);
|
||||||
|
@ -1099,7 +1100,7 @@ int32_t tqProcessStreamTaskCheckRsp(STQ* pTq, int64_t sversion, SRpcMsg* pMsg) {
|
||||||
tqDebug("tq task:0x%x (vgId:%d) recv check rsp(reqId:0x%" PRIx64 ") from 0x%x (vgId:%d) status %d",
|
tqDebug("tq task:0x%x (vgId:%d) recv check rsp(reqId:0x%" PRIx64 ") from 0x%x (vgId:%d) status %d",
|
||||||
rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.reqId, rsp.downstreamTaskId, rsp.downstreamNodeId, rsp.status);
|
rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.reqId, rsp.downstreamTaskId, rsp.downstreamNodeId, rsp.status);
|
||||||
|
|
||||||
SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, rsp.upstreamTaskId);
|
SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, rsp.streamId, rsp.upstreamTaskId);
|
||||||
if (pTask == NULL) {
|
if (pTask == NULL) {
|
||||||
tqError("tq failed to locate the stream task:0x%x (vgId:%d), it may have been destroyed", rsp.upstreamTaskId,
|
tqError("tq failed to locate the stream task:0x%x (vgId:%d), it may have been destroyed", rsp.upstreamTaskId,
|
||||||
pTq->pStreamMeta->vgId);
|
pTq->pStreamMeta->vgId);
|
||||||
|
@ -1160,7 +1161,7 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t sversion, char* msg, int32_t ms
|
||||||
// not added into meta store
|
// not added into meta store
|
||||||
if (added) {
|
if (added) {
|
||||||
tqDebug("vgId:%d s-task:0x%x is deployed and add into meta, numOfTasks:%d", vgId, taskId, numOfTasks);
|
tqDebug("vgId:%d s-task:0x%x is deployed and add into meta, numOfTasks:%d", vgId, taskId, numOfTasks);
|
||||||
SStreamTask* p = streamMetaAcquireTask(pStreamMeta, taskId);
|
SStreamTask* p = streamMetaAcquireTask(pStreamMeta, pTask->id.streamId, taskId);
|
||||||
if (p != NULL) { // reset the downstreamReady flag.
|
if (p != NULL) { // reset the downstreamReady flag.
|
||||||
streamTaskCheckDownstreamTasks(p);
|
streamTaskCheckDownstreamTasks(p);
|
||||||
}
|
}
|
||||||
|
@ -1178,7 +1179,7 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
|
||||||
SStreamMeta* pMeta = pTq->pStreamMeta;
|
SStreamMeta* pMeta = pTq->pStreamMeta;
|
||||||
|
|
||||||
int32_t code = TSDB_CODE_SUCCESS;
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
SStreamTask* pTask = streamMetaAcquireTask(pMeta, pReq->taskId);
|
SStreamTask* pTask = streamMetaAcquireTask(pMeta, pReq->streamId, pReq->taskId);
|
||||||
if (pTask == NULL) {
|
if (pTask == NULL) {
|
||||||
tqError("vgId:%d failed to acquire stream task:0x%x during stream recover, task may have been destroyed",
|
tqError("vgId:%d failed to acquire stream task:0x%x during stream recover, task may have been destroyed",
|
||||||
pMeta->vgId, pReq->taskId);
|
pMeta->vgId, pReq->taskId);
|
||||||
|
@ -1234,7 +1235,7 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
|
||||||
bool done = false;
|
bool done = false;
|
||||||
|
|
||||||
// 1. get the related stream task
|
// 1. get the related stream task
|
||||||
pStreamTask = streamMetaAcquireTask(pMeta, pTask->streamTaskId.taskId);
|
pStreamTask = streamMetaAcquireTask(pMeta, pTask->streamTaskId.streamId, pTask->streamTaskId.taskId);
|
||||||
if (pStreamTask == NULL) {
|
if (pStreamTask == NULL) {
|
||||||
// todo delete this task, if the related stream task is dropped
|
// todo delete this task, if the related stream task is dropped
|
||||||
qError("failed to find s-task:0x%x, it may have been destroyed, drop fill-history task:%s",
|
qError("failed to find s-task:0x%x, it may have been destroyed, drop fill-history task:%s",
|
||||||
|
@ -1242,7 +1243,7 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
|
||||||
|
|
||||||
tqDebug("s-task:%s fill-history task set status to be dropping", id);
|
tqDebug("s-task:%s fill-history task set status to be dropping", id);
|
||||||
|
|
||||||
streamMetaUnregisterTask(pMeta, pTask->id.taskId);
|
streamMetaUnregisterTask(pMeta, pTask->id.streamId, pTask->id.taskId);
|
||||||
streamMetaReleaseTask(pMeta, pTask);
|
streamMetaReleaseTask(pMeta, pTask);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
@ -1350,7 +1351,7 @@ int32_t tqProcessTaskTransferStateReq(STQ* pTq, SRpcMsg* pMsg) {
|
||||||
|
|
||||||
tqDebug("vgId:%d start to process transfer state msg, from s-task:0x%x", pTq->pStreamMeta->vgId, req.downstreamTaskId);
|
tqDebug("vgId:%d start to process transfer state msg, from s-task:0x%x", pTq->pStreamMeta->vgId, req.downstreamTaskId);
|
||||||
|
|
||||||
SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, req.downstreamTaskId);
|
SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, req.streamId, req.downstreamTaskId);
|
||||||
if (pTask == NULL) {
|
if (pTask == NULL) {
|
||||||
tqError("failed to find task:0x%x, it may have been dropped already. process transfer state failed", req.downstreamTaskId);
|
tqError("failed to find task:0x%x, it may have been dropped already. process transfer state failed", req.downstreamTaskId);
|
||||||
return -1;
|
return -1;
|
||||||
|
@ -1386,7 +1387,7 @@ int32_t tqProcessTaskScanHistoryFinishReq(STQ* pTq, SRpcMsg* pMsg) {
|
||||||
tDecodeStreamScanHistoryFinishReq(&decoder, &req);
|
tDecodeStreamScanHistoryFinishReq(&decoder, &req);
|
||||||
tDecoderClear(&decoder);
|
tDecoderClear(&decoder);
|
||||||
|
|
||||||
SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, req.downstreamTaskId);
|
SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, req.streamId, req.downstreamTaskId);
|
||||||
if (pTask == NULL) {
|
if (pTask == NULL) {
|
||||||
tqError("vgId:%d process scan history finish msg, failed to find task:0x%x, it may be destroyed",
|
tqError("vgId:%d process scan history finish msg, failed to find task:0x%x, it may be destroyed",
|
||||||
pTq->pStreamMeta->vgId, req.downstreamTaskId);
|
pTq->pStreamMeta->vgId, req.downstreamTaskId);
|
||||||
|
@ -1412,7 +1413,7 @@ int32_t tqProcessTaskScanHistoryFinishRsp(STQ* pTq, SRpcMsg* pMsg) {
|
||||||
tDecodeCompleteHistoryDataMsg(&decoder, &req);
|
tDecodeCompleteHistoryDataMsg(&decoder, &req);
|
||||||
tDecoderClear(&decoder);
|
tDecoderClear(&decoder);
|
||||||
|
|
||||||
SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, req.upstreamTaskId);
|
SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, req.streamId, req.upstreamTaskId);
|
||||||
if (pTask == NULL) {
|
if (pTask == NULL) {
|
||||||
tqError("vgId:%d process scan history finish rsp, failed to find task:0x%x, it may be destroyed",
|
tqError("vgId:%d process scan history finish rsp, failed to find task:0x%x, it may be destroyed",
|
||||||
pTq->pStreamMeta->vgId, req.upstreamTaskId);
|
pTq->pStreamMeta->vgId, req.upstreamTaskId);
|
||||||
|
@ -1503,7 +1504,7 @@ int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, taskId);
|
SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, pReq->streamId, taskId);
|
||||||
if (pTask != NULL) {
|
if (pTask != NULL) {
|
||||||
// even in halt status, the data in inputQ must be processed
|
// even in halt status, the data in inputQ must be processed
|
||||||
int8_t st = pTask->status.taskStatus;
|
int8_t st = pTask->status.taskStatus;
|
||||||
|
@ -1538,7 +1539,7 @@ int32_t tqProcessTaskDispatchReq(STQ* pTq, SRpcMsg* pMsg, bool exec) {
|
||||||
tDecoderInit(&decoder, (uint8_t*)msgBody, msgLen);
|
tDecoderInit(&decoder, (uint8_t*)msgBody, msgLen);
|
||||||
tDecodeStreamDispatchReq(&decoder, &req);
|
tDecodeStreamDispatchReq(&decoder, &req);
|
||||||
|
|
||||||
SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, req.taskId);
|
SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, req.streamId, req.taskId);
|
||||||
if (pTask) {
|
if (pTask) {
|
||||||
SRpcMsg rsp = {.info = pMsg->info, .code = 0};
|
SRpcMsg rsp = {.info = pMsg->info, .code = 0};
|
||||||
streamProcessDispatchMsg(pTask, &req, &rsp, exec);
|
streamProcessDispatchMsg(pTask, &req, &rsp, exec);
|
||||||
|
@ -1552,10 +1553,12 @@ int32_t tqProcessTaskDispatchReq(STQ* pTq, SRpcMsg* pMsg, bool exec) {
|
||||||
|
|
||||||
int32_t tqProcessTaskDispatchRsp(STQ* pTq, SRpcMsg* pMsg) {
|
int32_t tqProcessTaskDispatchRsp(STQ* pTq, SRpcMsg* pMsg) {
|
||||||
SStreamDispatchRsp* pRsp = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
|
SStreamDispatchRsp* pRsp = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
|
||||||
int32_t taskId = ntohl(pRsp->upstreamTaskId);
|
|
||||||
SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, taskId);
|
|
||||||
|
|
||||||
int32_t vgId = pTq->pStreamMeta->vgId;
|
int32_t vgId = pTq->pStreamMeta->vgId;
|
||||||
|
int32_t taskId = htonl(pRsp->upstreamTaskId);
|
||||||
|
int64_t streamId = htobe64(pRsp->streamId);
|
||||||
|
SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, streamId, taskId);
|
||||||
|
|
||||||
if (pTask) {
|
if (pTask) {
|
||||||
streamProcessDispatchRsp(pTask, pRsp, pMsg->code);
|
streamProcessDispatchRsp(pTask, pRsp, pMsg->code);
|
||||||
streamMetaReleaseTask(pTq->pStreamMeta, pTask);
|
streamMetaReleaseTask(pTq->pStreamMeta, pTask);
|
||||||
|
@ -1569,13 +1572,13 @@ int32_t tqProcessTaskDispatchRsp(STQ* pTq, SRpcMsg* pMsg) {
|
||||||
int32_t tqProcessTaskDropReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
|
int32_t tqProcessTaskDropReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
|
||||||
SVDropStreamTaskReq* pReq = (SVDropStreamTaskReq*)msg;
|
SVDropStreamTaskReq* pReq = (SVDropStreamTaskReq*)msg;
|
||||||
tqDebug("vgId:%d receive msg to drop stream task:0x%x", TD_VID(pTq->pVnode), pReq->taskId);
|
tqDebug("vgId:%d receive msg to drop stream task:0x%x", TD_VID(pTq->pVnode), pReq->taskId);
|
||||||
SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, pReq->taskId);
|
SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, pReq->streamId, pReq->taskId);
|
||||||
if (pTask == NULL) {
|
if (pTask == NULL) {
|
||||||
tqError("vgId:%d failed to acquire s-task:0x%x when dropping it", pTq->pStreamMeta->vgId, pReq->taskId);
|
tqError("vgId:%d failed to acquire s-task:0x%x when dropping it", pTq->pStreamMeta->vgId, pReq->taskId);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
streamMetaUnregisterTask(pTq->pStreamMeta, pReq->taskId);
|
streamMetaUnregisterTask(pTq->pStreamMeta, pReq->streamId, pReq->taskId);
|
||||||
streamMetaReleaseTask(pTq->pStreamMeta, pTask);
|
streamMetaReleaseTask(pTq->pStreamMeta, pTask);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
@ -1584,7 +1587,7 @@ int32_t tqProcessTaskPauseReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
|
||||||
SVPauseStreamTaskReq* pReq = (SVPauseStreamTaskReq*)msg;
|
SVPauseStreamTaskReq* pReq = (SVPauseStreamTaskReq*)msg;
|
||||||
|
|
||||||
SStreamMeta* pMeta = pTq->pStreamMeta;
|
SStreamMeta* pMeta = pTq->pStreamMeta;
|
||||||
SStreamTask* pTask = streamMetaAcquireTask(pMeta, pReq->taskId);
|
SStreamTask* pTask = streamMetaAcquireTask(pMeta, pReq->streamId, pReq->taskId);
|
||||||
if (pTask == NULL) {
|
if (pTask == NULL) {
|
||||||
tqError("vgId:%d process pause req, failed to acquire task:0x%x, it may have been dropped already", pMeta->vgId,
|
tqError("vgId:%d process pause req, failed to acquire task:0x%x, it may have been dropped already", pMeta->vgId,
|
||||||
pReq->taskId);
|
pReq->taskId);
|
||||||
|
@ -1597,7 +1600,7 @@ int32_t tqProcessTaskPauseReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
|
||||||
|
|
||||||
SStreamTask* pHistoryTask = NULL;
|
SStreamTask* pHistoryTask = NULL;
|
||||||
if (pTask->historyTaskId.taskId != 0) {
|
if (pTask->historyTaskId.taskId != 0) {
|
||||||
pHistoryTask = streamMetaAcquireTask(pMeta, pTask->historyTaskId.taskId);
|
pHistoryTask = streamMetaAcquireTask(pMeta, pTask->historyTaskId.streamId, pTask->historyTaskId.taskId);
|
||||||
if (pHistoryTask == NULL) {
|
if (pHistoryTask == NULL) {
|
||||||
tqError("vgId:%d process pause req, failed to acquire fill-history task:0x%x, it may have been dropped already",
|
tqError("vgId:%d process pause req, failed to acquire fill-history task:0x%x, it may have been dropped already",
|
||||||
pMeta->vgId, pTask->historyTaskId.taskId);
|
pMeta->vgId, pTask->historyTaskId.taskId);
|
||||||
|
@ -1656,13 +1659,13 @@ int32_t tqProcessTaskResumeImpl(STQ* pTq, SStreamTask* pTask, int64_t sversion,
|
||||||
|
|
||||||
int32_t tqProcessTaskResumeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
|
int32_t tqProcessTaskResumeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
|
||||||
SVResumeStreamTaskReq* pReq = (SVResumeStreamTaskReq*)msg;
|
SVResumeStreamTaskReq* pReq = (SVResumeStreamTaskReq*)msg;
|
||||||
SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, pReq->taskId);
|
SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, pReq->streamId, pReq->taskId);
|
||||||
int32_t code = tqProcessTaskResumeImpl(pTq, pTask, sversion, pReq->igUntreated);
|
int32_t code = tqProcessTaskResumeImpl(pTq, pTask, sversion, pReq->igUntreated);
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
SStreamTask* pHistoryTask = streamMetaAcquireTask(pTq->pStreamMeta, pTask->historyTaskId.taskId);
|
SStreamTask* pHistoryTask = streamMetaAcquireTask(pTq->pStreamMeta, pTask->historyTaskId.streamId, pTask->historyTaskId.taskId);
|
||||||
if (pHistoryTask) {
|
if (pHistoryTask) {
|
||||||
code = tqProcessTaskResumeImpl(pTq, pHistoryTask, sversion, pReq->igUntreated);
|
code = tqProcessTaskResumeImpl(pTq, pHistoryTask, sversion, pReq->igUntreated);
|
||||||
}
|
}
|
||||||
|
@ -1681,8 +1684,7 @@ int32_t tqProcessTaskRetrieveReq(STQ* pTq, SRpcMsg* pMsg) {
|
||||||
tDecodeStreamRetrieveReq(&decoder, &req);
|
tDecodeStreamRetrieveReq(&decoder, &req);
|
||||||
tDecoderClear(&decoder);
|
tDecoderClear(&decoder);
|
||||||
|
|
||||||
int32_t taskId = req.dstTaskId;
|
SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, req.streamId, req.dstTaskId);
|
||||||
SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, taskId);
|
|
||||||
|
|
||||||
if (pTask) {
|
if (pTask) {
|
||||||
SRpcMsg rsp = {.info = pMsg->info, .code = 0};
|
SRpcMsg rsp = {.info = pMsg->info, .code = 0};
|
||||||
|
@ -1720,7 +1722,7 @@ int32_t vnodeEnqueueStreamMsg(SVnode* pVnode, SRpcMsg* pMsg) {
|
||||||
tDecoderClear(&decoder);
|
tDecoderClear(&decoder);
|
||||||
|
|
||||||
int32_t taskId = req.taskId;
|
int32_t taskId = req.taskId;
|
||||||
SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, taskId);
|
SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, req.streamId, req.taskId);
|
||||||
if (pTask != NULL) {
|
if (pTask != NULL) {
|
||||||
SRpcMsg rsp = {.info = pMsg->info, .code = 0};
|
SRpcMsg rsp = {.info = pMsg->info, .code = 0};
|
||||||
streamProcessDispatchMsg(pTask, &req, &rsp, false);
|
streamProcessDispatchMsg(pTask, &req, &rsp, false);
|
||||||
|
|
|
@ -72,8 +72,8 @@ int32_t tqStreamTasksStatusCheck(STQ* pTq) {
|
||||||
taosWUnLockLatch(&pMeta->lock);
|
taosWUnLockLatch(&pMeta->lock);
|
||||||
|
|
||||||
for (int32_t i = 0; i < numOfTasks; ++i) {
|
for (int32_t i = 0; i < numOfTasks; ++i) {
|
||||||
int32_t* pTaskId = taosArrayGet(pTaskList, i);
|
SStreamId* pTaskId = taosArrayGet(pTaskList, i);
|
||||||
SStreamTask* pTask = streamMetaAcquireTask(pMeta, *pTaskId);
|
SStreamTask* pTask = streamMetaAcquireTask(pMeta, pTaskId->streamId, pTaskId->taskId);
|
||||||
if (pTask == NULL) {
|
if (pTask == NULL) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
@ -242,8 +242,8 @@ int32_t createStreamTaskRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle) {
|
||||||
numOfTasks = taosArrayGetSize(pTaskList);
|
numOfTasks = taosArrayGetSize(pTaskList);
|
||||||
|
|
||||||
for (int32_t i = 0; i < numOfTasks; ++i) {
|
for (int32_t i = 0; i < numOfTasks; ++i) {
|
||||||
int32_t* pTaskId = taosArrayGet(pTaskList, i);
|
SStreamId* pTaskId = taosArrayGet(pTaskList, i);
|
||||||
SStreamTask* pTask = streamMetaAcquireTask(pStreamMeta, *pTaskId);
|
SStreamTask* pTask = streamMetaAcquireTask(pStreamMeta, pTaskId->streamId, pTaskId->taskId);
|
||||||
if (pTask == NULL) {
|
if (pTask == NULL) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
|
@ -290,7 +290,7 @@ static void waitForTaskIdle(SStreamTask* pTask, SStreamTask* pStreamTask) {
|
||||||
static int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask) {
|
static int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask) {
|
||||||
SStreamMeta* pMeta = pTask->pMeta;
|
SStreamMeta* pMeta = pTask->pMeta;
|
||||||
|
|
||||||
SStreamTask* pStreamTask = streamMetaAcquireTask(pMeta, pTask->streamTaskId.taskId);
|
SStreamTask* pStreamTask = streamMetaAcquireTask(pMeta, pTask->streamTaskId.streamId, pTask->streamTaskId.taskId);
|
||||||
if (pStreamTask == NULL) {
|
if (pStreamTask == NULL) {
|
||||||
// todo: destroy the fill-history task here
|
// todo: destroy the fill-history task here
|
||||||
qError("s-task:%s failed to find related stream task:0x%x, it may have been destroyed or closed", pTask->id.idStr,
|
qError("s-task:%s failed to find related stream task:0x%x, it may have been destroyed or closed", pTask->id.idStr,
|
||||||
|
@ -350,10 +350,9 @@ static int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask) {
|
||||||
streamTaskResumeFromHalt(pStreamTask);
|
streamTaskResumeFromHalt(pStreamTask);
|
||||||
|
|
||||||
qDebug("s-task:%s fill-history task set status to be dropping, save the state into disk", pTask->id.idStr);
|
qDebug("s-task:%s fill-history task set status to be dropping, save the state into disk", pTask->id.idStr);
|
||||||
int32_t taskId = pTask->id.taskId;
|
|
||||||
|
|
||||||
// 5. free it and remove fill-history task from disk meta-store
|
// 5. free it and remove fill-history task from disk meta-store
|
||||||
streamMetaUnregisterTask(pMeta, taskId);
|
streamMetaUnregisterTask(pMeta, pTask->id.streamId, pTask->id.taskId);
|
||||||
|
|
||||||
// 6. save to disk
|
// 6. save to disk
|
||||||
taosWLockLatch(&pMeta->lock);
|
taosWLockLatch(&pMeta->lock);
|
||||||
|
|
|
@ -66,14 +66,14 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF
|
||||||
goto _err;
|
goto _err;
|
||||||
}
|
}
|
||||||
|
|
||||||
_hash_fn_t fp = taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT);
|
_hash_fn_t fp = taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR);
|
||||||
pMeta->pTasks = taosHashInit(64, fp, true, HASH_NO_LOCK);
|
pMeta->pTasks = taosHashInit(64, fp, true, HASH_NO_LOCK);
|
||||||
if (pMeta->pTasks == NULL) {
|
if (pMeta->pTasks == NULL) {
|
||||||
goto _err;
|
goto _err;
|
||||||
}
|
}
|
||||||
|
|
||||||
// task list
|
// task list
|
||||||
pMeta->pTaskList = taosArrayInit(4, sizeof(int32_t));
|
pMeta->pTaskList = taosArrayInit(4, sizeof(SStreamId));
|
||||||
if (pMeta->pTaskList == NULL) {
|
if (pMeta->pTaskList == NULL) {
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
goto _err;
|
goto _err;
|
||||||
|
@ -161,43 +161,6 @@ void streamMetaClose(SStreamMeta* pMeta) {
|
||||||
taosMemoryFree(pMeta);
|
taosMemoryFree(pMeta);
|
||||||
}
|
}
|
||||||
|
|
||||||
#if 0
|
|
||||||
int32_t streamMetaAddSerializedTask(SStreamMeta* pMeta, int64_t ver, char* msg, int32_t msgLen) {
|
|
||||||
SStreamTask* pTask = taosMemoryCalloc(1, sizeof(SStreamTask));
|
|
||||||
if (pTask == NULL) {
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
SDecoder decoder;
|
|
||||||
tDecoderInit(&decoder, (uint8_t*)msg, msgLen);
|
|
||||||
if (tDecodeStreamTask(&decoder, pTask) < 0) {
|
|
||||||
tDecoderClear(&decoder);
|
|
||||||
goto FAIL;
|
|
||||||
}
|
|
||||||
tDecoderClear(&decoder);
|
|
||||||
|
|
||||||
if (pMeta->expandFunc(pMeta->ahandle, pTask, ver) < 0) {
|
|
||||||
ASSERT(0);
|
|
||||||
goto FAIL;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (taosHashPut(pMeta->pTasks, &pTask->id.taskId, sizeof(int32_t), &pTask, sizeof(void*)) < 0) {
|
|
||||||
goto FAIL;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (tdbTbUpsert(pMeta->pTaskDb, &pTask->id.taskId, sizeof(int32_t), msg, msgLen, pMeta->txn) < 0) {
|
|
||||||
taosHashRemove(pMeta->pTasks, &pTask->id.taskId, sizeof(int32_t));
|
|
||||||
ASSERT(0);
|
|
||||||
goto FAIL;
|
|
||||||
}
|
|
||||||
|
|
||||||
return 0;
|
|
||||||
|
|
||||||
FAIL:
|
|
||||||
if (pTask) tFreeStreamTask(pTask);
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
#endif
|
|
||||||
|
|
||||||
int32_t streamMetaSaveTask(SStreamMeta* pMeta, SStreamTask* pTask) {
|
int32_t streamMetaSaveTask(SStreamMeta* pMeta, SStreamTask* pTask) {
|
||||||
void* buf = NULL;
|
void* buf = NULL;
|
||||||
int32_t len;
|
int32_t len;
|
||||||
|
@ -241,14 +204,15 @@ int32_t streamMetaRemoveTask(SStreamMeta* pMeta, int32_t taskId) {
|
||||||
int32_t streamMetaRegisterTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTask, bool* pAdded) {
|
int32_t streamMetaRegisterTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTask, bool* pAdded) {
|
||||||
*pAdded = false;
|
*pAdded = false;
|
||||||
|
|
||||||
void* p = taosHashGet(pMeta->pTasks, &pTask->id.taskId, sizeof(pTask->id.taskId));
|
int64_t keys[2] = {pTask->id.streamId, pTask->id.taskId};
|
||||||
|
void* p = taosHashGet(pMeta->pTasks, keys, sizeof(keys));
|
||||||
if (p == NULL) {
|
if (p == NULL) {
|
||||||
if (pMeta->expandFunc(pMeta->ahandle, pTask, ver) < 0) {
|
if (pMeta->expandFunc(pMeta->ahandle, pTask, ver) < 0) {
|
||||||
tFreeStreamTask(pTask);
|
tFreeStreamTask(pTask);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
taosArrayPush(pMeta->pTaskList, &pTask->id.taskId);
|
taosArrayPush(pMeta->pTaskList, &pTask->id);
|
||||||
|
|
||||||
if (streamMetaSaveTask(pMeta, pTask) < 0) {
|
if (streamMetaSaveTask(pMeta, pTask) < 0) {
|
||||||
tFreeStreamTask(pTask);
|
tFreeStreamTask(pTask);
|
||||||
|
@ -263,7 +227,7 @@ int32_t streamMetaRegisterTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTa
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
taosHashPut(pMeta->pTasks, &pTask->id.taskId, sizeof(pTask->id.taskId), &pTask, POINTER_BYTES);
|
taosHashPut(pMeta->pTasks, keys, sizeof(keys), &pTask, POINTER_BYTES);
|
||||||
*pAdded = true;
|
*pAdded = true;
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
@ -274,10 +238,11 @@ int32_t streamMetaGetNumOfTasks(SStreamMeta* pMeta) {
|
||||||
return (int32_t)size;
|
return (int32_t)size;
|
||||||
}
|
}
|
||||||
|
|
||||||
SStreamTask* streamMetaAcquireTask(SStreamMeta* pMeta, int32_t taskId) {
|
SStreamTask* streamMetaAcquireTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId) {
|
||||||
taosRLockLatch(&pMeta->lock);
|
taosRLockLatch(&pMeta->lock);
|
||||||
|
|
||||||
SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasks, &taskId, sizeof(int32_t));
|
int64_t keys[2] = {streamId, taskId};
|
||||||
|
SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasks, keys, sizeof(keys));
|
||||||
if (ppTask != NULL) {
|
if (ppTask != NULL) {
|
||||||
if (!streamTaskShouldStop(&(*ppTask)->status)) {
|
if (!streamTaskShouldStop(&(*ppTask)->status)) {
|
||||||
int32_t ref = atomic_add_fetch_32(&(*ppTask)->refCnt, 1);
|
int32_t ref = atomic_add_fetch_32(&(*ppTask)->refCnt, 1);
|
||||||
|
@ -304,22 +269,24 @@ void streamMetaReleaseTask(SStreamMeta* pMeta, SStreamTask* pTask) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static void doRemoveIdFromList(SStreamMeta* pMeta, int32_t num, int32_t taskId) {
|
static void doRemoveIdFromList(SStreamMeta* pMeta, int32_t num, SStreamId* id) {
|
||||||
for (int32_t i = 0; i < num; ++i) {
|
for (int32_t i = 0; i < num; ++i) {
|
||||||
int32_t* pTaskId = taosArrayGet(pMeta->pTaskList, i);
|
SStreamId* pTaskId = taosArrayGet(pMeta->pTaskList, i);
|
||||||
if (*pTaskId == taskId) {
|
if (pTaskId->streamId == id->streamId && pTaskId->taskId == id->taskId) {
|
||||||
taosArrayRemove(pMeta->pTaskList, i);
|
taosArrayRemove(pMeta->pTaskList, i);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int32_t taskId) {
|
int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId) {
|
||||||
SStreamTask* pTask = NULL;
|
SStreamTask* pTask = NULL;
|
||||||
|
|
||||||
// pre-delete operation
|
// pre-delete operation
|
||||||
taosWLockLatch(&pMeta->lock);
|
taosWLockLatch(&pMeta->lock);
|
||||||
SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasks, &taskId, sizeof(int32_t));
|
|
||||||
|
int64_t keys[2] = {streamId, taskId};
|
||||||
|
SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasks, keys, sizeof(keys));
|
||||||
if (ppTask) {
|
if (ppTask) {
|
||||||
pTask = *ppTask;
|
pTask = *ppTask;
|
||||||
atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__DROPPING);
|
atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__DROPPING);
|
||||||
|
@ -335,7 +302,7 @@ int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int32_t taskId) {
|
||||||
|
|
||||||
while (1) {
|
while (1) {
|
||||||
taosRLockLatch(&pMeta->lock);
|
taosRLockLatch(&pMeta->lock);
|
||||||
ppTask = (SStreamTask**)taosHashGet(pMeta->pTasks, &taskId, sizeof(int32_t));
|
ppTask = (SStreamTask**)taosHashGet(pMeta->pTasks, keys, sizeof(keys));
|
||||||
|
|
||||||
if (ppTask) {
|
if (ppTask) {
|
||||||
if ((*ppTask)->status.timerActive == 0) {
|
if ((*ppTask)->status.timerActive == 0) {
|
||||||
|
@ -354,15 +321,13 @@ int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int32_t taskId) {
|
||||||
|
|
||||||
// let's do delete of stream task
|
// let's do delete of stream task
|
||||||
taosWLockLatch(&pMeta->lock);
|
taosWLockLatch(&pMeta->lock);
|
||||||
ppTask = (SStreamTask**)taosHashGet(pMeta->pTasks, &taskId, sizeof(int32_t));
|
ppTask = (SStreamTask**)taosHashGet(pMeta->pTasks, keys, sizeof(keys));
|
||||||
if (ppTask) {
|
if (ppTask) {
|
||||||
taosHashRemove(pMeta->pTasks, &taskId, sizeof(int32_t));
|
taosHashRemove(pMeta->pTasks, keys, sizeof(keys));
|
||||||
atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__DROPPING);
|
atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__DROPPING);
|
||||||
|
|
||||||
ASSERT(pTask->status.timerActive == 0);
|
ASSERT(pTask->status.timerActive == 0);
|
||||||
|
doRemoveIdFromList(pMeta, (int32_t)taosArrayGetSize(pMeta->pTaskList), &pTask->id);
|
||||||
int32_t num = taosArrayGetSize(pMeta->pTaskList);
|
|
||||||
doRemoveIdFromList(pMeta, num, pTask->id.taskId);
|
|
||||||
|
|
||||||
// remove the ref by timer
|
// remove the ref by timer
|
||||||
if (pTask->triggerParam != 0) {
|
if (pTask->triggerParam != 0) {
|
||||||
|
@ -473,7 +438,8 @@ int32_t streamLoadTasks(SStreamMeta* pMeta, int64_t ver) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// do duplicate task check.
|
// do duplicate task check.
|
||||||
void* p = taosHashGet(pMeta->pTasks, &pTask->id.taskId, sizeof(pTask->id.taskId));
|
int64_t keys[2] = {pTask->id.streamId, pTask->id.taskId};
|
||||||
|
void* p = taosHashGet(pMeta->pTasks, keys, sizeof(keys));
|
||||||
if (p == NULL) {
|
if (p == NULL) {
|
||||||
if (pMeta->expandFunc(pMeta->ahandle, pTask, pTask->chkInfo.version) < 0) {
|
if (pMeta->expandFunc(pMeta->ahandle, pTask, pTask->chkInfo.version) < 0) {
|
||||||
tdbFree(pKey);
|
tdbFree(pKey);
|
||||||
|
@ -484,7 +450,7 @@ int32_t streamLoadTasks(SStreamMeta* pMeta, int64_t ver) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
taosArrayPush(pMeta->pTaskList, &pTask->id.taskId);
|
taosArrayPush(pMeta->pTaskList, &pTask->id);
|
||||||
} else {
|
} else {
|
||||||
tdbFree(pKey);
|
tdbFree(pKey);
|
||||||
tdbFree(pVal);
|
tdbFree(pVal);
|
||||||
|
@ -493,7 +459,7 @@ int32_t streamLoadTasks(SStreamMeta* pMeta, int64_t ver) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (taosHashPut(pMeta->pTasks, &pTask->id.taskId, sizeof(pTask->id.taskId), &pTask, sizeof(void*)) < 0) {
|
if (taosHashPut(pMeta->pTasks, keys, sizeof(keys), &pTask, sizeof(void*)) < 0) {
|
||||||
tdbFree(pKey);
|
tdbFree(pKey);
|
||||||
tdbFree(pVal);
|
tdbFree(pVal);
|
||||||
tdbTbcClose(pCur);
|
tdbTbcClose(pCur);
|
||||||
|
|
|
@ -19,7 +19,8 @@
|
||||||
|
|
||||||
typedef struct SStreamTaskRetryInfo {
|
typedef struct SStreamTaskRetryInfo {
|
||||||
SStreamMeta* pMeta;
|
SStreamMeta* pMeta;
|
||||||
int32_t taskId;
|
int32_t taskId;
|
||||||
|
int64_t streamId;
|
||||||
} SStreamTaskRetryInfo;
|
} SStreamTaskRetryInfo;
|
||||||
|
|
||||||
static int32_t streamSetParamForScanHistory(SStreamTask* pTask);
|
static int32_t streamSetParamForScanHistory(SStreamTask* pTask);
|
||||||
|
@ -540,7 +541,9 @@ static void tryLaunchHistoryTask(void* param, void* tmrId) {
|
||||||
qDebug("s-task:0x%x in timer to launch related history task", pInfo->taskId);
|
qDebug("s-task:0x%x in timer to launch related history task", pInfo->taskId);
|
||||||
|
|
||||||
taosWLockLatch(&pMeta->lock);
|
taosWLockLatch(&pMeta->lock);
|
||||||
SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasks, &pInfo->taskId, sizeof(int32_t));
|
int64_t keys[2] = {pInfo->streamId, pInfo->taskId};
|
||||||
|
|
||||||
|
SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasks, keys, sizeof(keys));
|
||||||
if (ppTask) {
|
if (ppTask) {
|
||||||
ASSERT((*ppTask)->status.timerActive == 1);
|
ASSERT((*ppTask)->status.timerActive == 1);
|
||||||
|
|
||||||
|
@ -556,12 +559,12 @@ static void tryLaunchHistoryTask(void* param, void* tmrId) {
|
||||||
}
|
}
|
||||||
taosWUnLockLatch(&pMeta->lock);
|
taosWUnLockLatch(&pMeta->lock);
|
||||||
|
|
||||||
SStreamTask* pTask = streamMetaAcquireTask(pMeta, pInfo->taskId);
|
SStreamTask* pTask = streamMetaAcquireTask(pMeta, pInfo->streamId, pInfo->taskId);
|
||||||
if (pTask != NULL) {
|
if (pTask != NULL) {
|
||||||
ASSERT(pTask->status.timerActive == 1);
|
ASSERT(pTask->status.timerActive == 1);
|
||||||
|
|
||||||
// abort the timer if intend to stop task
|
// abort the timer if intend to stop task
|
||||||
SStreamTask* pHTask = streamMetaAcquireTask(pMeta, pTask->historyTaskId.taskId);
|
SStreamTask* pHTask = streamMetaAcquireTask(pMeta, pTask->historyTaskId.streamId, pTask->historyTaskId.taskId);
|
||||||
if (pHTask == NULL && (!streamTaskShouldStop(&pTask->status))) {
|
if (pHTask == NULL && (!streamTaskShouldStop(&pTask->status))) {
|
||||||
const char* pStatus = streamGetTaskStatusStr(pTask->status.taskStatus);
|
const char* pStatus = streamGetTaskStatusStr(pTask->status.taskStatus);
|
||||||
qWarn(
|
qWarn(
|
||||||
|
@ -595,14 +598,16 @@ int32_t streamLaunchFillHistoryTask(SStreamTask* pTask) {
|
||||||
SStreamMeta* pMeta = pTask->pMeta;
|
SStreamMeta* pMeta = pTask->pMeta;
|
||||||
int32_t hTaskId = pTask->historyTaskId.taskId;
|
int32_t hTaskId = pTask->historyTaskId.taskId;
|
||||||
|
|
||||||
|
int64_t keys[2] = {pTask->historyTaskId.streamId, pTask->historyTaskId.taskId};
|
||||||
// Set the execute conditions, including the query time window and the version range
|
// Set the execute conditions, including the query time window and the version range
|
||||||
SStreamTask** pHTask = taosHashGet(pMeta->pTasks, &hTaskId, sizeof(hTaskId));
|
SStreamTask** pHTask = taosHashGet(pMeta->pTasks, keys, sizeof(keys));
|
||||||
if (pHTask == NULL) {
|
if (pHTask == NULL) {
|
||||||
qWarn("s-task:%s vgId:%d failed to launch history task:0x%x, since it is not built yet", pTask->id.idStr,
|
qWarn("s-task:%s vgId:%d failed to launch history task:0x%x, since it is not built yet", pTask->id.idStr,
|
||||||
pMeta->vgId, hTaskId);
|
pMeta->vgId, hTaskId);
|
||||||
|
|
||||||
SStreamTaskRetryInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamTaskRetryInfo));
|
SStreamTaskRetryInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamTaskRetryInfo));
|
||||||
pInfo->taskId = pTask->id.taskId;
|
pInfo->taskId = pTask->id.taskId;
|
||||||
|
pInfo->streamId = pTask->id.streamId;
|
||||||
pInfo->pMeta = pTask->pMeta;
|
pInfo->pMeta = pTask->pMeta;
|
||||||
|
|
||||||
if (pTask->launchTaskTimer == NULL) {
|
if (pTask->launchTaskTimer == NULL) {
|
||||||
|
@ -797,7 +802,8 @@ void launchFillHistoryTask(SStreamTask* pTask) {
|
||||||
}
|
}
|
||||||
|
|
||||||
ASSERT(pTask->status.downstreamReady == 1);
|
ASSERT(pTask->status.downstreamReady == 1);
|
||||||
qDebug("s-task:%s start to launch related fill-history task:0x%x", pTask->id.idStr, tId);
|
qDebug("s-task:%s start to launch related fill-history task:0x%" PRIx64 "-0x%x", pTask->id.idStr,
|
||||||
|
pTask->historyTaskId.streamId, tId);
|
||||||
|
|
||||||
// launch associated fill history task
|
// launch associated fill history task
|
||||||
streamLaunchFillHistoryTask(pTask);
|
streamLaunchFillHistoryTask(pTask);
|
||||||
|
|
Loading…
Reference in New Issue