Merge branch 'enh/triggerCheckPoint2' into enh/chkpTransfer

This commit is contained in:
yihaoDeng 2023-08-11 02:50:12 +00:00
commit 99be2836b2
14 changed files with 233 additions and 122 deletions

View File

@ -28,6 +28,22 @@ typedef struct SCorEpSet {
} SCorEpSet;
#define GET_ACTIVE_EP(_eps) (&((_eps)->eps[(_eps)->inUse]))
#define EPSET_TO_STR(_eps, tbuf) \
do { \
int len = snprintf((tbuf), sizeof(tbuf), "epset:{"); \
for (int _i = 0; _i < (_eps)->numOfEps; _i++) { \
if (_i == (_eps)->numOfEps - 1) { \
len += \
snprintf((tbuf) + len, sizeof(tbuf) - len, "%d. %s:%d", _i, (_eps)->eps[_i].fqdn, (_eps)->eps[_i].port); \
} else { \
len += \
snprintf((tbuf) + len, sizeof(tbuf) - len, "%d. %s:%d, ", _i, (_eps)->eps[_i].fqdn, (_eps)->eps[_i].port); \
} \
} \
len += snprintf((tbuf) + len, sizeof(tbuf) - len, "}, inUse:%d", (_eps)->inUse); \
} while (0);
int32_t taosGetFqdnPortFromEp(const char* ep, SEp* pEp);
void addEpIntoEpSet(SEpSet* pEpSet, const char* fqdn, uint16_t port);

View File

@ -301,7 +301,7 @@ enum {
TD_DEF_MSG_TYPE(TDMT_SYNC_FORCE_FOLLOWER, "sync-force-become-follower", NULL, NULL)
TD_NEW_MSG_SEG(TDMT_VND_STREAM_MSG)
TD_DEF_MSG_TYPE(TDMT_VND_STREAM_TRIGGER, "vnode-stream-trigger", NULL, NULL)
// TD_DEF_MSG_TYPE(TDMT_VND_STREAM_TRIGGER, "vnode-stream-trigger", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_STREAM_SCAN_HISTORY, "vnode-stream-scan-history", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_STREAM_CHECK_POINT_SOURCE, "vnode-stream-checkpoint-source", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_STREAM_TASK_UPDATE, "vnode-stream-update", NULL, NULL)

View File

@ -558,13 +558,13 @@ typedef struct {
int32_t tEncodeCompleteHistoryDataMsg(SEncoder* pEncoder, const SStreamCompleteHistoryMsg* pReq);
int32_t tDecodeCompleteHistoryDataMsg(SDecoder* pDecoder, SStreamCompleteHistoryMsg* pReq);
typedef struct {
typedef struct SNodeUpdateInfo {
int32_t nodeId;
SEpSet prevEp;
SEpSet newEp;
} SNodeUpdateInfo;
typedef struct {
typedef struct SStreamTaskNodeUpdateMsg {
int64_t streamId;
int32_t taskId;
SArray* pNodeList; // SArray<SNodeUpdateInfo>
@ -573,6 +573,14 @@ typedef struct {
int32_t tEncodeStreamTaskUpdateMsg(SEncoder* pEncoder, const SStreamTaskNodeUpdateMsg* pMsg);
int32_t tDecodeStreamTaskUpdateMsg(SDecoder* pDecoder, SStreamTaskNodeUpdateMsg* pMsg);
typedef struct SStreamTaskNodeUpdateRsp {
int64_t streamId;
int32_t taskId;
} SStreamTaskNodeUpdateRsp;
int32_t tEncodeStreamTaskUpdateRsp(SEncoder* pEncoder, const SStreamTaskNodeUpdateRsp* pMsg);
int32_t tDecodeStreamTaskUpdateRsp(SDecoder* pDecoder, SStreamTaskNodeUpdateRsp* pMsg);
typedef struct {
int64_t streamId;
int32_t downstreamTaskId;
@ -632,7 +640,7 @@ int32_t streamTaskDoCheckDownstreamTasks(SStreamTask* pTask);
int32_t streamTaskLaunchScanHistory(SStreamTask* pTask);
int32_t streamTaskCheckStatus(SStreamTask* pTask, int32_t stage);
int32_t streamTaskRestart(SStreamTask* pTask, const char* pDir);
int32_t streamTaskUpdateEpInfo(SArray* pTaskList, int32_t nodeId, SEpSet* pEpSet);
int32_t streamTaskUpdateEpsetInfo(SStreamTask* pTask, SArray* pNodeList);
int32_t streamTaskStop(SStreamTask* pTask);
int32_t streamSendCheckRsp(const SStreamMeta* pMeta, const SStreamTaskCheckReq* pReq, SStreamTaskCheckRsp* pRsp,
SRpcHandleInfo* pRpcInfo, int32_t taskId);

View File

@ -730,8 +730,8 @@ SArray *vmGetMsgHandles() {
if (dmSetMgmtHandle(pArray, TDMT_VND_CREATE_INDEX, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_INDEX, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DROP, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DEPLOY, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DROP, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_RUN, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DISPATCH, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DISPATCH_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
@ -742,11 +742,12 @@ SArray *vmGetMsgHandles() {
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TRANSFER_STATE, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_CHECK, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_CHECK_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TRIGGER, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
// if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TRIGGER, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_PAUSE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_RESUME, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_CHECK_POINT_SOURCE, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_CHECKPOINT_READY, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_UPDATE, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_REPLICA, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_CONFIG, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;

View File

@ -94,6 +94,7 @@ int32_t mndInitStream(SMnode *pMnode) {
mndSetMsgHandle(pMnode, TDMT_STREAM_TASK_DROP_RSP, mndTransProcessRsp);
mndSetMsgHandle(pMnode, TDMT_STREAM_TASK_PAUSE_RSP, mndTransProcessRsp);
mndSetMsgHandle(pMnode, TDMT_STREAM_TASK_RESUME_RSP, mndTransProcessRsp);
mndSetMsgHandle(pMnode, TDMT_VND_STREAM_TASK_UPDATE_RSP, mndTransProcessRsp);
mndSetMsgHandle(pMnode, TDMT_VND_STREAM_CHECK_POINT_SOURCE_RSP, mndTransProcessRsp);
mndSetMsgHandle(pMnode, TDMT_MND_STREAM_CHECKPOINT_TIMER, mndProcessStreamCheckpointTmr);
@ -1743,14 +1744,17 @@ typedef struct SVgroupChangeInfo {
SArray* pUpdateNodeList; //SArray<SNodeUpdateInfo>
} SVgroupChangeInfo;
static void initNodeUpdateMsg(SStreamTaskNodeUpdateMsg* pMsg, const SVgroupChangeInfo* pInfo) {
static void initNodeUpdateMsg(SStreamTaskNodeUpdateMsg* pMsg, const SVgroupChangeInfo* pInfo, int64_t streamId, int32_t taskId) {
pMsg->streamId = streamId;
pMsg->taskId = taskId;
pMsg->pNodeList = taosArrayInit(taosArrayGetSize(pInfo->pUpdateNodeList), sizeof(SNodeUpdateInfo));
taosArrayAddAll(pMsg->pNodeList, pInfo->pUpdateNodeList);
}
static int32_t doBuildStreamTaskUpdateMsg(void** pBuf, int32_t* pLen, int32_t nodeId, SVgroupChangeInfo* pInfo) {
static int32_t doBuildStreamTaskUpdateMsg(void **pBuf, int32_t *pLen, SVgroupChangeInfo *pInfo, int32_t nodeId,
int64_t streamId, int32_t taskId) {
SStreamTaskNodeUpdateMsg req = {0};
initNodeUpdateMsg(&req, pInfo);
initNodeUpdateMsg(&req, pInfo, streamId, taskId);
int32_t code = 0;
int32_t blen;
@ -1847,7 +1851,8 @@ static int32_t createStreamUpdateTrans(SMnode *pMnode, SStreamObj *pStream, SVgr
void *pBuf = NULL;
int32_t len = 0;
doBuildStreamTaskUpdateMsg(&pBuf, &len, pTask->info.nodeId, pInfo);
streamTaskUpdateEpsetInfo(pTask, pInfo->pUpdateNodeList);
doBuildStreamTaskUpdateMsg(&pBuf, &len, pInfo, pTask->info.nodeId, pTask->id.streamId, pTask->id.taskId);
STransAction action = {0};
initTransAction(&action, pBuf, len, TDMT_VND_STREAM_TASK_UPDATE, &pTask->info.epSet);
@ -1881,24 +1886,47 @@ static int32_t createStreamUpdateTrans(SMnode *pMnode, SStreamObj *pStream, SVgr
return TSDB_CODE_ACTION_IN_PROGRESS;
}
// todo. 1. multiple change, 2. replica change problem
static SVgroupChangeInfo mndFindChangedVgroupInfo(SMnode *pMnode, const SArray *pPrevVgroupList,
const SArray *pVgroupList) {
static bool isNodeEpsetChanged(const SEpSet* pPrevEpset, const SEpSet* pCurrent) {
const SEp* pEp = GET_ACTIVE_EP(pPrevEpset);
for(int32_t i = 0; i < pCurrent->numOfEps; ++i) {
const SEp* p = &(pCurrent->eps[i]);
if (pEp->port == p->port && strncmp(pEp->fqdn, p->fqdn, TSDB_FQDN_LEN) == 0) {
return false;
}
}
return true;
}
// 1. increase the replica does not affect the stream process.
// 2. decreasing the replica may affect the stream task execution in the way that there is one or more running stream
// tasks on the will be removed replica.
// 3. vgroup redistribution is an combination operation of first increase replica and then decrease replica. So we will
// handle it as mentioned in 1 & 2 items.
static SVgroupChangeInfo mndFindChangedNodeInfo(SMnode *pMnode, const SArray *pPrevNodeList, const SArray *pNodeList) {
SVgroupChangeInfo info = {
.pUpdateNodeList = taosArrayInit(4, sizeof(SNodeUpdateInfo)),
.pDBMap = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR), true, HASH_NO_LOCK),
};
int32_t numOfVgroups = taosArrayGetSize(pPrevVgroupList);
for (int32_t i = 0; i < numOfVgroups; ++i) {
SNodeEntry *pPrevEntry = taosArrayGet(pPrevVgroupList, i);
int32_t numOfNodes = taosArrayGetSize(pPrevNodeList);
for (int32_t i = 0; i < numOfNodes; ++i) {
SNodeEntry *pPrevEntry = taosArrayGet(pPrevNodeList, i);
int32_t num = taosArrayGetSize(pVgroupList);
int32_t num = taosArrayGetSize(pNodeList);
for (int32_t j = 0; j < num; ++j) {
SNodeEntry *pCurrent = taosArrayGet(pVgroupList, j);
SNodeEntry *pCurrent = taosArrayGet(pNodeList, j);
if (pCurrent->nodeId == pPrevEntry->nodeId) {
// todo handle the replica change problem.
if (!isEpsetEqual(&pCurrent->epset, &pPrevEntry->epset)) {
if (isNodeEpsetChanged(&pPrevEntry->epset, &pCurrent->epset)) {
const SEp *pPrevEp = GET_ACTIVE_EP(&pPrevEntry->epset);
char buf[256] = {0};
EPSET_TO_STR(&pCurrent->epset, buf);
mDebug("nodeId:%d epset changed detected, old:%s:%d -> new:%s", pCurrent->nodeId, pPrevEp->fqdn,
pPrevEp->port, buf);
SNodeUpdateInfo updateInfo = {.nodeId = pPrevEntry->nodeId};
epsetAssign(&updateInfo.prevEp, &pPrevEntry->epset);
epsetAssign(&updateInfo.newEp, &pCurrent->epset);
@ -1908,6 +1936,7 @@ static SVgroupChangeInfo mndFindChangedVgroupInfo(SMnode *pMnode, const SArray *
taosHashPut(info.pDBMap, pVgroup->dbName, strlen(pVgroup->dbName), NULL, 0);
mndReleaseVgroup(pMnode, pVgroup);
}
break;
}
}
@ -1953,11 +1982,6 @@ int32_t mndProcessVgroupChange(SMnode *pMnode, SVgroupChangeInfo* pChangeInfo) {
break;
}
// update the related upstream and downstream tasks, todo remove this, no need this function
// taosWLockLatch(&pStream->lock);
// streamTaskUpdateEpInfo(pStream->tasks, req.vgId, &req.epset);
// streamTaskUpdateEpInfo(pStream->pHTasksList, req.vgId, &req.epset);
// taosWUnLockLatch(&pStream->lock);
void* p = taosHashGet(pChangeInfo->pDBMap, pStream->targetDb, strlen(pStream->targetDb));
void* p1 = taosHashGet(pChangeInfo->pDBMap, pStream->sourceDb, strlen(pStream->sourceDb));
if (p == NULL && p1 == NULL) {
@ -1975,7 +1999,7 @@ int32_t mndProcessVgroupChange(SMnode *pMnode, SVgroupChangeInfo* pChangeInfo) {
return 0;
}
static SArray* doExtractNodeList(SMnode *pMnode) {
static SArray* doExtractNodeListFromStream(SMnode *pMnode) {
SSdb *pSdb = pMnode->pSdb;
SStreamObj *pStream = NULL;
void *pIter = NULL;
@ -2022,7 +2046,6 @@ static SArray* doExtractNodeList(SMnode *pMnode) {
}
static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg) {
return 0;
int32_t old = atomic_val_compare_exchange_32(&mndNodeCheckSentinel, 0, 1);
if (old != 0) {
mDebug("still in checking node change");
@ -2030,17 +2053,26 @@ static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg) {
}
mDebug("start to do node change checking");
int64_t ts = taosGetTimestampSec();
SMnode *pMnode = pMsg->info.node;
if (execNodeList.pNodeEntryList == NULL) {
execNodeList.pNodeEntryList = doExtractNodeList(pMnode);
if (execNodeList.pNodeEntryList == NULL || (taosArrayGetSize(execNodeList.pNodeEntryList) == 0)) {
if (execNodeList.pNodeEntryList != NULL) {
execNodeList.pNodeEntryList = taosArrayDestroy(execNodeList.pNodeEntryList);
}
execNodeList.pNodeEntryList = doExtractNodeListFromStream(pMnode);
}
if (taosArrayGetSize(execNodeList.pNodeEntryList) == 0) {
mDebug("end to do stream task node change checking, no vgroup exists, do nothing");
execNodeList.ts = ts;
atomic_store_32(&mndNodeCheckSentinel, 0);
return 0;
}
SArray *pNodeSnapshot = mndTakeVgroupSnapshot(pMnode);
int64_t ts = taosGetTimestampSec();
SVgroupChangeInfo changeInfo = mndFindChangedVgroupInfo(pMnode, execNodeList.pNodeEntryList, pNodeSnapshot);
SVgroupChangeInfo changeInfo = mndFindChangedNodeInfo(pMnode, execNodeList.pNodeEntryList, pNodeSnapshot);
if (taosArrayGetSize(changeInfo.pUpdateNodeList) > 0) {
mndProcessVgroupChange(pMnode, &changeInfo);
}
@ -2053,7 +2085,7 @@ static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg) {
execNodeList.pNodeEntryList = pNodeSnapshot;
execNodeList.ts = ts;
mDebug("end to do node change checking");
mDebug("end to do stream task node change checking");
atomic_store_32(&mndNodeCheckSentinel, 0);
return 0;
}

View File

@ -192,37 +192,40 @@ static bool hasStreamTaskInTimer(SStreamMeta* pMeta) {
}
void tqNotifyClose(STQ* pTq) {
if (pTq != NULL) {
SStreamMeta* pMeta = pTq->pStreamMeta;
taosWLockLatch(&pMeta->lock);
void* pIter = NULL;
while (1) {
pIter = taosHashIterate(pMeta->pTasks, pIter);
if (pIter == NULL) {
break;
}
SStreamTask* pTask = *(SStreamTask**)pIter;
tqDebug("vgId:%d s-task:%s set closing flag", pMeta->vgId, pTask->id.idStr);
streamTaskStop(pTask);
}
taosWUnLockLatch(&pMeta->lock);
tqDebug("vgId:%d start to check all tasks", pMeta->vgId);
int64_t st = taosGetTimestampMs();
while (hasStreamTaskInTimer(pMeta)) {
tqDebug("vgId:%d some tasks in timer, wait for 100ms and recheck", pMeta->vgId);
taosMsleep(100);
}
int64_t el = taosGetTimestampMs() - st;
tqDebug("vgId:%d all stream tasks are not in timer, continue close, elapsed time:%" PRId64 " ms",
pMeta->vgId, el);
if (pTq == NULL) {
return;
}
SStreamMeta* pMeta = pTq->pStreamMeta;
int32_t vgId = pMeta->vgId;
tqDebug("vgId:%d notify all stream tasks that the vnode is closing", vgId);
taosWLockLatch(&pMeta->lock);
void* pIter = NULL;
while (1) {
pIter = taosHashIterate(pMeta->pTasks, pIter);
if (pIter == NULL) {
break;
}
SStreamTask* pTask = *(SStreamTask**)pIter;
tqDebug("vgId:%d s-task:%s set closing flag", vgId, pTask->id.idStr);
streamTaskStop(pTask);
}
taosWUnLockLatch(&pMeta->lock);
tqDebug("vgId:%d start to check all tasks", vgId);
int64_t st = taosGetTimestampMs();
while (hasStreamTaskInTimer(pMeta)) {
tqDebug("vgId:%d some tasks in timer, wait for 100ms and recheck", pMeta->vgId);
taosMsleep(100);
}
int64_t el = taosGetTimestampMs() - st;
tqDebug("vgId:%d all stream tasks are not in timer, continue close, elapsed time:%" PRId64 " ms", pMeta->vgId, el);
}
//static int32_t doSendDataRsp(const SRpcHandleInfo* pRpcHandleInfo, const SMqDataRsp* pRsp, int32_t epoch,
@ -1100,8 +1103,8 @@ int32_t tqProcessStreamTaskCheckRsp(STQ* pTq, int64_t sversion, SRpcMsg* pMsg) {
SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, rsp.streamId, rsp.upstreamTaskId);
if (pTask == NULL) {
tqError("tq failed to locate the stream task:0x%x (vgId:%d), it may have been destroyed", rsp.upstreamTaskId,
pTq->pStreamMeta->vgId);
tqError("tq failed to locate the stream task:0x%" PRIx64 "-0x%x (vgId:%d), it may have been destroyed",
rsp.streamId, rsp.upstreamTaskId, pTq->pStreamMeta->vgId);
terrno = TSDB_CODE_STREAM_TASK_NOT_EXIST;
return -1;
}
@ -1116,9 +1119,12 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t sversion, char* msg, int32_t ms
int32_t vgId = TD_VID(pTq->pVnode);
if (tsDisableStream) {
tqInfo("vgId:%d stream disabled, not deploy stream tasks", vgId);
return 0;
}
tqDebug("vgId:%d receive new stream task deploy msg, start to build stream task", vgId);
// 1.deserialize msg and build task
SStreamTask* pTask = taosMemoryCalloc(1, sizeof(SStreamTask));
if (pTask == NULL) {
@ -1141,8 +1147,9 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t sversion, char* msg, int32_t ms
SStreamMeta* pStreamMeta = pTq->pStreamMeta;
// 2.save task, use the newest commit version as the initial start version of stream task.
// 2.save task, use the latest commit version as the initial start version of stream task.
int32_t taskId = pTask->id.taskId;
int64_t streamId = pTask->id.streamId;
bool added = false;
taosWLockLatch(&pStreamMeta->lock);
@ -1151,21 +1158,26 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t sversion, char* msg, int32_t ms
taosWUnLockLatch(&pStreamMeta->lock);
if (code < 0) {
tqError("vgId:%d failed to add s-task:0x%x, total:%d", vgId, pTask->id.taskId, numOfTasks);
tqError("vgId:%d failed to add s-task:0x%x, total:%d, code:%s", vgId, taskId, numOfTasks, tstrerror(code));
tFreeStreamTask(pTask);
return -1;
}
// not added into meta store
// added into meta store, pTask cannot be reference since it may have been destroyed by other threads already now if
// it is added into the meta store
if (added) {
tqDebug("vgId:%d s-task:0x%x is deployed and add into meta, numOfTasks:%d", vgId, taskId, numOfTasks);
SStreamTask* p = streamMetaAcquireTask(pStreamMeta, pTask->id.streamId, taskId);
if (p != NULL) { // reset the downstreamReady flag.
SStreamTask* p = streamMetaAcquireTask(pStreamMeta, streamId, taskId);
bool restored = pTq->pVnode->restored;
if (p != NULL && restored) { // reset the downstreamReady flag.
streamTaskCheckDownstreamTasks(p);
} else if (!restored) {
tqWarn("s-task:%s not launched since vnode(vgId:%d) not ready", p->id.idStr, vgId);
}
streamMetaReleaseTask(pStreamMeta, p);
} else {
tqWarn("vgId:%d failed to add s-task:0x%x, already exists in meta store", vgId, taskId);
tqWarn("vgId:%d failed to add s-task:0x%x, since already exists in meta store", vgId, taskId);
tFreeStreamTask(pTask);
}
@ -1840,17 +1852,15 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) {
return TSDB_CODE_SUCCESS;
}
tqDebug("s-task:%s receive pause msg from mnode", pTask->id.idStr);
// streamTaskUpdateEpInfo(pTask);
tqDebug("s-task:%s receive task nodeEp update msg from mnode", pTask->id.idStr);
streamTaskUpdateEpsetInfo(pTask, req.pNodeList);
SStreamTask* pHistoryTask = NULL;
if (pTask->historyTaskId.taskId != 0) {
pHistoryTask = streamMetaAcquireTask(pMeta, pTask->historyTaskId.streamId, pTask->historyTaskId.taskId);
if (pHistoryTask == NULL) {
tqError(
"vgId:%d failed to acquire fill-history task:0x%x when handling task update, it may have been dropped "
"already",
pMeta->vgId, pTask->historyTaskId.taskId);
tqError("vgId:%d failed to get fill-history task:0x%x when handling task update, it may have been dropped",
pMeta->vgId, pTask->historyTaskId.taskId);
streamMetaReleaseTask(pMeta, pTask);
@ -1859,7 +1869,7 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) {
}
tqDebug("s-task:%s fill-history task handle task update along with related stream task", pHistoryTask->id.idStr);
// streamTaskUpdateEpInfo(pHistoryTask);
streamTaskUpdateEpsetInfo(pHistoryTask, req.pNodeList);
}
if (pHistoryTask != NULL) {

View File

@ -61,7 +61,7 @@ int32_t tqStreamTasksStatusCheck(STQ* pTq) {
SStreamMeta* pMeta = pTq->pStreamMeta;
int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList);
tqDebug("vgId:%d start to check all (%d) stream tasks downstream status", vgId, numOfTasks);
tqDebug("vgId:%d start to check all %d stream task(s) downstream status", vgId, numOfTasks);
if (numOfTasks == 0) {
return TSDB_CODE_SUCCESS;
}

View File

@ -81,7 +81,6 @@ _err:
tqError("vgId:%d, vnode stream-task snapshot reader open failed since %s", TD_VID(pTq->pVnode), tstrerror(code));
*ppReader = NULL;
return code;
return 0;
}
int32_t streamTaskSnapReaderClose(SStreamTaskReader* pReader) {

View File

@ -473,7 +473,7 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t ver, SRpcMsg
}
break;
case TDMT_STREAM_TASK_DEPLOY: {
if (pVnode->restored && tqProcessTaskDeployReq(pVnode->pTq, ver, pReq, len) < 0) {
if (tqProcessTaskDeployReq(pVnode->pTq, ver, pReq, len) < 0) {
goto _err;
}
} break;

View File

@ -1007,6 +1007,22 @@ int32_t tDecodeStreamTaskUpdateMsg(SDecoder* pDecoder, SStreamTaskNodeUpdateMsg*
taosArrayPush(pMsg->pNodeList, &info);
}
tEndDecode(pDecoder);
return 0;
}
int32_t tEncodeStreamTaskUpdateRsp(SEncoder* pEncoder, const SStreamTaskNodeUpdateRsp* pMsg) {
if (tStartEncode(pEncoder) < 0) return -1;
if (tEncodeI64(pEncoder, pMsg->streamId) < 0) return -1;
if (tEncodeI32(pEncoder, pMsg->taskId) < 0) return -1;
tEndEncode(pEncoder);
return pEncoder->pos;
}
int32_t tDecodeStreamTaskUpdateRsp(SDecoder* pDecoder, SStreamTaskNodeUpdateRsp* pMsg) {
if (tStartDecode(pDecoder) < 0) return -1;
if (tDecodeI64(pDecoder, &pMsg->streamId) < 0) return -1;
if (tDecodeI32(pDecoder, &pMsg->taskId) < 0) return -1;
tEndDecode(pDecoder);
return 0;
}

View File

@ -469,6 +469,7 @@ int64_t streamGetLatestCheckpointId(SStreamMeta* pMeta) {
int32_t streamLoadTasks(SStreamMeta* pMeta) {
TBC* pCur = NULL;
qInfo("vgId:%d load stream tasks from meta files", pMeta->vgId);
if (tdbTbcOpen(pMeta->pTaskDb, &pCur, NULL) < 0) {
qError("vgId:%d failed to open stream meta, code:%s", pMeta->vgId, tstrerror(terrno));
return -1;

View File

@ -67,6 +67,7 @@ const char* streamGetTaskStatusStr(int32_t status) {
case TASK_STATUS__CK: return "check-point";
case TASK_STATUS__CK_READY: return "check-point-ready";
case TASK_STATUS__DROPPING: return "dropping";
case TASK_STATUS__STOP: return "stop";
default:return "";
}
}
@ -115,6 +116,7 @@ int32_t streamTaskDoCheckDownstreamTasks(SStreamTask* pTask) {
.upstreamTaskId = pTask->id.taskId,
.upstreamNodeId = pTask->info.nodeId,
.childId = pTask->info.selfChildId,
.stage = pTask->status.stage,
};
// serialize
@ -172,6 +174,7 @@ int32_t streamRecheckDownstream(SStreamTask* pTask, const SStreamTaskCheckRsp* p
.downstreamTaskId = pRsp->downstreamTaskId,
.downstreamNodeId = pRsp->downstreamNodeId,
.childId = pRsp->childId,
.stage = pTask->status.stage,
};
qDebug("s-task:%s (vgId:%d) check downstream task:0x%x (vgId:%d) (recheck)", pTask->id.idStr, pTask->info.nodeId,

View File

@ -13,11 +13,12 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "streamInt.h"
#include "tmisce.h"
#include "executor.h"
#include "streamInt.h"
#include "tstream.h"
#include "wal.h"
#include "ttimer.h"
#include "wal.h"
static int32_t addToTaskset(SArray* pArray, SStreamTask* pTask) {
int32_t childId = taosArrayGetSize(pArray);
@ -404,7 +405,7 @@ void streamTaskUpdateUpstreamInfo(SStreamTask* pTask, int32_t nodeId, const SEpS
for(int32_t i = 0; i < numOfUpstream; ++i) {
SStreamChildEpInfo* pInfo = taosArrayGet(pTask->pUpstreamInfoList, i);
if (pInfo->nodeId == nodeId) {
pInfo->epSet = *pEpSet;
epsetAssign(&pInfo->epSet, pEpSet);
break;
}
}
@ -465,6 +466,12 @@ int32_t streamTaskStop(SStreamTask* pTask) {
}
int32_t streamTaskRestart(SStreamTask* pTask, const char* pDir) {
const char* id = pTask->id.idStr;
int32_t vgId = pTask->pMeta->vgId;
qDebug("s-task:%s vgId:%d restart current task, stage:%d, status:%s, sched-status:%d", id, vgId, pTask->status.stage,
streamGetTaskStatusStr(pTask->status.taskStatus), pTask->status.schedStatus);
// 1. stop task
streamTaskStop(pTask);
@ -477,39 +484,68 @@ int32_t streamTaskRestart(SStreamTask* pTask, const char* pDir) {
pTask->status.downstreamReady = 0;
pTask->status.stage += 1;
qDebug("s-task:%s reset downstream status and stage:%d, start to check downstream", pTask->id.idStr,
pTask->status.stage);
streamSetStatusNormal(pTask);
qDebug("s-task:%s reset downstream status and inc stage to be:%d, status:%s, start to check downstream", id,
pTask->status.stage, streamGetTaskStatusStr(pTask->status.taskStatus));
// 3. start to check the downstream status
streamTaskCheckDownstreamTasks(pTask);
return 0;
}
int32_t streamTaskUpdateEpInfo(SArray* pTaskList, int32_t nodeId, SEpSet* pEpSet) {
int32_t numOfLevels = taosArrayGetSize(pTaskList);
// todo remove it
//int32_t streamTaskUpdateEpInfo(SArray* pTaskList, int32_t nodeId, SEpSet* pEpSet) {
// int32_t numOfLevels = taosArrayGetSize(pTaskList);
//
// for (int32_t j = 0; j < numOfLevels; ++j) {
// SArray *pLevel = taosArrayGetP(pTaskList, j);
//
// int32_t numOfTasks = taosArrayGetSize(pLevel);
// for (int32_t k = 0; k < numOfTasks; ++k) {
// SStreamTask *pTask = taosArrayGetP(pLevel, k);
// if (pTask->info.nodeId == nodeId) {
// pTask->info.epSet = *pEpSet;
// continue;
// }
//
// // check for the dispath info and the upstream task info
// int32_t level = pTask->info.taskLevel;
// if (level == TASK_LEVEL__SOURCE) {
// streamTaskUpdateDownstreamInfo(pTask, nodeId, pEpSet);
// } else if (level == TASK_LEVEL__AGG) {
// streamTaskUpdateUpstreamInfo(pTask, nodeId, pEpSet);
// streamTaskUpdateDownstreamInfo(pTask, nodeId, pEpSet);
// } else { // TASK_LEVEL__SINK
// streamTaskUpdateUpstreamInfo(pTask, nodeId, pEpSet);
// }
// }
// }
// return 0;
//}
for (int32_t j = 0; j < numOfLevels; ++j) {
SArray *pLevel = taosArrayGetP(pTaskList, j);
int32_t doUpdateEpsetInfo(SStreamTask* pTask, int32_t nodeId, SEpSet* pEpSet) {
if (pTask->info.nodeId == nodeId) { // execution task should be moved away
epsetAssign(&pTask->info.epSet, pEpSet);
}
int32_t numOfTasks = taosArrayGetSize(pLevel);
for (int32_t k = 0; k < numOfTasks; ++k) {
SStreamTask *pTask = taosArrayGetP(pLevel, k);
if (pTask->info.nodeId == nodeId) {
pTask->info.epSet = *pEpSet;
continue;
}
// check for the dispath info and the upstream task info
int32_t level = pTask->info.taskLevel;
if (level == TASK_LEVEL__SOURCE) {
streamTaskUpdateDownstreamInfo(pTask, nodeId, pEpSet);
} else if (level == TASK_LEVEL__AGG) {
streamTaskUpdateUpstreamInfo(pTask, nodeId, pEpSet);
streamTaskUpdateDownstreamInfo(pTask, nodeId, pEpSet);
} else { // TASK_LEVEL__SINK
streamTaskUpdateUpstreamInfo(pTask, nodeId, pEpSet);
}
// check for the dispath info and the upstream task info
int32_t level = pTask->info.taskLevel;
if (level == TASK_LEVEL__SOURCE) {
streamTaskUpdateDownstreamInfo(pTask, nodeId, pEpSet);
} else if (level == TASK_LEVEL__AGG) {
streamTaskUpdateUpstreamInfo(pTask, nodeId, pEpSet);
streamTaskUpdateDownstreamInfo(pTask, nodeId, pEpSet);
} else { // TASK_LEVEL__SINK
streamTaskUpdateUpstreamInfo(pTask, nodeId, pEpSet);
}
}
return 0;
}
int32_t streamTaskUpdateEpsetInfo(SStreamTask* pTask, SArray* pNodeList) {
for(int32_t i = 0; i < taosArrayGetSize(pNodeList); ++i) {
SNodeUpdateInfo* pInfo = taosArrayGet(pNodeList, i);
doUpdateEpsetInfo(pTask, pInfo->nodeId, &pInfo->newEp);
}
return 0;
}

View File

@ -13,6 +13,7 @@
*/
#include "transComm.h"
#include "tmisce.h"
typedef struct {
int32_t numOfConn;
@ -308,18 +309,6 @@ static void cliWalkCb(uv_handle_t* handle, void* arg);
} \
} while (0)
#define EPSET_DEBUG_STR(epSet, tbuf) \
do { \
int len = snprintf(tbuf, sizeof(tbuf), "epset:{"); \
for (int i = 0; i < (epSet)->numOfEps; i++) { \
if (i == (epSet)->numOfEps - 1) { \
len += snprintf(tbuf + len, sizeof(tbuf) - len, "%d. %s:%d", i, (epSet)->eps[i].fqdn, (epSet)->eps[i].port); \
} else { \
len += snprintf(tbuf + len, sizeof(tbuf) - len, "%d. %s:%d, ", i, (epSet)->eps[i].fqdn, (epSet)->eps[i].port); \
} \
} \
len += snprintf(tbuf + len, sizeof(tbuf) - len, "}, inUse:%d", (epSet)->inUse); \
} while (0);
static void* cliWorkThread(void* arg);
@ -2167,7 +2156,7 @@ static void cliSchedMsgToNextNode(SCliMsg* pMsg, SCliThrd* pThrd) {
if (rpcDebugFlag & DEBUG_DEBUG) {
STraceId* trace = &pMsg->msg.info.traceId;
char tbuf[256] = {0};
EPSET_DEBUG_STR(&pCtx->epSet, tbuf);
EPSET_TO_STR(&pCtx->epSet, tbuf);
tGDebug("%s retry on next node,use:%s, step: %d,timeout:%" PRId64 "", transLabel(pThrd->pTransInst), tbuf,
pCtx->retryStep, pCtx->retryNextInterval);
}
@ -2396,7 +2385,7 @@ int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) {
if (hasEpSet) {
if (rpcDebugFlag & DEBUG_TRACE) {
char tbuf[256] = {0};
EPSET_DEBUG_STR(&pCtx->epSet, tbuf);
EPSET_TO_STR(&pCtx->epSet, tbuf);
tGTrace("%s conn %p extract epset from msg", CONN_GET_INST_LABEL(pConn), pConn);
}
}