Merge branch 'enh/triggerCheckPoint2' of https://github.com/taosdata/TDengine into enh/triggerCheckPoint2
This commit is contained in:
commit
d106b5a87e
|
@ -256,16 +256,13 @@ enum {
|
|||
TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_DISPATCH, "stream-task-dispatch", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_STREAM_UNUSED1, "stream-unused1", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_STREAM_RETRIEVE, "stream-retrieve", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_STREAM_SCAN_HISTORY, "stream-scan-history", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_STREAM_SCAN_HISTORY_FINISH, "stream-scan-history-finish", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_CHECK, "stream-task-check", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_CHECKPOINT_READY, "stream-checkpoint-ready", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_REPORT_CHECKPOINT, "stream-report-checkpoint", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_RESTORE_CHECKPOINT, "stream-restore-checkpoint", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_STREAM_MAX_MSG, "stream-max", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_PAUSE, "stream-task-pause", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_RESUME, "stream-task-resume", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_STOP, "stream-task-stop", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_STREAM_MAX_MSG, "stream-max", NULL, NULL)
|
||||
|
||||
TD_NEW_MSG_SEG(TDMT_MON_MSG)
|
||||
TD_DEF_MSG_TYPE(TDMT_MON_MAX_MSG, "monitor-max", NULL, NULL)
|
||||
|
@ -304,8 +301,10 @@ enum {
|
|||
TD_NEW_MSG_SEG(TDMT_VND_STREAM_MSG)
|
||||
// TD_DEF_MSG_TYPE(TDMT_VND_STREAM_TRIGGER, "vnode-stream-trigger", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_VND_STREAM_SCAN_HISTORY, "vnode-stream-scan-history", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_VND_STREAM_SCAN_HISTORY_FINISH, "vnode-stream-scan-history-finish", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_VND_STREAM_CHECK_POINT_SOURCE, "vnode-stream-checkpoint-source", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_VND_STREAM_TASK_UPDATE, "vnode-stream-update", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_VND_STREAM_TASK_CHECK, "vnode-stream-task-check", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_VND_STREAM_MAX_MSG, "vnd-stream-max", NULL, NULL)
|
||||
|
||||
TD_NEW_MSG_SEG(TDMT_VND_TMQ_MSG)
|
||||
|
|
|
@ -543,9 +543,16 @@ typedef struct {
|
|||
int32_t tEncodeStreamCheckpointReadyMsg(SEncoder* pEncoder, const SStreamCheckpointReadyMsg* pRsp);
|
||||
int32_t tDecodeStreamCheckpointReadyMsg(SDecoder* pDecoder, SStreamCheckpointReadyMsg* pRsp);
|
||||
|
||||
typedef struct {
|
||||
// Status record of one stream task: the payload element of SStreamHbMsg, identified by (streamId, taskId).
typedef struct STaskStatusEntry {
|
||||
int64_t streamId;  // id of the stream that owns the task
|
||||
int32_t taskId;  // id of the task
|
||||
int32_t status;  // task status, compared against TASK_STATUS__* values by the readers of this entry
|
||||
} STaskStatusEntry;
|
||||
|
||||
// Stream heartbeat message: a vgId plus the status entries of numOfTasks stream tasks.
typedef struct SStreamHbMsg {
|
||||
int32_t vgId;  // logged as the heartbeat's origin vgId by the receiver
|
||||
int32_t numOfTasks;  // number of entries encoded from / decoded into pTaskStatus
|
||||
SArray* pTaskStatus; // SArray<STaskStatusEntry>
|
||||
} SStreamHbMsg;
|
||||
|
||||
int32_t tEncodeStreamHbMsg(SEncoder* pEncoder, const SStreamHbMsg* pRsp);
|
||||
|
|
|
@ -77,10 +77,10 @@ SArray *smGetMsgHandles() {
|
|||
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_PAUSE, smPutNodeMsgToMgmtQueue, 1) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_RESUME, smPutNodeMsgToMgmtQueue, 1) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_STOP, smPutNodeMsgToMgmtQueue, 1) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_CHECK, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_CHECK_RSP, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_STREAM_SCAN_HISTORY_FINISH, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_STREAM_SCAN_HISTORY_FINISH_RSP, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_CHECK, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_CHECK_RSP, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_SCAN_HISTORY_FINISH, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_SCAN_HISTORY_FINISH_RSP, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_CHECK_POINT_SOURCE, smPutNodeMsgToMgmtQueue, 1) == NULL) goto _OVER;
|
||||
|
||||
code = 0;
|
||||
|
|
|
@ -737,10 +737,10 @@ SArray *vmGetMsgHandles() {
|
|||
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DISPATCH_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_STREAM_RETRIEVE, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_STREAM_RETRIEVE_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_STREAM_SCAN_HISTORY_FINISH, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_STREAM_SCAN_HISTORY_FINISH_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_CHECK, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_CHECK_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_SCAN_HISTORY_FINISH, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_SCAN_HISTORY_FINISH_RSP, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_CHECK, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_CHECK_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
|
||||
// if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TRIGGER, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_PAUSE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_RESUME, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||
|
|
|
@ -41,8 +41,11 @@ typedef struct SNodeEntry {
|
|||
} SNodeEntry;
|
||||
|
||||
typedef struct SStreamVnodeRevertIndex {
|
||||
SArray *pNodeEntryList;
|
||||
int64_t ts; // snapshot ts
|
||||
SArray *pNodeEntryList;
|
||||
int64_t ts; // snapshot ts
|
||||
SHashObj *pTaskMap;
|
||||
SArray *pTaskList;
|
||||
TdThreadMutex lock;
|
||||
} SStreamVnodeRevertIndex;
|
||||
|
||||
typedef struct SVgroupChangeInfo {
|
||||
|
@ -74,6 +77,7 @@ static int32_t mndBuildStreamCheckpointSourceReq2(void **pBuf, int32_t *pLen, in
|
|||
int64_t streamId, int32_t taskId);
|
||||
static int32_t mndProcessNodeCheck(SRpcMsg *pReq);
|
||||
static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg);
|
||||
static void keepStreamTasksInBuf(SStreamObj* pStream, SStreamVnodeRevertIndex* pExecNode);
|
||||
|
||||
static SArray *doExtractNodeListFromStream(SMnode *pMnode);
|
||||
static SArray *mndTakeVgroupSnapshot(SMnode *pMnode);
|
||||
|
@ -106,7 +110,7 @@ int32_t mndInitStream(SMnode *pMnode) {
|
|||
mndSetMsgHandle(pMnode, TDMT_VND_STREAM_CHECK_POINT_SOURCE_RSP, mndTransProcessRsp);
|
||||
mndSetMsgHandle(pMnode, TDMT_MND_STREAM_CHECKPOINT_TIMER, mndProcessStreamCheckpointTmr);
|
||||
mndSetMsgHandle(pMnode, TDMT_MND_STREAM_BEGIN_CHECKPOINT, mndProcessStreamDoCheckpoint);
|
||||
// mndSetMsgHandle(pMnode, TDMT_MND_STREAM_HEARTBEAT, mndProcessStreamHb);
|
||||
mndSetMsgHandle(pMnode, TDMT_MND_STREAM_HEARTBEAT, mndProcessStreamHb);
|
||||
mndSetMsgHandle(pMnode, TDMT_STREAM_TASK_REPORT_CHECKPOINT, mndTransProcessRsp);
|
||||
mndSetMsgHandle(pMnode, TDMT_MND_STREAM_NODECHANGE_CHECK, mndProcessNodeCheckReq);
|
||||
|
||||
|
@ -118,6 +122,10 @@ int32_t mndInitStream(SMnode *pMnode) {
|
|||
mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_STREAM_TASKS, mndRetrieveStreamTask);
|
||||
mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_STREAM_TASKS, mndCancelGetNextStreamTask);
|
||||
|
||||
taosThreadMutexInit(&execNodeList.lock, NULL);
|
||||
execNodeList.pTaskMap = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR), true, HASH_NO_LOCK);
|
||||
execNodeList.pTaskList = taosArrayInit(4, sizeof(STaskStatusEntry));
|
||||
|
||||
return sdbSetTable(pMnode->pSdb, table);
|
||||
}
|
||||
|
||||
|
@ -853,6 +861,7 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) {
|
|||
|
||||
mndTransDrop(pTrans);
|
||||
|
||||
keepStreamTasksInBuf(pStream, &execNodeList);
|
||||
code = TSDB_CODE_ACTION_IN_PROGRESS;
|
||||
|
||||
_OVER:
|
||||
|
@ -1123,13 +1132,14 @@ static int32_t mndProcessStreamDoCheckpoint(SRpcMsg *pReq) {
|
|||
SStreamObj *pStream = NULL;
|
||||
int32_t code = 0;
|
||||
|
||||
{
|
||||
{ // check if the node update happens or not
|
||||
int64_t ts = taosGetTimestampSec();
|
||||
|
||||
if (execNodeList.pNodeEntryList == NULL || (taosArrayGetSize(execNodeList.pNodeEntryList) == 0)) {
|
||||
if (execNodeList.pNodeEntryList != NULL) {
|
||||
execNodeList.pNodeEntryList = taosArrayDestroy(execNodeList.pNodeEntryList);
|
||||
}
|
||||
|
||||
execNodeList.pNodeEntryList = doExtractNodeListFromStream(pMnode);
|
||||
}
|
||||
|
||||
|
@ -1154,6 +1164,26 @@ static int32_t mndProcessStreamDoCheckpoint(SRpcMsg *pReq) {
|
|||
}
|
||||
}
|
||||
|
||||
{// check if all tasks are in TASK_STATUS__NORMAL status
|
||||
bool ready = true;
|
||||
|
||||
taosThreadMutexLock(&execNodeList.lock);
|
||||
for(int32_t i = 0; i < taosArrayGetSize(execNodeList.pTaskList); ++i) {
|
||||
STaskStatusEntry* p = taosArrayGet(execNodeList.pTaskList, i);
|
||||
if (p->status != TASK_STATUS__NORMAL) {
|
||||
mDebug("s-task:0x%" PRIx64 "-0x%x (nodeId:%d) status:%s not ready, create checkpoint msg not issued",
|
||||
p->streamId, p->taskId, 0, streamGetTaskStatusStr(p->status));
|
||||
ready = false;
|
||||
break;
|
||||
}
|
||||
}
|
||||
taosThreadMutexUnlock(&execNodeList.lock);
|
||||
|
||||
if (!ready) {
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
|
||||
SMStreamDoCheckpointMsg *pMsg = (SMStreamDoCheckpointMsg *)pReq->pCont;
|
||||
int64_t checkpointId = pMsg->checkpointId;
|
||||
|
||||
|
@ -2109,6 +2139,7 @@ static SArray *doExtractNodeListFromStream(SMnode *pMnode) {
|
|||
return plist;
|
||||
}
|
||||
|
||||
// this function is executed by only one thread at a time, so it is not written to be multi-thread safe
|
||||
static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg) {
|
||||
int32_t old = atomic_val_compare_exchange_32(&mndNodeCheckSentinel, 0, 1);
|
||||
if (old != 0) {
|
||||
|
@ -2124,6 +2155,7 @@ static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg) {
|
|||
if (execNodeList.pNodeEntryList != NULL) {
|
||||
execNodeList.pNodeEntryList = taosArrayDestroy(execNodeList.pNodeEntryList);
|
||||
}
|
||||
|
||||
execNodeList.pNodeEntryList = doExtractNodeListFromStream(pMnode);
|
||||
}
|
||||
|
||||
|
@ -2172,85 +2204,118 @@ static int32_t mndProcessNodeCheck(SRpcMsg *pReq) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
// Registers every task of pStream in the task-status buffer of pExecNode.
//
// For each task not yet tracked, a STaskStatusEntry (initial status TASK_STATUS__STOP) is appended to
// pExecNode->pTaskList, and its position in that list is stored in pExecNode->pTaskMap under the
// key {streamId, taskId}. Tasks that are already tracked are left untouched.
//
// pStream:   stream object whose `tasks` member is an array of per-level task arrays.
// pExecNode: buffer to update; its lock is acquired for the whole update, so the caller must NOT hold it.
static void keepStreamTasksInBuf(SStreamObj* pStream, SStreamVnodeRevertIndex* pExecNode) {
  // pTaskMap is created with HASH_NO_LOCK and both containers are read/written by the heartbeat and
  // checkpoint handlers under pExecNode->lock, so the same lock has to protect the insertions here.
  taosThreadMutexLock(&pExecNode->lock);

  int32_t numOfLevels = taosArrayGetSize(pStream->tasks);
  for (int32_t i = 0; i < numOfLevels; i++) {
    SArray* pLevel = taosArrayGetP(pStream->tasks, i);

    int32_t numOfTasks = taosArrayGetSize(pLevel);
    for (int32_t j = 0; j < numOfTasks; j++) {
      SStreamTask* pTask = taosArrayGetP(pLevel, j);
      int64_t      keys[2] = {pTask->id.streamId, pTask->id.taskId};

      if (taosHashGet(pExecNode->pTaskMap, keys, sizeof(keys)) != NULL) {
        continue;  // already tracked
      }

      STaskStatusEntry entry = {
          .streamId = pTask->id.streamId, .taskId = pTask->id.taskId, .status = TASK_STATUS__STOP};

      // Do not map an ordinal when nothing was appended: "size - 1" would then alias another task's entry.
      if (taosArrayPush(pExecNode->pTaskList, &entry) == NULL) {
        continue;
      }

      int32_t ordinal = taosArrayGetSize(pExecNode->pTaskList) - 1;
      taosHashPut(pExecNode->pTaskMap, keys, sizeof(keys), &ordinal, sizeof(ordinal));
    }
  }

  taosThreadMutexUnlock(&pExecNode->lock);
}
|
||||
|
||||
// todo: this process should be executed by the write queue worker of the mnode
|
||||
// int32_t mndProcessStreamHb(SRpcMsg *pReq) {
|
||||
// SMnode *pMnode = pReq->info.node;
|
||||
// SSdb *pSdb = pMnode->pSdb;
|
||||
// SStreamHbMsg req = {0};
|
||||
// int32_t code = TSDB_CODE_SUCCESS;
|
||||
//
|
||||
// SDecoder decoder = {0};
|
||||
// tDecoderInit(&decoder, (uint8_t *)pReq->pCont, pReq->contLen);
|
||||
//
|
||||
// if (tStartDecode(&decoder) < 0) return -1;
|
||||
//
|
||||
// if (tDecodeStreamHbMsg(&decoder, &req) < 0) {
|
||||
// terrno = TSDB_CODE_INVALID_MSG;
|
||||
// return -1;
|
||||
// }
|
||||
//
|
||||
// Handles a stream-meta heartbeat: decodes the SStreamHbMsg carried by pReq and copies the reported
// status of every task into the matching entry of execNodeList.pTaskList.
//
// Entries are located through execNodeList.pTaskMap, keyed by {streamId, taskId}. A reported task that
// is not registered in the map is logged and skipped.
//
// Returns TSDB_CODE_SUCCESS on success, or -1 if the message cannot be decoded (terrno is set to
// TSDB_CODE_INVALID_MSG when the heartbeat payload itself is malformed).
//
// todo: this process should be executed by the write queue worker of the mnode
// todo: handle the node timeout case. Once the vnode is off-line, we should check the dnode status from
//       mnode, to identify whether the dnode is truly offline or not, and handle the node changed case
//       (epset update of all streams that involve the vnode).
int32_t mndProcessStreamHb(SRpcMsg *pReq) {
  SStreamHbMsg req = {0};
  SDecoder     decoder = {0};
  int32_t      code = TSDB_CODE_SUCCESS;
  int32_t      numOfEntries = 0;

  tDecoderInit(&decoder, (uint8_t *)pReq->pCont, pReq->contLen);

  // NOTE(review): tDecodeStreamHbMsg() also calls tStartDecode() itself while tEncodeStreamHbMsg() calls
  // tStartEncode() only once; confirm against the sender that this extra frame is really expected.
  if (tStartDecode(&decoder) < 0) {
    code = -1;
    goto _OVER;
  }

  if (tDecodeStreamHbMsg(&decoder, &req) < 0) {
    terrno = TSDB_CODE_INVALID_MSG;
    code = -1;
    goto _OVER;
  }

  mTrace("receive stream-meta hb from vgId:%d, active numOfTasks:%d", req.vgId, req.numOfTasks);

  // iterate over what was actually decoded rather than trusting the counter taken from the wire
  numOfEntries = taosArrayGetSize(req.pTaskStatus);

  taosThreadMutexLock(&execNodeList.lock);
  for (int32_t i = 0; i < numOfEntries; ++i) {
    STaskStatusEntry *p = taosArrayGet(req.pTaskStatus, i);
    int64_t           k[2] = {p->streamId, p->taskId};

    // the task may be unknown to this mnode; a blind dereference of the lookup result would crash
    int32_t *pIndex = taosHashGet(execNodeList.pTaskMap, &k, sizeof(k));
    if (pIndex == NULL) {
      mDebug("s-task:0x%" PRIx64 "-0x%x in hb from vgId:%d is not registered, status:%s ignored", p->streamId,
             p->taskId, req.vgId, streamGetTaskStatusStr(p->status));
      continue;
    }

    STaskStatusEntry *pStatusEntry = taosArrayGet(execNodeList.pTaskList, *pIndex);
    if (pStatusEntry != NULL) {
      pStatusEntry->status = p->status;
    }
  }
  taosThreadMutexUnlock(&execNodeList.lock);

_OVER:
  // the decoded entry list and the decoder are owned by this function on every path
  if (req.pTaskStatus != NULL) {
    taosArrayDestroy(req.pTaskStatus);
  }
  tDecoderClear(&decoder);
  return code;
}
|
||||
|
|
|
@ -411,13 +411,13 @@ int32_t sndProcessStreamMsg(SSnode *pSnode, SRpcMsg *pMsg) {
|
|||
return sndProcessTaskRetrieveReq(pSnode, pMsg);
|
||||
case TDMT_STREAM_RETRIEVE_RSP:
|
||||
return sndProcessTaskRetrieveRsp(pSnode, pMsg);
|
||||
case TDMT_STREAM_SCAN_HISTORY_FINISH:
|
||||
case TDMT_VND_STREAM_SCAN_HISTORY_FINISH:
|
||||
return sndProcessStreamTaskScanHistoryFinishReq(pSnode, pMsg);
|
||||
case TDMT_STREAM_SCAN_HISTORY_FINISH_RSP:
|
||||
case TDMT_VND_STREAM_SCAN_HISTORY_FINISH_RSP:
|
||||
return sndProcessTaskRecoverFinishRsp(pSnode, pMsg);
|
||||
case TDMT_STREAM_TASK_CHECK:
|
||||
case TDMT_VND_STREAM_TASK_CHECK:
|
||||
return sndProcessStreamTaskCheckReq(pSnode, pMsg);
|
||||
case TDMT_STREAM_TASK_CHECK_RSP:
|
||||
case TDMT_VND_STREAM_TASK_CHECK_RSP:
|
||||
return sndProcessStreamTaskCheckRsp(pSnode, pMsg);
|
||||
default:
|
||||
ASSERT(0);
|
||||
|
|
|
@ -1045,9 +1045,10 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) {
|
|||
}
|
||||
|
||||
tqInfo("vgId:%d expand stream task, s-task:%s, checkpointId:%" PRId64 " checkpointVer:%" PRId64 " currentVer:%" PRId64
|
||||
" child id:%d, level:%d, fill-history:%d, trigger:%" PRId64 " ms",
|
||||
" child id:%d, level:%d, status:%s fill-history:%d, trigger:%" PRId64 " ms",
|
||||
vgId, pTask->id.idStr, pChkInfo->checkpointId, pChkInfo->checkpointVer, pChkInfo->currentVer,
|
||||
pTask->info.selfChildId, pTask->info.taskLevel, pTask->info.fillHistory, pTask->triggerParam);
|
||||
pTask->info.selfChildId, pTask->info.taskLevel, streamGetTaskStatusStr(pTask->status.taskStatus),
|
||||
pTask->info.fillHistory, pTask->triggerParam);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
@ -1335,9 +1336,6 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
|
|||
streamMetaReleaseTask(pMeta, pTask);
|
||||
streamMetaReleaseTask(pMeta, pStreamTask);
|
||||
} else {
|
||||
// todo update the chkInfo version for current task.
|
||||
// this task has an associated history stream task, so we need to scan wal from the end version of
|
||||
// history scan. The current version of chkInfo.current is not updated during the history scan
|
||||
STimeWindow* pWindow = &pTask->dataRange.window;
|
||||
|
||||
if (pTask->historyTaskId.taskId == 0) {
|
||||
|
@ -1356,7 +1354,6 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
|
|||
id, pTask->chkInfo.currentVer, pWindow->skey, pWindow->ekey);
|
||||
}
|
||||
|
||||
// notify the downstream agg tasks that upstream tasks are ready to processing the WAL data, update the
|
||||
code = streamTaskScanHistoryDataComplete(pTask);
|
||||
streamMetaReleaseTask(pMeta, pTask);
|
||||
|
||||
|
@ -1408,6 +1405,7 @@ int32_t tqProcessTaskTransferStateReq(STQ* pTq, SRpcMsg* pMsg) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
// only the agg tasks and the sink tasks will receive this message from upstream tasks
|
||||
int32_t tqProcessTaskScanHistoryFinishReq(STQ* pTq, SRpcMsg* pMsg) {
|
||||
char* msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
|
||||
int32_t msgLen = pMsg->contLen - sizeof(SMsgHead);
|
||||
|
@ -1434,6 +1432,7 @@ int32_t tqProcessTaskScanHistoryFinishReq(STQ* pTq, SRpcMsg* pMsg) {
|
|||
return code;
|
||||
}
|
||||
|
||||
// NOTE: the rsp msg should be kept in WAL file.
|
||||
int32_t tqProcessTaskScanHistoryFinishRsp(STQ* pTq, SRpcMsg* pMsg) {
|
||||
char* msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
|
||||
int32_t msgLen = pMsg->contLen - sizeof(SMsgHead);
|
||||
|
@ -1462,6 +1461,20 @@ int32_t tqProcessTaskScanHistoryFinishRsp(STQ* pTq, SRpcMsg* pMsg) {
|
|||
"s-task:%s scan-history finish rsp received from downstream task:0x%x, all downstream tasks rsp scan-history "
|
||||
"completed msg",
|
||||
pTask->id.idStr, req.downstreamId);
|
||||
|
||||
// the scan-history finish status should be recorded in the WAL files. So the transfer of the task status from
|
||||
// scan-history
|
||||
// to normal should be executed by write thread of each vnode.
|
||||
|
||||
// void* buf = NULL;
|
||||
// int32_t tlen = 0;
|
||||
// // encodeCreateChildTableForRPC(pReqs, TD_VID(pVnode), &buf, &tlen);
|
||||
//
|
||||
// SRpcMsg msg = {.msgType = TDMT_VND_CREATE_TABLE, .pCont = buf, .contLen = tlen};
|
||||
// if (tmsgPutToQueue(&pTq->pVnode->msgCb, WRITE_QUEUE, &msg) != 0) {
|
||||
// tqError("failed to put into write-queue since %s", terrstr());
|
||||
// }
|
||||
|
||||
streamProcessScanHistoryFinishRsp(pTask);
|
||||
}
|
||||
|
||||
|
|
|
@ -664,9 +664,9 @@ int32_t vnodeProcessStreamMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo)
|
|||
return tqProcessTaskDispatchReq(pVnode->pTq, pMsg, true);
|
||||
case TDMT_STREAM_TASK_DISPATCH_RSP:
|
||||
return tqProcessTaskDispatchRsp(pVnode->pTq, pMsg);
|
||||
case TDMT_STREAM_TASK_CHECK:
|
||||
case TDMT_VND_STREAM_TASK_CHECK:
|
||||
return tqProcessStreamTaskCheckReq(pVnode->pTq, pMsg);
|
||||
case TDMT_STREAM_TASK_CHECK_RSP:
|
||||
case TDMT_VND_STREAM_TASK_CHECK_RSP:
|
||||
return tqProcessStreamTaskCheckRsp(pVnode->pTq, 0, pMsg);
|
||||
case TDMT_STREAM_RETRIEVE:
|
||||
return tqProcessTaskRetrieveReq(pVnode->pTq, pMsg);
|
||||
|
@ -674,9 +674,9 @@ int32_t vnodeProcessStreamMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo)
|
|||
return tqProcessTaskRetrieveRsp(pVnode->pTq, pMsg);
|
||||
case TDMT_VND_STREAM_SCAN_HISTORY:
|
||||
return tqProcessTaskScanHistory(pVnode->pTq, pMsg);
|
||||
case TDMT_STREAM_SCAN_HISTORY_FINISH:
|
||||
case TDMT_VND_STREAM_SCAN_HISTORY_FINISH:
|
||||
return tqProcessTaskScanHistoryFinishReq(pVnode->pTq, pMsg);
|
||||
case TDMT_STREAM_SCAN_HISTORY_FINISH_RSP:
|
||||
case TDMT_VND_STREAM_SCAN_HISTORY_FINISH_RSP:
|
||||
return tqProcessTaskScanHistoryFinishRsp(pVnode->pTq, pMsg);
|
||||
case TDMT_VND_STREAM_CHECK_POINT_SOURCE:
|
||||
return tqProcessStreamCheckPointSourceReq(pVnode->pTq, pMsg);
|
||||
|
|
|
@ -269,7 +269,7 @@ int32_t streamDispatchCheckMsg(SStreamTask* pTask, const SStreamTaskCheckReq* pR
|
|||
}
|
||||
tEncoderClear(&encoder);
|
||||
|
||||
initRpcMsg(&msg, TDMT_STREAM_TASK_CHECK, buf, tlen + sizeof(SMsgHead));
|
||||
initRpcMsg(&msg, TDMT_VND_STREAM_TASK_CHECK, buf, tlen + sizeof(SMsgHead));
|
||||
qDebug("s-task:%s (level:%d) dispatch check msg to s-task:0x%" PRIx64 ":0x%x (vgId:%d)", pTask->id.idStr,
|
||||
pTask->info.taskLevel, pReq->streamId, pReq->downstreamTaskId, nodeId);
|
||||
|
||||
|
@ -277,50 +277,6 @@ int32_t streamDispatchCheckMsg(SStreamTask* pTask, const SStreamTaskCheckReq* pR
|
|||
return 0;
|
||||
}
|
||||
|
||||
int32_t streamDoDispatchScanHistoryFinishMsg(SStreamTask* pTask, const SStreamScanHistoryFinishReq* pReq, int32_t vgId,
|
||||
SEpSet* pEpSet) {
|
||||
void* buf = NULL;
|
||||
int32_t code = -1;
|
||||
SRpcMsg msg = {0};
|
||||
|
||||
int32_t tlen;
|
||||
tEncodeSize(tEncodeStreamScanHistoryFinishReq, pReq, tlen, code);
|
||||
if (code < 0) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
buf = rpcMallocCont(sizeof(SMsgHead) + tlen);
|
||||
if (buf == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return -1;
|
||||
}
|
||||
|
||||
((SMsgHead*)buf)->vgId = htonl(vgId);
|
||||
void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
|
||||
|
||||
SEncoder encoder;
|
||||
tEncoderInit(&encoder, abuf, tlen);
|
||||
if ((code = tEncodeStreamScanHistoryFinishReq(&encoder, pReq)) < 0) {
|
||||
if (buf) {
|
||||
rpcFreeCont(buf);
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
tEncoderClear(&encoder);
|
||||
|
||||
msg.contLen = tlen + sizeof(SMsgHead);
|
||||
msg.pCont = buf;
|
||||
msg.msgType = TDMT_STREAM_SCAN_HISTORY_FINISH;
|
||||
|
||||
tmsgSendReq(pEpSet, &msg);
|
||||
|
||||
const char* pStatus = streamGetTaskStatusStr(pTask->status.taskStatus);
|
||||
qDebug("s-task:%s status:%s dispatch scan-history finish msg to taskId:0x%x (vgId:%d)", pTask->id.idStr, pStatus,
|
||||
pReq->downstreamTaskId, vgId);
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int32_t doDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* pData) {
|
||||
int32_t code = 0;
|
||||
int32_t numOfBlocks = taosArrayGetSize(pData->blocks);
|
||||
|
@ -718,7 +674,7 @@ int32_t doDispatchScanHistoryFinishMsg(SStreamTask* pTask, const SStreamScanHist
|
|||
|
||||
tEncoderClear(&encoder);
|
||||
|
||||
initRpcMsg(&msg, TDMT_STREAM_SCAN_HISTORY_FINISH, buf, tlen + sizeof(SMsgHead));
|
||||
initRpcMsg(&msg, TDMT_VND_STREAM_SCAN_HISTORY_FINISH, buf, tlen + sizeof(SMsgHead));
|
||||
|
||||
tmsgSendReq(pEpSet, &msg);
|
||||
const char* pStatus = streamGetTaskStatusStr(pTask->status.taskStatus);
|
||||
|
|
|
@ -13,12 +13,12 @@
|
|||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#include <libs/sync/sync.h>
|
||||
#include "executor.h"
|
||||
#include "streamBackendRocksdb.h"
|
||||
#include "streamInt.h"
|
||||
#include "tref.h"
|
||||
#include "ttimer.h"
|
||||
#include "tstream.h"
|
||||
|
||||
static TdThreadOnce streamMetaModuleInit = PTHREAD_ONCE_INIT;
|
||||
int32_t streamBackendId = 0;
|
||||
|
@ -86,8 +86,8 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF
|
|||
pMeta->expandFunc = expandFunc;
|
||||
pMeta->stage = stage;
|
||||
|
||||
// send heartbeat every 20sec.
|
||||
pMeta->hbTmr = taosTmrStart(metaHbToMnode, 20000, pMeta, streamEnv.timer);
|
||||
// send heartbeat every 5sec.
|
||||
pMeta->hbTmr = taosTmrStart(metaHbToMnode, 5000, pMeta, streamEnv.timer);
|
||||
|
||||
pMeta->pTaskBackendUnique =
|
||||
taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
|
||||
|
@ -572,6 +572,13 @@ int32_t tEncodeStreamHbMsg(SEncoder* pEncoder, const SStreamHbMsg* pReq) {
|
|||
if (tStartEncode(pEncoder) < 0) return -1;
|
||||
if (tEncodeI32(pEncoder, pReq->vgId) < 0) return -1;
|
||||
if (tEncodeI32(pEncoder, pReq->numOfTasks) < 0) return -1;
|
||||
|
||||
for(int32_t i = 0; i < pReq->numOfTasks; ++i) {
|
||||
STaskStatusEntry* ps = taosArrayGet(pReq->pTaskStatus, i);
|
||||
if (tEncodeI64(pEncoder, ps->streamId) < 0) return -1;
|
||||
if (tEncodeI32(pEncoder, ps->taskId) < 0) return -1;
|
||||
if (tEncodeI32(pEncoder, ps->status) < 0) return -1;
|
||||
}
|
||||
tEndEncode(pEncoder);
|
||||
return pEncoder->pos;
|
||||
}
|
||||
|
@ -580,12 +587,22 @@ int32_t tDecodeStreamHbMsg(SDecoder* pDecoder, SStreamHbMsg* pReq) {
|
|||
if (tStartDecode(pDecoder) < 0) return -1;
|
||||
if (tDecodeI32(pDecoder, &pReq->vgId) < 0) return -1;
|
||||
if (tDecodeI32(pDecoder, &pReq->numOfTasks) < 0) return -1;
|
||||
|
||||
pReq->pTaskStatus = taosArrayInit(pReq->numOfTasks, sizeof(STaskStatusEntry));
|
||||
for(int32_t i = 0; i < pReq->numOfTasks; ++i) {
|
||||
STaskStatusEntry hb = {0};
|
||||
if (tDecodeI64(pDecoder, &hb.streamId) < 0) return -1;
|
||||
if (tDecodeI32(pDecoder, &hb.taskId) < 0) return -1;
|
||||
if (tDecodeI32(pDecoder, &hb.status) < 0) return -1;
|
||||
|
||||
taosArrayPush(pReq->pTaskStatus, &hb);
|
||||
}
|
||||
|
||||
tEndDecode(pDecoder);
|
||||
return 0;
|
||||
}
|
||||
|
||||
void metaHbToMnode(void* param, void* tmrId) {
|
||||
#if 0
|
||||
SStreamMeta* pMeta = param;
|
||||
SStreamHbMsg hbMsg = {0};
|
||||
|
||||
|
@ -630,6 +647,5 @@ void metaHbToMnode(void* param, void* tmrId) {
|
|||
tmsgSendReq(&pMeta->mgmtInfo.epset, &msg);
|
||||
|
||||
// next hb will be issued in 20sec.
|
||||
taosTmrReset(metaHbToMnode, 20000, pMeta, streamEnv.timer, pMeta->hbTmr);
|
||||
#endif
|
||||
taosTmrReset(metaHbToMnode, 5000, pMeta, streamEnv.timer, pMeta->hbTmr);
|
||||
}
|
|
@ -423,7 +423,7 @@ int32_t streamProcessScanHistoryFinishReq(SStreamTask* pTask, SStreamScanHistory
|
|||
int32_t taskLevel = pTask->info.taskLevel;
|
||||
ASSERT(taskLevel == TASK_LEVEL__AGG || taskLevel == TASK_LEVEL__SINK);
|
||||
|
||||
// sink node do not send end of scan history msg to its upstream, which is agg task.
|
||||
// sink tasks do not send end of scan history msg to its upstream, which is agg task.
|
||||
streamAddEndScanHistoryMsg(pTask, pRpcInfo, pReq);
|
||||
|
||||
int32_t left = atomic_sub_fetch_32(&pTask->numOfWaitingUpstream, 1);
|
||||
|
@ -440,9 +440,11 @@ int32_t streamProcessScanHistoryFinishReq(SStreamTask* pTask, SStreamScanHistory
|
|||
streamAggUpstreamScanHistoryFinish(pTask);
|
||||
}
|
||||
|
||||
// all upstream tasks have completed the scan-history task in the stream time window, let's start to extract data
|
||||
// from the WAL files, which contains the real time stream data.
|
||||
streamNotifyUpstreamContinue(pTask);
|
||||
|
||||
// sink node does not receive the pause msg from mnode, so does not need enable it
|
||||
// mnode will not send the pause/resume message to the sink task, so no need to enable the pause for sink tasks.
|
||||
if (pTask->info.taskLevel == TASK_LEVEL__AGG) {
|
||||
streamTaskEnablePause(pTask);
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue