Merge branch 'enh/triggerCheckPoint2' into enh/chkpTransfer
commit 6e6eb05d5c
@@ -639,7 +639,7 @@ void streamTaskCheckDownstreamTasks(SStreamTask* pTask);
 int32_t streamTaskDoCheckDownstreamTasks(SStreamTask* pTask);
 int32_t streamTaskLaunchScanHistory(SStreamTask* pTask);
 int32_t streamTaskCheckStatus(SStreamTask* pTask, int32_t stage);
-int32_t streamTaskRestart(SStreamTask* pTask, const char* pDir);
+int32_t streamTaskRestart(SStreamTask* pTask, const char* pDir, bool startTask);
 int32_t streamTaskUpdateEpsetInfo(SStreamTask* pTask, SArray* pNodeList);
 int32_t streamTaskStop(SStreamTask* pTask);
 int32_t streamSendCheckRsp(const SStreamMeta* pMeta, const SStreamTaskCheckReq* pReq, SStreamTaskCheckRsp* pRsp,
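The hunk above widens streamTaskRestart with a startTask flag. As a rough, self-contained sketch of the call pattern this enables (every type and helper below is an illustrative stand-in, not the TDengine API): a leader vnode passes true so the restarted task is actually launched, while a follower passes false and only refreshes task state, matching how tqProcessTaskUpdateReq later in this diff derives the flag from vnodeIsRoleLeader().

/* illustrative stand-ins only; build with: cc sketch.c */
#include <stdbool.h>
#include <stdio.h>

typedef struct SStreamTask { const char* idStr; } SStreamTask;  /* stand-in type */

/* mirrors the new signature: startTask decides whether the restarted task is launched */
static int restartTask(SStreamTask* pTask, const char* pDir, bool startTask) {
  (void)pDir;
  printf("restart %s, launch:%d\n", pTask->idStr, startTask);
  return 0;
}

int main(void) {
  SStreamTask task = {.idStr = "s-task-1"};
  bool isLeader = true;  /* stand-in for vnodeIsRoleLeader(pVnode) */
  return restartTask(&task, NULL, isLeader);
}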
@@ -203,6 +203,7 @@ SArray *mmGetMsgHandles() {
   if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_PAUSE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
   if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_RESUME_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
   if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_CHECK_POINT_SOURCE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
+  if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_UPDATE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;

   if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_CONFIG_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
   if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_REPLICA_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
@@ -88,8 +88,6 @@ int32_t mndInitStream(SMnode *pMnode) {
   mndSetMsgHandle(pMnode, TDMT_MND_DROP_STREAM, mndProcessDropStreamReq);
   mndSetMsgHandle(pMnode, TDMT_MND_NODECHECK_TIMER, mndProcessNodeCheckReq);

-  /*mndSetMsgHandle(pMnode, TDMT_MND_RECOVER_STREAM, mndProcessRecoverStreamReq);*/
-
   mndSetMsgHandle(pMnode, TDMT_STREAM_TASK_DEPLOY_RSP, mndTransProcessRsp);
   mndSetMsgHandle(pMnode, TDMT_STREAM_TASK_DROP_RSP, mndTransProcessRsp);
   mndSetMsgHandle(pMnode, TDMT_STREAM_TASK_PAUSE_RSP, mndTransProcessRsp);
@@ -776,16 +774,16 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) {
     }
   }

-  pDb = mndAcquireDb(pMnode, streamObj.sourceDb);
-  if (pDb->cfg.replications != 1) {
-    mError("stream source db must have only 1 replica, but %s has %d", pDb->name, pDb->cfg.replications);
-    terrno = TSDB_CODE_MND_MULTI_REPLICA_SOURCE_DB;
-    mndReleaseDb(pMnode, pDb);
-    pDb = NULL;
-    goto _OVER;
-  }
+  // pDb = mndAcquireDb(pMnode, streamObj.sourceDb);
+  // if (pDb->cfg.replications != 1) {
+  //   mError("stream source db must have only 1 replica, but %s has %d", pDb->name, pDb->cfg.replications);
+  //   terrno = TSDB_CODE_MND_MULTI_REPLICA_SOURCE_DB;
+  //   mndReleaseDb(pMnode, pDb);
+  //   pDb = NULL;
+  //   goto _OVER;
+  // }

-  mndReleaseDb(pMnode, pDb);
+  // mndReleaseDb(pMnode, pDb);

   STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_DB_INSIDE, pReq, "create-stream");
   if (pTrans == NULL) {
@@ -868,7 +866,7 @@ static int32_t mndProcessStreamCheckpointTmr(SRpcMsg *pReq) {

   SRpcMsg rpcMsg = {
       .msgType = TDMT_MND_STREAM_BEGIN_CHECKPOINT, .pCont = pMsg, .contLen = sizeof(SMStreamDoCheckpointMsg)};
-  // tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);
+  tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);
   return 0;
 }

@@ -217,7 +217,7 @@ void tqCleanUp();
 STQ* tqOpen(const char* path, SVnode* pVnode);
 void tqNotifyClose(STQ*);
 void tqClose(STQ*);
-int  tqPushMsg(STQ*, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver);
+int  tqPushMsg(STQ*, tmsg_t msgType);
 int  tqRegisterPushHandle(STQ* pTq, void* handle, SRpcMsg* pMsg);
 int  tqUnregisterPushHandle(STQ* pTq, void* pHandle);
 int  tqStartStreamTasks(STQ* pTq);  // restore all stream tasks after vnode launching completed.
@@ -14,6 +14,7 @@
  */

 #include "tq.h"
+#include "vnd.h"

 typedef struct {
   int8_t inited;
@@ -680,7 +681,6 @@ int32_t tqProcessVgCommittedInfoReq(STQ* pTq, SRpcMsg* pMsg) {
   SRpcMsg rsp = {.info = pMsg->info, .pCont = buf, .contLen = len, .code = 0};
-
   tmsgSendRsp(&rsp);

   return 0;
 }

@@ -1137,14 +1137,13 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t sversion, char* msg, int32_t ms
   SDecoder decoder;
   tDecoderInit(&decoder, (uint8_t*)msg, msgLen);
   code = tDecodeStreamTask(&decoder, pTask);
-  if (code < 0) {
-    tDecoderClear(&decoder);
+  tDecoderClear(&decoder);
+
+  if (code < 0) {
     taosMemoryFree(pTask);
     return -1;
   }

-  tDecoderClear(&decoder);
-
   SStreamMeta* pStreamMeta = pTq->pStreamMeta;

   // 2.save task, use the latest commit version as the initial start version of stream task.
@@ -1166,16 +1165,22 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t sversion, char* msg, int32_t ms
   // added into meta store, pTask cannot be reference since it may have been destroyed by other threads already now if
   // it is added into the meta store
   if (added) {
+    // only handled in the leader node
+    if (vnodeIsRoleLeader(pTq->pVnode)) {
       tqDebug("vgId:%d s-task:0x%x is deployed and add into meta, numOfTasks:%d", vgId, taskId, numOfTasks);
       SStreamTask* p = streamMetaAcquireTask(pStreamMeta, streamId, taskId);

       bool restored = pTq->pVnode->restored;
-      if (p != NULL && restored) {  // reset the downstreamReady flag.
+      if (p != NULL && restored) {
         streamTaskCheckDownstreamTasks(p);
       } else if (!restored) {
         tqWarn("s-task:%s not launched since vnode(vgId:%d) not ready", p->id.idStr, vgId);
       }
+
+      if (p != NULL) {
         streamMetaReleaseTask(pStreamMeta, p);
+      }
+    }
   } else {
     tqWarn("vgId:%d failed to add s-task:0x%x, since already exists in meta store", vgId, taskId);
     tFreeStreamTask(pTask);
@@ -1663,6 +1668,7 @@ int32_t tqProcessTaskRetrieveRsp(STQ* pTq, SRpcMsg* pMsg) {
   return 0;
 }

+// todo refactor.
 int32_t vnodeEnqueueStreamMsg(SVnode* pVnode, SRpcMsg* pMsg) {
   STQ*      pTq = pVnode->pTq;
   SMsgHead* msgStr = pMsg->pCont;
@@ -1830,26 +1836,26 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) {
   int32_t vgId = TD_VID(pTq->pVnode);
   char*   msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
   int32_t len = pMsg->contLen - sizeof(SMsgHead);
-  int32_t code = 0;
+  bool    startTask = vnodeIsRoleLeader(pTq->pVnode);  // in case of follower, do not launch task
+  SRpcMsg rsp = {.info = pMsg->info, .code = TSDB_CODE_SUCCESS};

   SStreamTaskNodeUpdateMsg req = {0};

   SDecoder decoder;
   tDecoderInit(&decoder, (uint8_t*)msg, len);
   if (tDecodeStreamTaskUpdateMsg(&decoder, &req) < 0) {
-    code = TSDB_CODE_MSG_DECODE_ERROR;
-    tDecoderClear(&decoder);
-    tqError("vgId:%d failed to decode task update msg, code:%s", vgId, tstrerror(code));
-    return code;
+    rsp.code = TSDB_CODE_MSG_DECODE_ERROR;
+    tqError("vgId:%d failed to decode task update msg, code:%s", vgId, tstrerror(rsp.code));
+    goto _end;
   }
-  tDecoderClear(&decoder);

   SStreamTask* pTask = streamMetaAcquireTask(pMeta, req.streamId, req.taskId);
   if (pTask == NULL) {
     tqError("vgId:%d failed to acquire task:0x%x when handling update, it may have been dropped already", pMeta->vgId,
             req.taskId);
     // since task is in [STOP|DROPPING] state, it is safe to assume the pause is active
-    return TSDB_CODE_SUCCESS;
+    rsp.code = TSDB_CODE_SUCCESS;
+    goto _end;
   }

   tqDebug("s-task:%s receive task nodeEp update msg from mnode", pTask->id.idStr);
@@ -1858,27 +1864,26 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) {
   SStreamTask* pHistoryTask = NULL;
   if (pTask->historyTaskId.taskId != 0) {
     pHistoryTask = streamMetaAcquireTask(pMeta, pTask->historyTaskId.streamId, pTask->historyTaskId.taskId);
-    if (pHistoryTask == NULL) {
-      tqError("vgId:%d failed to get fill-history task:0x%x when handling task update, it may have been dropped",
-              pMeta->vgId, pTask->historyTaskId.taskId);
-
-      streamMetaReleaseTask(pMeta, pTask);
-
-      // since task is in [STOP|DROPPING] state, it is safe to assume the pause is active
-      return TSDB_CODE_SUCCESS;
-    }
-
+    if (pHistoryTask != NULL) {
       tqDebug("s-task:%s fill-history task handle task update along with related stream task", pHistoryTask->id.idStr);
       streamTaskUpdateEpsetInfo(pHistoryTask, req.pNodeList);
-  }

-  if (pHistoryTask != NULL) {
-    streamTaskRestart(pHistoryTask, NULL);
-    streamMetaReleaseTask(pMeta, pHistoryTask);
+      streamTaskRestart(pHistoryTask, NULL, startTask);
+      streamMetaReleaseTask(pMeta, pHistoryTask);
+    } else {
+      tqError("vgId:%d failed to get fill-history task:0x%x when handling task update, it may have been dropped",
+              pMeta->vgId, pTask->historyTaskId.taskId);
+      streamMetaReleaseTask(pMeta, pTask);
+    }
   }

-  streamTaskRestart(pTask, NULL);
+  streamTaskRestart(pTask, NULL, startTask);
   streamMetaReleaseTask(pMeta, pTask);

-  return TSDB_CODE_SUCCESS;
+  int32_t code = rsp.code;
+
+_end:
+  tDecoderClear(&decoder);
+  tmsgSendRsp(&rsp);
+  return code;
 }
@@ -30,7 +30,7 @@ int32_t tqProcessSubmitReqForSubscribe(STQ* pTq) {
   return 0;
 }

-int32_t tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver) {
+int32_t tqPushMsg(STQ* pTq, tmsg_t msgType) {
   if (msgType == TDMT_VND_SUBMIT) {
     tqProcessSubmitReqForSubscribe(pTq);
   }
@@ -39,21 +39,15 @@ int32_t tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t v
   int32_t numOfTasks = streamMetaGetNumOfTasks(pTq->pStreamMeta);
   taosRUnLockLatch(&pTq->pStreamMeta->lock);

-  tqDebug("handle submit, restore:%d, size:%d", pTq->pVnode->restored, numOfTasks);
+  tqDebug("handle submit, restore:%d, numOfTasks:%d", pTq->pVnode->restored, numOfTasks);

   // push data for stream processing:
   // 1. the vnode has already been restored.
   // 2. the vnode should be the leader.
   // 3. the stream is not suspended yet.
-  if (!tsDisableStream && vnodeIsRoleLeader(pTq->pVnode) && pTq->pVnode->restored) {
-    if (numOfTasks == 0) {
-      return 0;
-    }
-
-    if (msgType == TDMT_VND_SUBMIT || msgType == TDMT_VND_DELETE) {
-      tqStartStreamTasks(pTq);
-    }
+  if ((!tsDisableStream) && (numOfTasks > 0) && (msgType == TDMT_VND_SUBMIT || msgType == TDMT_VND_DELETE)) {
+    tqStartStreamTasks(pTq);
   }

   return 0;
 }
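The tqPushMsg hunk above collapses the nested checks into one predicate: stream tasks are started only when streams are enabled, at least one task exists, and the message is a submit or delete; the leader/restored check moves into tqStartStreamTasks in the next hunk. A small self-contained sketch of the combined condition (the names below are illustrative stand-ins, not the real symbols):

#include <stdbool.h>
#include <stdio.h>

enum { MSG_SUBMIT = 1, MSG_DELETE = 2, MSG_OTHER = 3 };  /* stand-ins for TDMT_VND_* */

static bool shouldStartStreamTasks(bool disableStream, int numOfTasks, int msgType) {
  return (!disableStream) && (numOfTasks > 0) &&
         (msgType == MSG_SUBMIT || msgType == MSG_DELETE);
}

int main(void) {
  printf("%d\n", shouldStartStreamTasks(false, 4, MSG_SUBMIT));  /* 1: tasks get started */
  printf("%d\n", shouldStartStreamTasks(false, 0, MSG_SUBMIT));  /* 0: no tasks registered */
  printf("%d\n", shouldStartStreamTasks(false, 4, MSG_OTHER));   /* 0: not a data message */
  return 0;
}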
@@ -13,6 +13,7 @@
  * along with this program. If not, see <http://www.gnu.org/licenses/>.
  */

+#include "vnd.h"
 #include "tq.h"

 static int32_t createStreamTaskRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle);
@@ -130,6 +131,11 @@ int32_t tqStartStreamTasks(STQ* pTq) {
   int32_t      vgId = TD_VID(pTq->pVnode);
   SStreamMeta* pMeta = pTq->pStreamMeta;

+  // for follower or vnode does not restored, do not launch the stream tasks.
+  if (!(vnodeIsRoleLeader(pTq->pVnode) && pTq->pVnode->restored)) {
+    return TSDB_CODE_SUCCESS;
+  }
+
   taosWLockLatch(&pMeta->lock);

   int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList);
@@ -524,7 +524,7 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t ver, SRpcMsg

   walApplyVer(pVnode->pWal, ver);

-  if (tqPushMsg(pVnode->pTq, pMsg->pCont, pMsg->contLen, pMsg->msgType, ver) < 0) {
+  if (tqPushMsg(pVnode->pTq, pMsg->msgType) < 0) {
     vError("vgId:%d, failed to push msg to TQ since %s", TD_VID(pVnode), tstrerror(terrno));
     return -1;
   }
@@ -262,9 +262,13 @@ int32_t streamProcessCheckpointReadyMsg(SStreamTask* pTask) {
 int32_t streamSaveAllTaskStatus(SStreamMeta* pMeta, int64_t checkpointId) {
   taosWLockLatch(&pMeta->lock);

+  int64_t keys[2];
   for (int32_t i = 0; i < taosArrayGetSize(pMeta->pTaskList); ++i) {
-    uint32_t* pTaskId = taosArrayGet(pMeta->pTaskList, i);
-    SStreamTask* p = *(SStreamTask**)taosHashGet(pMeta->pTasks, pTaskId, sizeof(*pTaskId));
+    SStreamId* pId = taosArrayGet(pMeta->pTaskList, i);
+    keys[0] = pId->streamId;
+    keys[1] = pId->taskId;
+
+    SStreamTask* p = *(SStreamTask**)taosHashGet(pMeta->pTasks, keys, sizeof(keys));

     int8_t prev = p->status.taskStatus;
     ASSERT(p->chkInfo.checkpointId < p->checkpointingId && p->checkpointingId == checkpointId);
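The hunk above switches the per-task lookup in streamSaveAllTaskStatus from the bare task id to a composite (streamId, taskId) key packed into int64_t keys[2], so the hash is queried with a fixed 16-byte buffer. A simplified, self-contained illustration of that key layout (SStreamId here is a stand-in struct, not the project definition):

#include <inttypes.h>
#include <stdio.h>

typedef struct { int64_t streamId; int32_t taskId; } SStreamId;  /* stand-in struct */

int main(void) {
  SStreamId id = {.streamId = 20230815LL, .taskId = 42};

  int64_t keys[2];
  keys[0] = id.streamId;
  keys[1] = id.taskId;  /* widened to 64 bits so the key layout and size are fixed */

  /* sizeof(keys) == 16: the hash table is queried with this whole buffer */
  printf("key = {%" PRId64 ", %" PRId64 "}, size = %zu bytes\n", keys[0], keys[1], sizeof(keys));
  return 0;
}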
@@ -465,7 +465,7 @@ int32_t streamTaskStop(SStreamTask* pTask) {
   return 0;
 }

-int32_t streamTaskRestart(SStreamTask* pTask, const char* pDir) {
+int32_t streamTaskRestart(SStreamTask* pTask, const char* pDir, bool startTask) {
   const char* id = pTask->id.idStr;
   int32_t     vgId = pTask->pMeta->vgId;

@@ -485,6 +485,12 @@ int32_t streamTaskRestart(SStreamTask* pTask, const char* pDir) {
   pTask->status.stage += 1;

   streamSetStatusNormal(pTask);
+
+  taosWLockLatch(&pTask->pMeta->lock);
+  streamMetaSaveTask(pTask->pMeta, pTask);
+  streamMetaCommit(pTask->pMeta);
+  taosWUnLockLatch(&pTask->pMeta->lock);
+
   qDebug("s-task:%s reset downstream status and inc stage to be:%d, status:%s, start to check downstream", id,
          pTask->status.stage, streamGetTaskStatusStr(pTask->status.taskStatus));

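The added lines above persist the bumped stage and normalized status under the stream-meta write lock before the downstream check starts. A rough sketch of that lock-save-commit-unlock pattern with placeholder helpers (a pthread rwlock stands in for the taos latch; none of these names are the TDengine API):

#include <pthread.h>
#include <stdio.h>

static pthread_rwlock_t metaLock = PTHREAD_RWLOCK_INITIALIZER;  /* stand-in for pMeta->lock */

static void saveTask(int taskId) { printf("save task %d\n", taskId); }  /* like streamMetaSaveTask */
static void commitMeta(void) { puts("commit meta"); }                   /* like streamMetaCommit */

static void persistTaskState(int taskId) {
  pthread_rwlock_wrlock(&metaLock);  /* taosWLockLatch(&pTask->pMeta->lock) */
  saveTask(taskId);
  commitMeta();
  pthread_rwlock_unlock(&metaLock);  /* taosWUnLockLatch(&pTask->pMeta->lock) */
}

int main(void) {
  persistTaskState(42);
  return 0;
}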
@@ -493,36 +499,6 @@ int32_t streamTaskRestart(SStreamTask* pTask, const char* pDir) {
   return 0;
 }

-// todo remove it
-//int32_t streamTaskUpdateEpInfo(SArray* pTaskList, int32_t nodeId, SEpSet* pEpSet) {
-//  int32_t numOfLevels = taosArrayGetSize(pTaskList);
-//
-//  for (int32_t j = 0; j < numOfLevels; ++j) {
-//    SArray *pLevel = taosArrayGetP(pTaskList, j);
-//
-//    int32_t numOfTasks = taosArrayGetSize(pLevel);
-//    for (int32_t k = 0; k < numOfTasks; ++k) {
-//      SStreamTask *pTask = taosArrayGetP(pLevel, k);
-//      if (pTask->info.nodeId == nodeId) {
-//        pTask->info.epSet = *pEpSet;
-//        continue;
-//      }
-//
-//      // check for the dispath info and the upstream task info
-//      int32_t level = pTask->info.taskLevel;
-//      if (level == TASK_LEVEL__SOURCE) {
-//        streamTaskUpdateDownstreamInfo(pTask, nodeId, pEpSet);
-//      } else if (level == TASK_LEVEL__AGG) {
-//        streamTaskUpdateUpstreamInfo(pTask, nodeId, pEpSet);
-//        streamTaskUpdateDownstreamInfo(pTask, nodeId, pEpSet);
-//      } else { // TASK_LEVEL__SINK
-//        streamTaskUpdateUpstreamInfo(pTask, nodeId, pEpSet);
-//      }
-//    }
-//  }
-//  return 0;
-//}
-
 int32_t doUpdateEpsetInfo(SStreamTask* pTask, int32_t nodeId, SEpSet* pEpSet) {
   if (pTask->info.nodeId == nodeId) {  // execution task should be moved away
     epsetAssign(&pTask->info.epSet, pEpSet);