enh(stream): update the stream task epset.
This commit is contained in:
parent
a713b37f35
commit
6a9826cf6e
|
@ -28,6 +28,22 @@ typedef struct SCorEpSet {
|
||||||
} SCorEpSet;
|
} SCorEpSet;
|
||||||
|
|
||||||
#define GET_ACTIVE_EP(_eps) (&((_eps)->eps[(_eps)->inUse]))
|
#define GET_ACTIVE_EP(_eps) (&((_eps)->eps[(_eps)->inUse]))
|
||||||
|
|
||||||
|
#define EPSET_TO_STR(_eps, tbuf) \
|
||||||
|
do { \
|
||||||
|
int len = snprintf((tbuf), sizeof(tbuf), "epset:{"); \
|
||||||
|
for (int _i = 0; _i < (_eps)->numOfEps; _i++) { \
|
||||||
|
if (_i == (_eps)->numOfEps - 1) { \
|
||||||
|
len += \
|
||||||
|
snprintf((tbuf) + len, sizeof(tbuf) - len, "%d. %s:%d", _i, (_eps)->eps[_i].fqdn, (_eps)->eps[_i].port); \
|
||||||
|
} else { \
|
||||||
|
len += \
|
||||||
|
snprintf((tbuf) + len, sizeof(tbuf) - len, "%d. %s:%d, ", _i, (_eps)->eps[_i].fqdn, (_eps)->eps[_i].port); \
|
||||||
|
} \
|
||||||
|
} \
|
||||||
|
len += snprintf((tbuf) + len, sizeof(tbuf) - len, "}, inUse:%d", (_eps)->inUse); \
|
||||||
|
} while (0);
|
||||||
|
|
||||||
int32_t taosGetFqdnPortFromEp(const char* ep, SEp* pEp);
|
int32_t taosGetFqdnPortFromEp(const char* ep, SEp* pEp);
|
||||||
void addEpIntoEpSet(SEpSet* pEpSet, const char* fqdn, uint16_t port);
|
void addEpIntoEpSet(SEpSet* pEpSet, const char* fqdn, uint16_t port);
|
||||||
|
|
||||||
|
|
|
@ -301,7 +301,7 @@ enum {
|
||||||
TD_DEF_MSG_TYPE(TDMT_SYNC_FORCE_FOLLOWER, "sync-force-become-follower", NULL, NULL)
|
TD_DEF_MSG_TYPE(TDMT_SYNC_FORCE_FOLLOWER, "sync-force-become-follower", NULL, NULL)
|
||||||
|
|
||||||
TD_NEW_MSG_SEG(TDMT_VND_STREAM_MSG)
|
TD_NEW_MSG_SEG(TDMT_VND_STREAM_MSG)
|
||||||
TD_DEF_MSG_TYPE(TDMT_VND_STREAM_TRIGGER, "vnode-stream-trigger", NULL, NULL)
|
// TD_DEF_MSG_TYPE(TDMT_VND_STREAM_TRIGGER, "vnode-stream-trigger", NULL, NULL)
|
||||||
TD_DEF_MSG_TYPE(TDMT_VND_STREAM_SCAN_HISTORY, "vnode-stream-scan-history", NULL, NULL)
|
TD_DEF_MSG_TYPE(TDMT_VND_STREAM_SCAN_HISTORY, "vnode-stream-scan-history", NULL, NULL)
|
||||||
TD_DEF_MSG_TYPE(TDMT_VND_STREAM_CHECK_POINT_SOURCE, "vnode-stream-checkpoint-source", NULL, NULL)
|
TD_DEF_MSG_TYPE(TDMT_VND_STREAM_CHECK_POINT_SOURCE, "vnode-stream-checkpoint-source", NULL, NULL)
|
||||||
TD_DEF_MSG_TYPE(TDMT_VND_STREAM_TASK_UPDATE, "vnode-stream-update", NULL, NULL)
|
TD_DEF_MSG_TYPE(TDMT_VND_STREAM_TASK_UPDATE, "vnode-stream-update", NULL, NULL)
|
||||||
|
|
|
@ -556,13 +556,13 @@ typedef struct {
|
||||||
int32_t tEncodeCompleteHistoryDataMsg(SEncoder* pEncoder, const SStreamCompleteHistoryMsg* pReq);
|
int32_t tEncodeCompleteHistoryDataMsg(SEncoder* pEncoder, const SStreamCompleteHistoryMsg* pReq);
|
||||||
int32_t tDecodeCompleteHistoryDataMsg(SDecoder* pDecoder, SStreamCompleteHistoryMsg* pReq);
|
int32_t tDecodeCompleteHistoryDataMsg(SDecoder* pDecoder, SStreamCompleteHistoryMsg* pReq);
|
||||||
|
|
||||||
typedef struct {
|
typedef struct SNodeUpdateInfo {
|
||||||
int32_t nodeId;
|
int32_t nodeId;
|
||||||
SEpSet prevEp;
|
SEpSet prevEp;
|
||||||
SEpSet newEp;
|
SEpSet newEp;
|
||||||
} SNodeUpdateInfo;
|
} SNodeUpdateInfo;
|
||||||
|
|
||||||
typedef struct {
|
typedef struct SStreamTaskNodeUpdateMsg {
|
||||||
int64_t streamId;
|
int64_t streamId;
|
||||||
int32_t taskId;
|
int32_t taskId;
|
||||||
SArray* pNodeList; // SArray<SNodeUpdateInfo>
|
SArray* pNodeList; // SArray<SNodeUpdateInfo>
|
||||||
|
@ -571,6 +571,14 @@ typedef struct {
|
||||||
int32_t tEncodeStreamTaskUpdateMsg(SEncoder* pEncoder, const SStreamTaskNodeUpdateMsg* pMsg);
|
int32_t tEncodeStreamTaskUpdateMsg(SEncoder* pEncoder, const SStreamTaskNodeUpdateMsg* pMsg);
|
||||||
int32_t tDecodeStreamTaskUpdateMsg(SDecoder* pDecoder, SStreamTaskNodeUpdateMsg* pMsg);
|
int32_t tDecodeStreamTaskUpdateMsg(SDecoder* pDecoder, SStreamTaskNodeUpdateMsg* pMsg);
|
||||||
|
|
||||||
|
typedef struct SStreamTaskNodeUpdateRsp {
|
||||||
|
int64_t streamId;
|
||||||
|
int32_t taskId;
|
||||||
|
} SStreamTaskNodeUpdateRsp;
|
||||||
|
|
||||||
|
int32_t tEncodeStreamTaskUpdateRsp(SEncoder* pEncoder, const SStreamTaskNodeUpdateRsp* pMsg);
|
||||||
|
int32_t tDecodeStreamTaskUpdateRsp(SDecoder* pDecoder, SStreamTaskNodeUpdateRsp* pMsg);
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
int64_t streamId;
|
int64_t streamId;
|
||||||
int32_t downstreamTaskId;
|
int32_t downstreamTaskId;
|
||||||
|
@ -630,7 +638,7 @@ int32_t streamTaskDoCheckDownstreamTasks(SStreamTask* pTask);
|
||||||
int32_t streamTaskLaunchScanHistory(SStreamTask* pTask);
|
int32_t streamTaskLaunchScanHistory(SStreamTask* pTask);
|
||||||
int32_t streamTaskCheckStatus(SStreamTask* pTask, int32_t stage);
|
int32_t streamTaskCheckStatus(SStreamTask* pTask, int32_t stage);
|
||||||
int32_t streamTaskRestart(SStreamTask* pTask, const char* pDir);
|
int32_t streamTaskRestart(SStreamTask* pTask, const char* pDir);
|
||||||
int32_t streamTaskUpdateEpInfo(SArray* pTaskList, int32_t nodeId, SEpSet* pEpSet);
|
int32_t streamTaskUpdateEpsetInfo(SStreamTask* pTask, SArray* pNodeList);
|
||||||
int32_t streamTaskStop(SStreamTask* pTask);
|
int32_t streamTaskStop(SStreamTask* pTask);
|
||||||
int32_t streamSendCheckRsp(const SStreamMeta* pMeta, const SStreamTaskCheckReq* pReq, SStreamTaskCheckRsp* pRsp,
|
int32_t streamSendCheckRsp(const SStreamMeta* pMeta, const SStreamTaskCheckReq* pReq, SStreamTaskCheckRsp* pRsp,
|
||||||
SRpcHandleInfo* pRpcInfo, int32_t taskId);
|
SRpcHandleInfo* pRpcInfo, int32_t taskId);
|
||||||
|
|
|
@ -730,8 +730,8 @@ SArray *vmGetMsgHandles() {
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_VND_CREATE_INDEX, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_VND_CREATE_INDEX, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_INDEX, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_INDEX, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||||
|
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DROP, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DEPLOY, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DEPLOY, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||||
|
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DROP, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_RUN, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_RUN, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DISPATCH, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DISPATCH, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DISPATCH_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DISPATCH_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
|
||||||
|
@ -742,11 +742,12 @@ SArray *vmGetMsgHandles() {
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TRANSFER_STATE, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TRANSFER_STATE, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_CHECK, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_CHECK, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_CHECK_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_CHECK_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TRIGGER, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
|
// if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TRIGGER, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_PAUSE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_PAUSE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_RESUME, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_RESUME, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_CHECK_POINT_SOURCE, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_CHECK_POINT_SOURCE, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_CHECKPOINT_READY, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_CHECKPOINT_READY, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
|
||||||
|
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_UPDATE, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
|
||||||
|
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_REPLICA, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_REPLICA, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER;
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_CONFIG, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_CONFIG, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||||
|
|
|
@ -93,6 +93,7 @@ int32_t mndInitStream(SMnode *pMnode) {
|
||||||
mndSetMsgHandle(pMnode, TDMT_STREAM_TASK_DROP_RSP, mndTransProcessRsp);
|
mndSetMsgHandle(pMnode, TDMT_STREAM_TASK_DROP_RSP, mndTransProcessRsp);
|
||||||
mndSetMsgHandle(pMnode, TDMT_STREAM_TASK_PAUSE_RSP, mndTransProcessRsp);
|
mndSetMsgHandle(pMnode, TDMT_STREAM_TASK_PAUSE_RSP, mndTransProcessRsp);
|
||||||
mndSetMsgHandle(pMnode, TDMT_STREAM_TASK_RESUME_RSP, mndTransProcessRsp);
|
mndSetMsgHandle(pMnode, TDMT_STREAM_TASK_RESUME_RSP, mndTransProcessRsp);
|
||||||
|
mndSetMsgHandle(pMnode, TDMT_VND_STREAM_TASK_UPDATE_RSP, mndTransProcessRsp);
|
||||||
|
|
||||||
mndSetMsgHandle(pMnode, TDMT_VND_STREAM_CHECK_POINT_SOURCE_RSP, mndTransProcessRsp);
|
mndSetMsgHandle(pMnode, TDMT_VND_STREAM_CHECK_POINT_SOURCE_RSP, mndTransProcessRsp);
|
||||||
mndSetMsgHandle(pMnode, TDMT_MND_STREAM_CHECKPOINT_TIMER, mndProcessStreamCheckpointTmr);
|
mndSetMsgHandle(pMnode, TDMT_MND_STREAM_CHECKPOINT_TIMER, mndProcessStreamCheckpointTmr);
|
||||||
|
@ -1743,14 +1744,17 @@ typedef struct SVgroupChangeInfo {
|
||||||
SArray* pUpdateNodeList; //SArray<SNodeUpdateInfo>
|
SArray* pUpdateNodeList; //SArray<SNodeUpdateInfo>
|
||||||
} SVgroupChangeInfo;
|
} SVgroupChangeInfo;
|
||||||
|
|
||||||
static void initNodeUpdateMsg(SStreamTaskNodeUpdateMsg* pMsg, const SVgroupChangeInfo* pInfo) {
|
static void initNodeUpdateMsg(SStreamTaskNodeUpdateMsg* pMsg, const SVgroupChangeInfo* pInfo, int64_t streamId, int32_t taskId) {
|
||||||
|
pMsg->streamId = streamId;
|
||||||
|
pMsg->taskId = taskId;
|
||||||
pMsg->pNodeList = taosArrayInit(taosArrayGetSize(pInfo->pUpdateNodeList), sizeof(SNodeUpdateInfo));
|
pMsg->pNodeList = taosArrayInit(taosArrayGetSize(pInfo->pUpdateNodeList), sizeof(SNodeUpdateInfo));
|
||||||
taosArrayAddAll(pMsg->pNodeList, pInfo->pUpdateNodeList);
|
taosArrayAddAll(pMsg->pNodeList, pInfo->pUpdateNodeList);
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t doBuildStreamTaskUpdateMsg(void** pBuf, int32_t* pLen, int32_t nodeId, SVgroupChangeInfo* pInfo) {
|
static int32_t doBuildStreamTaskUpdateMsg(void **pBuf, int32_t *pLen, SVgroupChangeInfo *pInfo, int32_t nodeId,
|
||||||
|
int64_t streamId, int32_t taskId) {
|
||||||
SStreamTaskNodeUpdateMsg req = {0};
|
SStreamTaskNodeUpdateMsg req = {0};
|
||||||
initNodeUpdateMsg(&req, pInfo);
|
initNodeUpdateMsg(&req, pInfo, streamId, taskId);
|
||||||
|
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
int32_t blen;
|
int32_t blen;
|
||||||
|
@ -1847,7 +1851,8 @@ static int32_t createStreamUpdateTrans(SMnode *pMnode, SStreamObj *pStream, SVgr
|
||||||
|
|
||||||
void *pBuf = NULL;
|
void *pBuf = NULL;
|
||||||
int32_t len = 0;
|
int32_t len = 0;
|
||||||
doBuildStreamTaskUpdateMsg(&pBuf, &len, pTask->info.nodeId, pInfo);
|
streamTaskUpdateEpsetInfo(pTask, pInfo->pUpdateNodeList);
|
||||||
|
doBuildStreamTaskUpdateMsg(&pBuf, &len, pInfo, pTask->info.nodeId, pTask->id.streamId, pTask->id.taskId);
|
||||||
|
|
||||||
STransAction action = {0};
|
STransAction action = {0};
|
||||||
initTransAction(&action, pBuf, len, TDMT_VND_STREAM_TASK_UPDATE, &pTask->info.epSet);
|
initTransAction(&action, pBuf, len, TDMT_VND_STREAM_TASK_UPDATE, &pTask->info.epSet);
|
||||||
|
@ -1881,24 +1886,47 @@ static int32_t createStreamUpdateTrans(SMnode *pMnode, SStreamObj *pStream, SVgr
|
||||||
return TSDB_CODE_ACTION_IN_PROGRESS;
|
return TSDB_CODE_ACTION_IN_PROGRESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
// todo. 1. multiple change, 2. replica change problem
|
static bool isNodeEpsetChanged(const SEpSet* pPrevEpset, const SEpSet* pCurrent) {
|
||||||
static SVgroupChangeInfo mndFindChangedVgroupInfo(SMnode *pMnode, const SArray *pPrevVgroupList,
|
const SEp* pEp = GET_ACTIVE_EP(pPrevEpset);
|
||||||
const SArray *pVgroupList) {
|
|
||||||
|
for(int32_t i = 0; i < pCurrent->numOfEps; ++i) {
|
||||||
|
const SEp* p = &(pCurrent->eps[i]);
|
||||||
|
if (pEp->port == p->port && strncmp(pEp->fqdn, p->fqdn, TSDB_FQDN_LEN) == 0) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
// 1. increase the replica does not affect the stream process.
|
||||||
|
// 2. decreasing the replica may affect the stream task execution in the way that there is one or more running stream
|
||||||
|
// tasks on the will be removed replica.
|
||||||
|
// 3. vgroup redistribution is an combination operation of first increase replica and then decrease replica. So we will
|
||||||
|
// handle it as mentioned in 1 & 2 items.
|
||||||
|
static SVgroupChangeInfo mndFindChangedNodeInfo(SMnode *pMnode, const SArray *pPrevNodeList, const SArray *pNodeList) {
|
||||||
SVgroupChangeInfo info = {
|
SVgroupChangeInfo info = {
|
||||||
.pUpdateNodeList = taosArrayInit(4, sizeof(SNodeUpdateInfo)),
|
.pUpdateNodeList = taosArrayInit(4, sizeof(SNodeUpdateInfo)),
|
||||||
.pDBMap = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR), true, HASH_NO_LOCK),
|
.pDBMap = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR), true, HASH_NO_LOCK),
|
||||||
};
|
};
|
||||||
|
|
||||||
int32_t numOfVgroups = taosArrayGetSize(pPrevVgroupList);
|
int32_t numOfNodes = taosArrayGetSize(pPrevNodeList);
|
||||||
for (int32_t i = 0; i < numOfVgroups; ++i) {
|
for (int32_t i = 0; i < numOfNodes; ++i) {
|
||||||
SNodeEntry *pPrevEntry = taosArrayGet(pPrevVgroupList, i);
|
SNodeEntry *pPrevEntry = taosArrayGet(pPrevNodeList, i);
|
||||||
|
|
||||||
int32_t num = taosArrayGetSize(pVgroupList);
|
int32_t num = taosArrayGetSize(pNodeList);
|
||||||
for (int32_t j = 0; j < num; ++j) {
|
for (int32_t j = 0; j < num; ++j) {
|
||||||
SNodeEntry *pCurrent = taosArrayGet(pVgroupList, j);
|
SNodeEntry *pCurrent = taosArrayGet(pNodeList, j);
|
||||||
|
|
||||||
if (pCurrent->nodeId == pPrevEntry->nodeId) {
|
if (pCurrent->nodeId == pPrevEntry->nodeId) {
|
||||||
// todo handle the replica change problem.
|
if (isNodeEpsetChanged(&pPrevEntry->epset, &pCurrent->epset)) {
|
||||||
if (!isEpsetEqual(&pCurrent->epset, &pPrevEntry->epset)) {
|
const SEp *pPrevEp = GET_ACTIVE_EP(&pPrevEntry->epset);
|
||||||
|
|
||||||
|
char buf[256] = {0};
|
||||||
|
EPSET_TO_STR(&pCurrent->epset, buf);
|
||||||
|
mDebug("nodeId:%d epset changed detected, old:%s:%d -> new:%s", pCurrent->nodeId, pPrevEp->fqdn,
|
||||||
|
pPrevEp->port, buf);
|
||||||
|
|
||||||
SNodeUpdateInfo updateInfo = {.nodeId = pPrevEntry->nodeId};
|
SNodeUpdateInfo updateInfo = {.nodeId = pPrevEntry->nodeId};
|
||||||
epsetAssign(&updateInfo.prevEp, &pPrevEntry->epset);
|
epsetAssign(&updateInfo.prevEp, &pPrevEntry->epset);
|
||||||
epsetAssign(&updateInfo.newEp, &pCurrent->epset);
|
epsetAssign(&updateInfo.newEp, &pCurrent->epset);
|
||||||
|
@ -1908,6 +1936,7 @@ static SVgroupChangeInfo mndFindChangedVgroupInfo(SMnode *pMnode, const SArray *
|
||||||
taosHashPut(info.pDBMap, pVgroup->dbName, strlen(pVgroup->dbName), NULL, 0);
|
taosHashPut(info.pDBMap, pVgroup->dbName, strlen(pVgroup->dbName), NULL, 0);
|
||||||
mndReleaseVgroup(pMnode, pVgroup);
|
mndReleaseVgroup(pMnode, pVgroup);
|
||||||
}
|
}
|
||||||
|
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1953,11 +1982,6 @@ int32_t mndProcessVgroupChange(SMnode *pMnode, SVgroupChangeInfo* pChangeInfo) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
// update the related upstream and downstream tasks, todo remove this, no need this function
|
|
||||||
// taosWLockLatch(&pStream->lock);
|
|
||||||
// streamTaskUpdateEpInfo(pStream->tasks, req.vgId, &req.epset);
|
|
||||||
// streamTaskUpdateEpInfo(pStream->pHTasksList, req.vgId, &req.epset);
|
|
||||||
// taosWUnLockLatch(&pStream->lock);
|
|
||||||
void* p = taosHashGet(pChangeInfo->pDBMap, pStream->targetDb, strlen(pStream->targetDb));
|
void* p = taosHashGet(pChangeInfo->pDBMap, pStream->targetDb, strlen(pStream->targetDb));
|
||||||
void* p1 = taosHashGet(pChangeInfo->pDBMap, pStream->sourceDb, strlen(pStream->sourceDb));
|
void* p1 = taosHashGet(pChangeInfo->pDBMap, pStream->sourceDb, strlen(pStream->sourceDb));
|
||||||
if (p == NULL && p1 == NULL) {
|
if (p == NULL && p1 == NULL) {
|
||||||
|
@ -1975,7 +1999,7 @@ int32_t mndProcessVgroupChange(SMnode *pMnode, SVgroupChangeInfo* pChangeInfo) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static SArray* doExtractNodeList(SMnode *pMnode) {
|
static SArray* doExtractNodeListFromStream(SMnode *pMnode) {
|
||||||
SSdb *pSdb = pMnode->pSdb;
|
SSdb *pSdb = pMnode->pSdb;
|
||||||
SStreamObj *pStream = NULL;
|
SStreamObj *pStream = NULL;
|
||||||
void *pIter = NULL;
|
void *pIter = NULL;
|
||||||
|
@ -2022,7 +2046,6 @@ static SArray* doExtractNodeList(SMnode *pMnode) {
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg) {
|
static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg) {
|
||||||
return 0;
|
|
||||||
int32_t old = atomic_val_compare_exchange_32(&mndNodeCheckSentinel, 0, 1);
|
int32_t old = atomic_val_compare_exchange_32(&mndNodeCheckSentinel, 0, 1);
|
||||||
if (old != 0) {
|
if (old != 0) {
|
||||||
mDebug("still in checking node change");
|
mDebug("still in checking node change");
|
||||||
|
@ -2030,17 +2053,26 @@ static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg) {
|
||||||
}
|
}
|
||||||
|
|
||||||
mDebug("start to do node change checking");
|
mDebug("start to do node change checking");
|
||||||
|
int64_t ts = taosGetTimestampSec();
|
||||||
|
|
||||||
SMnode *pMnode = pMsg->info.node;
|
SMnode *pMnode = pMsg->info.node;
|
||||||
if (execNodeList.pNodeEntryList == NULL) {
|
if (execNodeList.pNodeEntryList == NULL || (taosArrayGetSize(execNodeList.pNodeEntryList) == 0)) {
|
||||||
execNodeList.pNodeEntryList = doExtractNodeList(pMnode);
|
if (execNodeList.pNodeEntryList != NULL) {
|
||||||
|
execNodeList.pNodeEntryList = taosArrayDestroy(execNodeList.pNodeEntryList);
|
||||||
|
}
|
||||||
|
execNodeList.pNodeEntryList = doExtractNodeListFromStream(pMnode);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (taosArrayGetSize(execNodeList.pNodeEntryList) == 0) {
|
||||||
|
mDebug("end to do stream task node change checking, no vgroup exists, do nothing");
|
||||||
|
execNodeList.ts = ts;
|
||||||
|
atomic_store_32(&mndNodeCheckSentinel, 0);
|
||||||
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
SArray *pNodeSnapshot = mndTakeVgroupSnapshot(pMnode);
|
SArray *pNodeSnapshot = mndTakeVgroupSnapshot(pMnode);
|
||||||
int64_t ts = taosGetTimestampSec();
|
|
||||||
|
|
||||||
SVgroupChangeInfo changeInfo = mndFindChangedVgroupInfo(pMnode, execNodeList.pNodeEntryList, pNodeSnapshot);
|
|
||||||
|
|
||||||
|
SVgroupChangeInfo changeInfo = mndFindChangedNodeInfo(pMnode, execNodeList.pNodeEntryList, pNodeSnapshot);
|
||||||
if (taosArrayGetSize(changeInfo.pUpdateNodeList) > 0) {
|
if (taosArrayGetSize(changeInfo.pUpdateNodeList) > 0) {
|
||||||
mndProcessVgroupChange(pMnode, &changeInfo);
|
mndProcessVgroupChange(pMnode, &changeInfo);
|
||||||
}
|
}
|
||||||
|
@ -2053,7 +2085,7 @@ static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg) {
|
||||||
execNodeList.pNodeEntryList = pNodeSnapshot;
|
execNodeList.pNodeEntryList = pNodeSnapshot;
|
||||||
execNodeList.ts = ts;
|
execNodeList.ts = ts;
|
||||||
|
|
||||||
mDebug("end to do node change checking");
|
mDebug("end to do stream task node change checking");
|
||||||
atomic_store_32(&mndNodeCheckSentinel, 0);
|
atomic_store_32(&mndNodeCheckSentinel, 0);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
|
@ -192,37 +192,40 @@ static bool hasStreamTaskInTimer(SStreamMeta* pMeta) {
|
||||||
}
|
}
|
||||||
|
|
||||||
void tqNotifyClose(STQ* pTq) {
|
void tqNotifyClose(STQ* pTq) {
|
||||||
if (pTq != NULL) {
|
if (pTq == NULL) {
|
||||||
SStreamMeta* pMeta = pTq->pStreamMeta;
|
return;
|
||||||
taosWLockLatch(&pMeta->lock);
|
|
||||||
|
|
||||||
void* pIter = NULL;
|
|
||||||
while (1) {
|
|
||||||
pIter = taosHashIterate(pMeta->pTasks, pIter);
|
|
||||||
if (pIter == NULL) {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
SStreamTask* pTask = *(SStreamTask**)pIter;
|
|
||||||
tqDebug("vgId:%d s-task:%s set closing flag", pMeta->vgId, pTask->id.idStr);
|
|
||||||
streamTaskStop(pTask);
|
|
||||||
}
|
|
||||||
|
|
||||||
taosWUnLockLatch(&pMeta->lock);
|
|
||||||
|
|
||||||
tqDebug("vgId:%d start to check all tasks", pMeta->vgId);
|
|
||||||
|
|
||||||
int64_t st = taosGetTimestampMs();
|
|
||||||
|
|
||||||
while (hasStreamTaskInTimer(pMeta)) {
|
|
||||||
tqDebug("vgId:%d some tasks in timer, wait for 100ms and recheck", pMeta->vgId);
|
|
||||||
taosMsleep(100);
|
|
||||||
}
|
|
||||||
|
|
||||||
int64_t el = taosGetTimestampMs() - st;
|
|
||||||
tqDebug("vgId:%d all stream tasks are not in timer, continue close, elapsed time:%" PRId64 " ms",
|
|
||||||
pMeta->vgId, el);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
SStreamMeta* pMeta = pTq->pStreamMeta;
|
||||||
|
int32_t vgId = pMeta->vgId;
|
||||||
|
|
||||||
|
tqDebug("vgId:%d notify all stream tasks that the vnode is closing", vgId);
|
||||||
|
taosWLockLatch(&pMeta->lock);
|
||||||
|
|
||||||
|
void* pIter = NULL;
|
||||||
|
while (1) {
|
||||||
|
pIter = taosHashIterate(pMeta->pTasks, pIter);
|
||||||
|
if (pIter == NULL) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
SStreamTask* pTask = *(SStreamTask**)pIter;
|
||||||
|
tqDebug("vgId:%d s-task:%s set closing flag", vgId, pTask->id.idStr);
|
||||||
|
streamTaskStop(pTask);
|
||||||
|
}
|
||||||
|
|
||||||
|
taosWUnLockLatch(&pMeta->lock);
|
||||||
|
|
||||||
|
tqDebug("vgId:%d start to check all tasks", vgId);
|
||||||
|
int64_t st = taosGetTimestampMs();
|
||||||
|
|
||||||
|
while (hasStreamTaskInTimer(pMeta)) {
|
||||||
|
tqDebug("vgId:%d some tasks in timer, wait for 100ms and recheck", pMeta->vgId);
|
||||||
|
taosMsleep(100);
|
||||||
|
}
|
||||||
|
|
||||||
|
int64_t el = taosGetTimestampMs() - st;
|
||||||
|
tqDebug("vgId:%d all stream tasks are not in timer, continue close, elapsed time:%" PRId64 " ms", pMeta->vgId, el);
|
||||||
}
|
}
|
||||||
|
|
||||||
//static int32_t doSendDataRsp(const SRpcHandleInfo* pRpcHandleInfo, const SMqDataRsp* pRsp, int32_t epoch,
|
//static int32_t doSendDataRsp(const SRpcHandleInfo* pRpcHandleInfo, const SMqDataRsp* pRsp, int32_t epoch,
|
||||||
|
@ -1100,8 +1103,8 @@ int32_t tqProcessStreamTaskCheckRsp(STQ* pTq, int64_t sversion, SRpcMsg* pMsg) {
|
||||||
|
|
||||||
SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, rsp.streamId, rsp.upstreamTaskId);
|
SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, rsp.streamId, rsp.upstreamTaskId);
|
||||||
if (pTask == NULL) {
|
if (pTask == NULL) {
|
||||||
tqError("tq failed to locate the stream task:0x%x (vgId:%d), it may have been destroyed", rsp.upstreamTaskId,
|
tqError("tq failed to locate the stream task:0x%" PRIx64 "-0x%x (vgId:%d), it may have been destroyed",
|
||||||
pTq->pStreamMeta->vgId);
|
rsp.streamId, rsp.upstreamTaskId, pTq->pStreamMeta->vgId);
|
||||||
terrno = TSDB_CODE_STREAM_TASK_NOT_EXIST;
|
terrno = TSDB_CODE_STREAM_TASK_NOT_EXIST;
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
@ -1116,9 +1119,12 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t sversion, char* msg, int32_t ms
|
||||||
int32_t vgId = TD_VID(pTq->pVnode);
|
int32_t vgId = TD_VID(pTq->pVnode);
|
||||||
|
|
||||||
if (tsDisableStream) {
|
if (tsDisableStream) {
|
||||||
|
tqInfo("vgId:%d stream disabled, not deploy stream tasks", vgId);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
tqDebug("vgId:%d receive new stream task deploy msg, start to build stream task", vgId);
|
||||||
|
|
||||||
// 1.deserialize msg and build task
|
// 1.deserialize msg and build task
|
||||||
SStreamTask* pTask = taosMemoryCalloc(1, sizeof(SStreamTask));
|
SStreamTask* pTask = taosMemoryCalloc(1, sizeof(SStreamTask));
|
||||||
if (pTask == NULL) {
|
if (pTask == NULL) {
|
||||||
|
@ -1141,8 +1147,9 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t sversion, char* msg, int32_t ms
|
||||||
|
|
||||||
SStreamMeta* pStreamMeta = pTq->pStreamMeta;
|
SStreamMeta* pStreamMeta = pTq->pStreamMeta;
|
||||||
|
|
||||||
// 2.save task, use the newest commit version as the initial start version of stream task.
|
// 2.save task, use the latest commit version as the initial start version of stream task.
|
||||||
int32_t taskId = pTask->id.taskId;
|
int32_t taskId = pTask->id.taskId;
|
||||||
|
int64_t streamId = pTask->id.streamId;
|
||||||
bool added = false;
|
bool added = false;
|
||||||
|
|
||||||
taosWLockLatch(&pStreamMeta->lock);
|
taosWLockLatch(&pStreamMeta->lock);
|
||||||
|
@ -1151,21 +1158,26 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t sversion, char* msg, int32_t ms
|
||||||
taosWUnLockLatch(&pStreamMeta->lock);
|
taosWUnLockLatch(&pStreamMeta->lock);
|
||||||
|
|
||||||
if (code < 0) {
|
if (code < 0) {
|
||||||
tqError("vgId:%d failed to add s-task:0x%x, total:%d", vgId, pTask->id.taskId, numOfTasks);
|
tqError("vgId:%d failed to add s-task:0x%x, total:%d, code:%s", vgId, taskId, numOfTasks, tstrerror(code));
|
||||||
tFreeStreamTask(pTask);
|
tFreeStreamTask(pTask);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
// not added into meta store
|
// added into meta store, pTask cannot be reference since it may have been destroyed by other threads already now if
|
||||||
|
// it is added into the meta store
|
||||||
if (added) {
|
if (added) {
|
||||||
tqDebug("vgId:%d s-task:0x%x is deployed and add into meta, numOfTasks:%d", vgId, taskId, numOfTasks);
|
tqDebug("vgId:%d s-task:0x%x is deployed and add into meta, numOfTasks:%d", vgId, taskId, numOfTasks);
|
||||||
SStreamTask* p = streamMetaAcquireTask(pStreamMeta, pTask->id.streamId, taskId);
|
SStreamTask* p = streamMetaAcquireTask(pStreamMeta, streamId, taskId);
|
||||||
if (p != NULL) { // reset the downstreamReady flag.
|
|
||||||
|
bool restored = pTq->pVnode->restored;
|
||||||
|
if (p != NULL && restored) { // reset the downstreamReady flag.
|
||||||
streamTaskCheckDownstreamTasks(p);
|
streamTaskCheckDownstreamTasks(p);
|
||||||
|
} else if (!restored) {
|
||||||
|
tqWarn("s-task:%s not launched since vnode(vgId:%d) not ready", p->id.idStr, vgId);
|
||||||
}
|
}
|
||||||
streamMetaReleaseTask(pStreamMeta, p);
|
streamMetaReleaseTask(pStreamMeta, p);
|
||||||
} else {
|
} else {
|
||||||
tqWarn("vgId:%d failed to add s-task:0x%x, already exists in meta store", vgId, taskId);
|
tqWarn("vgId:%d failed to add s-task:0x%x, since already exists in meta store", vgId, taskId);
|
||||||
tFreeStreamTask(pTask);
|
tFreeStreamTask(pTask);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1840,17 +1852,15 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) {
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
tqDebug("s-task:%s receive pause msg from mnode", pTask->id.idStr);
|
tqDebug("s-task:%s receive task nodeEp update msg from mnode", pTask->id.idStr);
|
||||||
// streamTaskUpdateEpInfo(pTask);
|
streamTaskUpdateEpsetInfo(pTask, req.pNodeList);
|
||||||
|
|
||||||
SStreamTask* pHistoryTask = NULL;
|
SStreamTask* pHistoryTask = NULL;
|
||||||
if (pTask->historyTaskId.taskId != 0) {
|
if (pTask->historyTaskId.taskId != 0) {
|
||||||
pHistoryTask = streamMetaAcquireTask(pMeta, pTask->historyTaskId.streamId, pTask->historyTaskId.taskId);
|
pHistoryTask = streamMetaAcquireTask(pMeta, pTask->historyTaskId.streamId, pTask->historyTaskId.taskId);
|
||||||
if (pHistoryTask == NULL) {
|
if (pHistoryTask == NULL) {
|
||||||
tqError(
|
tqError("vgId:%d failed to get fill-history task:0x%x when handling task update, it may have been dropped",
|
||||||
"vgId:%d failed to acquire fill-history task:0x%x when handling task update, it may have been dropped "
|
pMeta->vgId, pTask->historyTaskId.taskId);
|
||||||
"already",
|
|
||||||
pMeta->vgId, pTask->historyTaskId.taskId);
|
|
||||||
|
|
||||||
streamMetaReleaseTask(pMeta, pTask);
|
streamMetaReleaseTask(pMeta, pTask);
|
||||||
|
|
||||||
|
@ -1859,7 +1869,7 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) {
|
||||||
}
|
}
|
||||||
|
|
||||||
tqDebug("s-task:%s fill-history task handle task update along with related stream task", pHistoryTask->id.idStr);
|
tqDebug("s-task:%s fill-history task handle task update along with related stream task", pHistoryTask->id.idStr);
|
||||||
// streamTaskUpdateEpInfo(pHistoryTask);
|
streamTaskUpdateEpsetInfo(pHistoryTask, req.pNodeList);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pHistoryTask != NULL) {
|
if (pHistoryTask != NULL) {
|
||||||
|
|
|
@ -61,7 +61,7 @@ int32_t tqStreamTasksStatusCheck(STQ* pTq) {
|
||||||
SStreamMeta* pMeta = pTq->pStreamMeta;
|
SStreamMeta* pMeta = pTq->pStreamMeta;
|
||||||
|
|
||||||
int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList);
|
int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList);
|
||||||
tqDebug("vgId:%d start to check all (%d) stream tasks downstream status", vgId, numOfTasks);
|
tqDebug("vgId:%d start to check all %d stream task(s) downstream status", vgId, numOfTasks);
|
||||||
if (numOfTasks == 0) {
|
if (numOfTasks == 0) {
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
|
@ -81,7 +81,6 @@ _err:
|
||||||
tqError("vgId:%d, vnode stream-task snapshot reader open failed since %s", TD_VID(pTq->pVnode), tstrerror(code));
|
tqError("vgId:%d, vnode stream-task snapshot reader open failed since %s", TD_VID(pTq->pVnode), tstrerror(code));
|
||||||
*ppReader = NULL;
|
*ppReader = NULL;
|
||||||
return code;
|
return code;
|
||||||
return 0;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t streamTaskSnapReaderClose(SStreamTaskReader* pReader) {
|
int32_t streamTaskSnapReaderClose(SStreamTaskReader* pReader) {
|
||||||
|
|
|
@ -473,7 +473,7 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t ver, SRpcMsg
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
case TDMT_STREAM_TASK_DEPLOY: {
|
case TDMT_STREAM_TASK_DEPLOY: {
|
||||||
if (pVnode->restored && tqProcessTaskDeployReq(pVnode->pTq, ver, pReq, len) < 0) {
|
if (tqProcessTaskDeployReq(pVnode->pTq, ver, pReq, len) < 0) {
|
||||||
goto _err;
|
goto _err;
|
||||||
}
|
}
|
||||||
} break;
|
} break;
|
||||||
|
|
|
@ -1010,3 +1010,19 @@ int32_t tDecodeStreamTaskUpdateMsg(SDecoder* pDecoder, SStreamTaskNodeUpdateMsg*
|
||||||
tEndDecode(pDecoder);
|
tEndDecode(pDecoder);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t tEncodeStreamTaskUpdateRsp(SEncoder* pEncoder, const SStreamTaskNodeUpdateRsp* pMsg) {
|
||||||
|
if (tStartEncode(pEncoder) < 0) return -1;
|
||||||
|
if (tEncodeI64(pEncoder, pMsg->streamId) < 0) return -1;
|
||||||
|
if (tEncodeI32(pEncoder, pMsg->taskId) < 0) return -1;
|
||||||
|
tEndEncode(pEncoder);
|
||||||
|
return pEncoder->pos;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t tDecodeStreamTaskUpdateRsp(SDecoder* pDecoder, SStreamTaskNodeUpdateRsp* pMsg) {
|
||||||
|
if (tStartDecode(pDecoder) < 0) return -1;
|
||||||
|
if (tDecodeI64(pDecoder, &pMsg->streamId) < 0) return -1;
|
||||||
|
if (tDecodeI32(pDecoder, &pMsg->taskId) < 0) return -1;
|
||||||
|
tEndDecode(pDecoder);
|
||||||
|
return 0;
|
||||||
|
}
|
|
@ -410,6 +410,7 @@ int64_t streamGetLatestCheckpointId(SStreamMeta* pMeta) {
|
||||||
int32_t streamLoadTasks(SStreamMeta* pMeta) {
|
int32_t streamLoadTasks(SStreamMeta* pMeta) {
|
||||||
TBC* pCur = NULL;
|
TBC* pCur = NULL;
|
||||||
|
|
||||||
|
qInfo("vgId:%d load stream tasks from meta files", pMeta->vgId);
|
||||||
if (tdbTbcOpen(pMeta->pTaskDb, &pCur, NULL) < 0) {
|
if (tdbTbcOpen(pMeta->pTaskDb, &pCur, NULL) < 0) {
|
||||||
qError("vgId:%d failed to open stream meta, code:%s", pMeta->vgId, tstrerror(terrno));
|
qError("vgId:%d failed to open stream meta, code:%s", pMeta->vgId, tstrerror(terrno));
|
||||||
return -1;
|
return -1;
|
||||||
|
|
|
@ -67,6 +67,7 @@ const char* streamGetTaskStatusStr(int32_t status) {
|
||||||
case TASK_STATUS__CK: return "check-point";
|
case TASK_STATUS__CK: return "check-point";
|
||||||
case TASK_STATUS__CK_READY: return "check-point-ready";
|
case TASK_STATUS__CK_READY: return "check-point-ready";
|
||||||
case TASK_STATUS__DROPPING: return "dropping";
|
case TASK_STATUS__DROPPING: return "dropping";
|
||||||
|
case TASK_STATUS__STOP: return "stop";
|
||||||
default:return "";
|
default:return "";
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -115,6 +116,7 @@ int32_t streamTaskDoCheckDownstreamTasks(SStreamTask* pTask) {
|
||||||
.upstreamTaskId = pTask->id.taskId,
|
.upstreamTaskId = pTask->id.taskId,
|
||||||
.upstreamNodeId = pTask->info.nodeId,
|
.upstreamNodeId = pTask->info.nodeId,
|
||||||
.childId = pTask->info.selfChildId,
|
.childId = pTask->info.selfChildId,
|
||||||
|
.stage = pTask->status.stage,
|
||||||
};
|
};
|
||||||
|
|
||||||
// serialize
|
// serialize
|
||||||
|
@ -172,6 +174,7 @@ int32_t streamRecheckDownstream(SStreamTask* pTask, const SStreamTaskCheckRsp* p
|
||||||
.downstreamTaskId = pRsp->downstreamTaskId,
|
.downstreamTaskId = pRsp->downstreamTaskId,
|
||||||
.downstreamNodeId = pRsp->downstreamNodeId,
|
.downstreamNodeId = pRsp->downstreamNodeId,
|
||||||
.childId = pRsp->childId,
|
.childId = pRsp->childId,
|
||||||
|
.stage = pTask->status.stage,
|
||||||
};
|
};
|
||||||
|
|
||||||
qDebug("s-task:%s (vgId:%d) check downstream task:0x%x (vgId:%d) (recheck)", pTask->id.idStr, pTask->info.nodeId,
|
qDebug("s-task:%s (vgId:%d) check downstream task:0x%x (vgId:%d) (recheck)", pTask->id.idStr, pTask->info.nodeId,
|
||||||
|
|
|
@ -13,11 +13,12 @@
|
||||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
#include "streamInt.h"
|
#include "tmisce.h"
|
||||||
#include "executor.h"
|
#include "executor.h"
|
||||||
|
#include "streamInt.h"
|
||||||
#include "tstream.h"
|
#include "tstream.h"
|
||||||
#include "wal.h"
|
|
||||||
#include "ttimer.h"
|
#include "ttimer.h"
|
||||||
|
#include "wal.h"
|
||||||
|
|
||||||
static int32_t addToTaskset(SArray* pArray, SStreamTask* pTask) {
|
static int32_t addToTaskset(SArray* pArray, SStreamTask* pTask) {
|
||||||
int32_t childId = taosArrayGetSize(pArray);
|
int32_t childId = taosArrayGetSize(pArray);
|
||||||
|
@ -404,7 +405,7 @@ void streamTaskUpdateUpstreamInfo(SStreamTask* pTask, int32_t nodeId, const SEpS
|
||||||
for(int32_t i = 0; i < numOfUpstream; ++i) {
|
for(int32_t i = 0; i < numOfUpstream; ++i) {
|
||||||
SStreamChildEpInfo* pInfo = taosArrayGet(pTask->pUpstreamInfoList, i);
|
SStreamChildEpInfo* pInfo = taosArrayGet(pTask->pUpstreamInfoList, i);
|
||||||
if (pInfo->nodeId == nodeId) {
|
if (pInfo->nodeId == nodeId) {
|
||||||
pInfo->epSet = *pEpSet;
|
epsetAssign(&pInfo->epSet, pEpSet);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -465,6 +466,12 @@ int32_t streamTaskStop(SStreamTask* pTask) {
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t streamTaskRestart(SStreamTask* pTask, const char* pDir) {
|
int32_t streamTaskRestart(SStreamTask* pTask, const char* pDir) {
|
||||||
|
const char* id = pTask->id.idStr;
|
||||||
|
int32_t vgId = pTask->pMeta->vgId;
|
||||||
|
|
||||||
|
qDebug("s-task:%s vgId:%d restart current task, stage:%d, status:%s, sched-status:%d", id, vgId, pTask->status.stage,
|
||||||
|
streamGetTaskStatusStr(pTask->status.taskStatus), pTask->status.schedStatus);
|
||||||
|
|
||||||
// 1. stop task
|
// 1. stop task
|
||||||
streamTaskStop(pTask);
|
streamTaskStop(pTask);
|
||||||
|
|
||||||
|
@ -477,39 +484,68 @@ int32_t streamTaskRestart(SStreamTask* pTask, const char* pDir) {
|
||||||
pTask->status.downstreamReady = 0;
|
pTask->status.downstreamReady = 0;
|
||||||
pTask->status.stage += 1;
|
pTask->status.stage += 1;
|
||||||
|
|
||||||
qDebug("s-task:%s reset downstream status and stage:%d, start to check downstream", pTask->id.idStr,
|
streamSetStatusNormal(pTask);
|
||||||
pTask->status.stage);
|
qDebug("s-task:%s reset downstream status and inc stage to be:%d, status:%s, start to check downstream", id,
|
||||||
|
pTask->status.stage, streamGetTaskStatusStr(pTask->status.taskStatus));
|
||||||
|
|
||||||
// 3. start to check the downstream status
|
// 3. start to check the downstream status
|
||||||
streamTaskCheckDownstreamTasks(pTask);
|
streamTaskCheckDownstreamTasks(pTask);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t streamTaskUpdateEpInfo(SArray* pTaskList, int32_t nodeId, SEpSet* pEpSet) {
|
// todo remove it
|
||||||
int32_t numOfLevels = taosArrayGetSize(pTaskList);
|
//int32_t streamTaskUpdateEpInfo(SArray* pTaskList, int32_t nodeId, SEpSet* pEpSet) {
|
||||||
|
// int32_t numOfLevels = taosArrayGetSize(pTaskList);
|
||||||
|
//
|
||||||
|
// for (int32_t j = 0; j < numOfLevels; ++j) {
|
||||||
|
// SArray *pLevel = taosArrayGetP(pTaskList, j);
|
||||||
|
//
|
||||||
|
// int32_t numOfTasks = taosArrayGetSize(pLevel);
|
||||||
|
// for (int32_t k = 0; k < numOfTasks; ++k) {
|
||||||
|
// SStreamTask *pTask = taosArrayGetP(pLevel, k);
|
||||||
|
// if (pTask->info.nodeId == nodeId) {
|
||||||
|
// pTask->info.epSet = *pEpSet;
|
||||||
|
// continue;
|
||||||
|
// }
|
||||||
|
//
|
||||||
|
// // check for the dispath info and the upstream task info
|
||||||
|
// int32_t level = pTask->info.taskLevel;
|
||||||
|
// if (level == TASK_LEVEL__SOURCE) {
|
||||||
|
// streamTaskUpdateDownstreamInfo(pTask, nodeId, pEpSet);
|
||||||
|
// } else if (level == TASK_LEVEL__AGG) {
|
||||||
|
// streamTaskUpdateUpstreamInfo(pTask, nodeId, pEpSet);
|
||||||
|
// streamTaskUpdateDownstreamInfo(pTask, nodeId, pEpSet);
|
||||||
|
// } else { // TASK_LEVEL__SINK
|
||||||
|
// streamTaskUpdateUpstreamInfo(pTask, nodeId, pEpSet);
|
||||||
|
// }
|
||||||
|
// }
|
||||||
|
// }
|
||||||
|
// return 0;
|
||||||
|
//}
|
||||||
|
|
||||||
for (int32_t j = 0; j < numOfLevels; ++j) {
|
int32_t doUpdateEpsetInfo(SStreamTask* pTask, int32_t nodeId, SEpSet* pEpSet) {
|
||||||
SArray *pLevel = taosArrayGetP(pTaskList, j);
|
if (pTask->info.nodeId == nodeId) { // execution task should be moved away
|
||||||
|
epsetAssign(&pTask->info.epSet, pEpSet);
|
||||||
|
}
|
||||||
|
|
||||||
int32_t numOfTasks = taosArrayGetSize(pLevel);
|
// check for the dispath info and the upstream task info
|
||||||
for (int32_t k = 0; k < numOfTasks; ++k) {
|
int32_t level = pTask->info.taskLevel;
|
||||||
SStreamTask *pTask = taosArrayGetP(pLevel, k);
|
if (level == TASK_LEVEL__SOURCE) {
|
||||||
if (pTask->info.nodeId == nodeId) {
|
streamTaskUpdateDownstreamInfo(pTask, nodeId, pEpSet);
|
||||||
pTask->info.epSet = *pEpSet;
|
} else if (level == TASK_LEVEL__AGG) {
|
||||||
continue;
|
streamTaskUpdateUpstreamInfo(pTask, nodeId, pEpSet);
|
||||||
}
|
streamTaskUpdateDownstreamInfo(pTask, nodeId, pEpSet);
|
||||||
|
} else { // TASK_LEVEL__SINK
|
||||||
|
streamTaskUpdateUpstreamInfo(pTask, nodeId, pEpSet);
|
||||||
|
}
|
||||||
|
|
||||||
// check for the dispath info and the upstream task info
|
return 0;
|
||||||
int32_t level = pTask->info.taskLevel;
|
}
|
||||||
if (level == TASK_LEVEL__SOURCE) {
|
|
||||||
streamTaskUpdateDownstreamInfo(pTask, nodeId, pEpSet);
|
int32_t streamTaskUpdateEpsetInfo(SStreamTask* pTask, SArray* pNodeList) {
|
||||||
} else if (level == TASK_LEVEL__AGG) {
|
for(int32_t i = 0; i < taosArrayGetSize(pNodeList); ++i) {
|
||||||
streamTaskUpdateUpstreamInfo(pTask, nodeId, pEpSet);
|
SNodeUpdateInfo* pInfo = taosArrayGet(pNodeList, i);
|
||||||
streamTaskUpdateDownstreamInfo(pTask, nodeId, pEpSet);
|
doUpdateEpsetInfo(pTask, pInfo->nodeId, &pInfo->newEp);
|
||||||
} else { // TASK_LEVEL__SINK
|
|
||||||
streamTaskUpdateUpstreamInfo(pTask, nodeId, pEpSet);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
|
@ -13,6 +13,7 @@
|
||||||
*/
|
*/
|
||||||
|
|
||||||
#include "transComm.h"
|
#include "transComm.h"
|
||||||
|
#include "tmisce.h"
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
int32_t numOfConn;
|
int32_t numOfConn;
|
||||||
|
@ -308,18 +309,6 @@ static void cliWalkCb(uv_handle_t* handle, void* arg);
|
||||||
} \
|
} \
|
||||||
} while (0)
|
} while (0)
|
||||||
|
|
||||||
#define EPSET_DEBUG_STR(epSet, tbuf) \
|
|
||||||
do { \
|
|
||||||
int len = snprintf(tbuf, sizeof(tbuf), "epset:{"); \
|
|
||||||
for (int i = 0; i < (epSet)->numOfEps; i++) { \
|
|
||||||
if (i == (epSet)->numOfEps - 1) { \
|
|
||||||
len += snprintf(tbuf + len, sizeof(tbuf) - len, "%d. %s:%d", i, (epSet)->eps[i].fqdn, (epSet)->eps[i].port); \
|
|
||||||
} else { \
|
|
||||||
len += snprintf(tbuf + len, sizeof(tbuf) - len, "%d. %s:%d, ", i, (epSet)->eps[i].fqdn, (epSet)->eps[i].port); \
|
|
||||||
} \
|
|
||||||
} \
|
|
||||||
len += snprintf(tbuf + len, sizeof(tbuf) - len, "}, inUse:%d", (epSet)->inUse); \
|
|
||||||
} while (0);
|
|
||||||
|
|
||||||
static void* cliWorkThread(void* arg);
|
static void* cliWorkThread(void* arg);
|
||||||
|
|
||||||
|
@ -2167,7 +2156,7 @@ static void cliSchedMsgToNextNode(SCliMsg* pMsg, SCliThrd* pThrd) {
|
||||||
if (rpcDebugFlag & DEBUG_DEBUG) {
|
if (rpcDebugFlag & DEBUG_DEBUG) {
|
||||||
STraceId* trace = &pMsg->msg.info.traceId;
|
STraceId* trace = &pMsg->msg.info.traceId;
|
||||||
char tbuf[256] = {0};
|
char tbuf[256] = {0};
|
||||||
EPSET_DEBUG_STR(&pCtx->epSet, tbuf);
|
EPSET_TO_STR(&pCtx->epSet, tbuf);
|
||||||
tGDebug("%s retry on next node,use:%s, step: %d,timeout:%" PRId64 "", transLabel(pThrd->pTransInst), tbuf,
|
tGDebug("%s retry on next node,use:%s, step: %d,timeout:%" PRId64 "", transLabel(pThrd->pTransInst), tbuf,
|
||||||
pCtx->retryStep, pCtx->retryNextInterval);
|
pCtx->retryStep, pCtx->retryNextInterval);
|
||||||
}
|
}
|
||||||
|
@ -2396,7 +2385,7 @@ int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) {
|
||||||
if (hasEpSet) {
|
if (hasEpSet) {
|
||||||
if (rpcDebugFlag & DEBUG_TRACE) {
|
if (rpcDebugFlag & DEBUG_TRACE) {
|
||||||
char tbuf[256] = {0};
|
char tbuf[256] = {0};
|
||||||
EPSET_DEBUG_STR(&pCtx->epSet, tbuf);
|
EPSET_TO_STR(&pCtx->epSet, tbuf);
|
||||||
tGTrace("%s conn %p extract epset from msg", CONN_GET_INST_LABEL(pConn), pConn);
|
tGTrace("%s conn %p extract epset from msg", CONN_GET_INST_LABEL(pConn), pConn);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue