enh(stream): add trans to reset task status to avoid being frozen in checkpoint status due to doing checkpoint failure of partial tasks.

This commit is contained in:
Haojun Liao 2023-10-09 10:13:53 +08:00
parent 0fda1793e7
commit ee6516f3c4
17 changed files with 309 additions and 208 deletions

View File

@ -3288,7 +3288,7 @@ typedef struct {
SMsgHead head;
int64_t streamId;
int32_t taskId;
} SVPauseStreamTaskReq;
} SVPauseStreamTaskReq, SVResetStreamTaskReq;
typedef struct {
int8_t reserved;

View File

@ -302,13 +302,12 @@ enum { // WARN: new msg should be appended to segment tail
TD_DEF_MSG_TYPE(TDMT_SYNC_FORCE_FOLLOWER, "sync-force-become-follower", NULL, NULL)
TD_NEW_MSG_SEG(TDMT_VND_STREAM_MSG)
// TD_DEF_MSG_TYPE(TDMT_VND_STREAM_TRIGGER, "vnode-stream-trigger", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_STREAM_SCAN_HISTORY, "vnode-stream-scan-history", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_STREAM_SCAN_HISTORY_FINISH, "vnode-stream-scan-history-finish", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_STREAM_CHECK_POINT_SOURCE, "vnode-stream-checkpoint-source", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_STREAM_TASK_UPDATE, "vnode-stream-update", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_STREAM_TASK_RESET, "vnode-stream-reset", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_STREAM_TASK_CHECK, "vnode-stream-task-check", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_VERUPDATE, "vnode-stream-ver-update", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_STREAM_MAX_MSG, "vnd-stream-max", NULL, NULL)
TD_NEW_MSG_SEG(TDMT_VND_TMQ_MSG)

View File

@ -259,8 +259,9 @@ typedef struct SStreamTaskId {
typedef struct SCheckpointInfo {
int64_t startTs;
int64_t checkpointId;
int64_t checkpointVer; // latest checkpointId version
int64_t checkpointVer; // latest checkpointId version
int64_t nextProcessVer; // current offset in WAL, not serialize it
int64_t failedId; // record the latest failed checkpoint id
} SCheckpointInfo;
typedef struct SStreamStatus {
@ -603,13 +604,15 @@ typedef struct STaskStatusEntry {
int32_t status;
int32_t stage;
int32_t nodeId;
int64_t verStart; // start version in WAL, only valid for source task
int64_t verEnd; // end version in WAL, only valid for source task
int64_t offset; // only valid for source task
double inputQUsed; // in MiB
int64_t verStart; // start version in WAL, only valid for source task
int64_t verEnd; // end version in WAL, only valid for source task
int64_t processedVer; // only valid for source task
int64_t activeCheckpointId; // current active checkpoint id
bool checkpointFailed; // denote if the checkpoint is failed or not
double inputQUsed; // in MiB
double inputRate;
double sinkQuota; // existed quota size for sink task
double sinkDataSize; // sink to dest data size
double sinkQuota; // existed quota size for sink task
double sinkDataSize; // sink to dest data size
} STaskStatusEntry;
typedef struct SStreamHbMsg {
@ -732,7 +735,9 @@ int32_t streamTaskReleaseState(SStreamTask* pTask);
int32_t streamTaskReloadState(SStreamTask* pTask);
void streamTaskCloseUpstreamInput(SStreamTask* pTask, int32_t taskId);
void streamTaskOpenAllUpstreamInput(SStreamTask* pTask);
int32_t streamTaskUpdateDataVer(SStreamTask* pTask, int64_t ver);
void streamTaskStatusInit(STaskStatusEntry* pEntry, const SStreamTask* pTask);
void streamTaskStatusCopy(STaskStatusEntry* pDst, const STaskStatusEntry* pSrc);
// source level
int32_t streamSetParamForStreamScannerStep1(SStreamTask* pTask, SVersionRange* pVerRange, STimeWindow* pWindow);
@ -768,10 +773,10 @@ void streamMetaInitForSnode(SStreamMeta* pMeta);
// checkpoint
int32_t streamProcessCheckpointSourceReq(SStreamTask* pTask, SStreamCheckpointSourceReq* pReq);
int32_t streamProcessCheckpointReadyMsg(SStreamTask* pTask);
void streamTaskClearCheckInfo(SStreamTask* pTask);
int32_t streamAlignTransferState(SStreamTask* pTask);
int32_t streamBuildAndSendDropTaskMsg(SMsgCb* pMsgCb, int32_t vgId, SStreamTaskId* pTaskId);
int32_t streamBuildAndSendVerUpdateMsg(SMsgCb* pMsgCb, int32_t vgId, SStreamTaskId* pTaskId, int64_t ver);
int32_t streamAddCheckpointSourceRspMsg(SStreamCheckpointSourceReq* pReq, SRpcHandleInfo* pRpcInfo, SStreamTask* pTask,
int8_t isSucceed);
int32_t buildCheckpointSourceRsp(SStreamCheckpointSourceReq* pReq, SRpcHandleInfo* pRpcInfo, SRpcMsg* pMsg,

View File

@ -212,6 +212,7 @@ SArray *mmGetMsgHandles() {
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_STOP_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_CHECK_POINT_SOURCE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_UPDATE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_RESET_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_HEARTBEAT, mmPutMsgToReadQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_CONFIG_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;

View File

@ -794,14 +794,13 @@ SArray *vmGetMsgHandles() {
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_SCAN_HISTORY_FINISH_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_CHECK, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_CHECK_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
// if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TRIGGER, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_PAUSE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_RESUME, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_STOP, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_CHECK_POINT_SOURCE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_CHECKPOINT_READY, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_UPDATE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_VERUPDATE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_RESET, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_REPLICA, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_CONFIG, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;

View File

@ -44,7 +44,8 @@ typedef struct SNodeEntry {
typedef struct SStreamExecNodeInfo {
SArray *pNodeEntryList;
int64_t ts; // snapshot ts
int64_t ts; // snapshot ts
int64_t activeCheckpoint; // active check point id
SHashObj *pTaskMap;
SArray *pTaskList;
TdThreadMutex lock;
@ -77,14 +78,18 @@ static int32_t mndBuildStreamCheckpointSourceReq2(void **pBuf, int32_t *pLen, in
static int32_t mndProcessNodeCheck(SRpcMsg *pReq);
static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg);
static SArray *doExtractNodeListFromStream(SMnode *pMnode);
static SArray *extractNodeListFromStream(SMnode *pMnode);
static SArray *mndTakeVgroupSnapshot(SMnode *pMnode);
static SVgroupChangeInfo mndFindChangedNodeInfo(SMnode *pMnode, const SArray *pPrevNodeList, const SArray *pNodeList);
static int32_t mndPersistTransLog(SStreamObj *pStream, STrans *pTrans);
static STrans *doCreateTrans(SMnode *pMnode, SStreamObj *pStream, const char *name);
static int32_t mndPersistTransLog(SStreamObj *pStream, STrans *pTrans);
static void initTransAction(STransAction *pAction, void *pCont, int32_t contLen, int32_t msgType, const SEpSet *pEpset);
static int32_t createStreamUpdateTrans(SMnode *pMnode, SStreamObj *pStream, SVgroupChangeInfo *pInfo);
static void removeStreamTasksInBuf(SStreamObj* pStream, SStreamExecNodeInfo * pExecNode);
static void keepStreamTasksInBuf(SStreamObj *pStream, SStreamExecNodeInfo *pExecNode);
static int32_t removeExpirednodeEntryAndTask(SArray *pNodeSnapshot);
int32_t mndInitStream(SMnode *pMnode) {
SSdbTable table = {
@ -107,6 +112,7 @@ int32_t mndInitStream(SMnode *pMnode) {
mndSetMsgHandle(pMnode, TDMT_STREAM_TASK_RESUME_RSP, mndTransProcessRsp);
mndSetMsgHandle(pMnode, TDMT_STREAM_TASK_STOP_RSP, mndTransProcessRsp);
mndSetMsgHandle(pMnode, TDMT_VND_STREAM_TASK_UPDATE_RSP, mndTransProcessRsp);
mndSetMsgHandle(pMnode, TDMT_VND_STREAM_TASK_RESET_RSP, mndTransProcessRsp);
mndSetMsgHandle(pMnode, TDMT_VND_STREAM_CHECK_POINT_SOURCE_RSP, mndTransProcessRsp);
mndSetMsgHandle(pMnode, TDMT_MND_STREAM_CHECKPOINT_TIMER, mndProcessStreamCheckpointTmr);
@ -579,21 +585,6 @@ int32_t mndPersistDropStreamLog(SMnode *pMnode, STrans *pTrans, SStreamObj *pStr
return 0;
}
static int32_t mndSetStreamRecover(SMnode *pMnode, STrans *pTrans, const SStreamObj *pStream) {
SStreamObj streamObj = {0};
memcpy(streamObj.name, pStream->name, TSDB_STREAM_FNAME_LEN);
streamObj.status = STREAM_STATUS__RECOVER;
SSdbRaw *pCommitRaw = mndStreamActionEncode(&streamObj);
if (pCommitRaw == NULL) return -1;
if (mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) {
mError("stream trans:%d, failed to append commit log since %s", pTrans->id, terrstr());
return -1;
}
(void)sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY);
return 0;
}
static int32_t mndCreateStbForStream(SMnode *pMnode, STrans *pTrans, const SStreamObj *pStream, const char *user) {
SStbObj *pStb = NULL;
SDbObj *pDb = NULL;
@ -1147,7 +1138,7 @@ static int32_t mndProcessStreamDoCheckpoint(SRpcMsg *pReq) {
execNodeList.pNodeEntryList = taosArrayDestroy(execNodeList.pNodeEntryList);
}
execNodeList.pNodeEntryList = doExtractNodeListFromStream(pMnode);
execNodeList.pNodeEntryList = extractNodeListFromStream(pMnode);
}
if (taosArrayGetSize(execNodeList.pNodeEntryList) == 0) {
@ -1609,7 +1600,7 @@ static int32_t mndRetrieveStreamTask(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock
} else if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
// offset info
const char *offsetStr = "%" PRId64 " [%" PRId64 ", %" PRId64 "]";
sprintf(buf, offsetStr, pe->offset, pe->verStart, pe->verEnd);
sprintf(buf, offsetStr, pe->processedVer, pe->verStart, pe->verEnd);
}
STR_TO_VARSTR(vbuf, buf);
@ -1658,7 +1649,9 @@ static int32_t mndPauseStreamTask(STrans *pTrans, SStreamTask *pTask) {
return 0;
}
int32_t mndPauseAllStreamTaskImpl(STrans *pTrans, SArray *tasks) {
int32_t mndPauseAllStreamTasks(STrans *pTrans, SStreamObj *pStream) {
SArray* tasks = pStream->tasks;
int32_t size = taosArrayGetSize(tasks);
for (int32_t i = 0; i < size; i++) {
SArray *pTasks = taosArrayGetP(tasks, i);
@ -1678,16 +1671,6 @@ int32_t mndPauseAllStreamTaskImpl(STrans *pTrans, SArray *tasks) {
return 0;
}
int32_t mndPauseAllStreamTasks(STrans *pTrans, SStreamObj *pStream) {
int32_t code = mndPauseAllStreamTaskImpl(pTrans, pStream->tasks);
if (code != 0) {
return code;
}
// pStream->pHTasksList is null
// code = mndPauseAllStreamTaskImpl(pTrans, pStream->pHTasksList);
return code;
}
static int32_t mndPersistStreamLog(STrans *pTrans, const SStreamObj *pStream, int8_t status) {
SStreamObj streamObj = {0};
memcpy(streamObj.name, pStream->name, TSDB_STREAM_FNAME_LEN);
@ -1741,6 +1724,7 @@ static int32_t mndProcessPauseStreamReq(SRpcMsg *pReq) {
sdbRelease(pMnode->pSdb, pStream);
return -1;
}
mInfo("trans:%d, used to pause stream:%s", pTrans->id, pauseReq.name);
mndTransSetDbName(pTrans, pStream->sourceDb, pStream->targetDb);
@ -1752,7 +1736,7 @@ static int32_t mndProcessPauseStreamReq(SRpcMsg *pReq) {
// pause all tasks
if (mndPauseAllStreamTasks(pTrans, pStream) < 0) {
mError("stream:%s, failed to drop task since %s", pauseReq.name, terrstr());
mError("stream:%s, failed to pause task since %s", pauseReq.name, terrstr());
sdbRelease(pMnode->pSdb, pStream);
mndTransDrop(pTrans);
return -1;
@ -1979,20 +1963,9 @@ void initTransAction(STransAction *pAction, void *pCont, int32_t contLen, int32_
// todo extract method: traverse stream tasks
// build trans to update the epset
static int32_t createStreamUpdateTrans(SMnode *pMnode, SStreamObj *pStream, SVgroupChangeInfo *pInfo) {
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB, NULL, "stream-task-update");
STrans* pTrans = doCreateTrans(pMnode, pStream, "stream-task-update");
if (pTrans == NULL) {
mError("failed to build stream task DAG update, reason: %s", tstrerror(TSDB_CODE_OUT_OF_MEMORY));
return -1;
}
mDebug("start to build stream:0x%" PRIx64 " task DAG update", pStream->uid);
mndTransSetDbName(pTrans, pStream->sourceDb, pStream->targetDb);
if (mndTransCheckConflict(pMnode, pTrans) != 0) {
mError("failed to build stream:0x%" PRIx64 " task DAG update, code:%s", pStream->uid,
tstrerror(TSDB_CODE_MND_TRANS_CONFLICT));
mndTransDrop(pTrans);
return -1;
return terrno;
}
taosWLockLatch(&pStream->lock);
@ -2153,7 +2126,7 @@ static int32_t mndProcessVgroupChange(SMnode *pMnode, SVgroupChangeInfo *pChange
return 0;
}
static SArray *doExtractNodeListFromStream(SMnode *pMnode) {
static SArray *extractNodeListFromStream(SMnode *pMnode) {
SSdb *pSdb = pMnode->pSdb;
SStreamObj *pStream = NULL;
void *pIter = NULL;
@ -2174,11 +2147,9 @@ static SArray *doExtractNodeListFromStream(SMnode *pMnode) {
int32_t numOfTasks = taosArrayGetSize(pLevel);
for (int32_t k = 0; k < numOfTasks; ++k) {
SStreamTask *pTask = taosArrayGetP(pLevel, k);
SNodeEntry entry = {0};
epsetAssign(&entry.epset, &pTask->info.epSet);
entry.nodeId = pTask->info.nodeId;
entry.hbTimestamp = -1;
SNodeEntry entry = {.hbTimestamp = -1, .nodeId = pTask->info.nodeId};
epsetAssign(&entry.epset, &pTask->info.epSet);
taosHashPut(pHash, &entry.nodeId, sizeof(entry.nodeId), &entry, sizeof(entry));
}
}
@ -2235,24 +2206,28 @@ static int32_t doRemoveFromTask(SStreamExecNodeInfo* pExecNode, STaskId* pRemove
return 0;
}
static int32_t removeInvalidStreamTask(SArray *pNodeSnapshot) {
static bool taskNodeExists(SArray* pList, int32_t nodeId) {
size_t num = taosArrayGetSize(pList);
for(int32_t i = 0; i < num; ++i) {
SNodeEntry* pEntry = taosArrayGet(pList, i);
if (pEntry->nodeId == nodeId) {
return true;
}
}
return false;
}
int32_t removeExpirednodeEntryAndTask(SArray *pNodeSnapshot) {
SArray* pRemoveTaskList = taosArrayInit(4, sizeof(STaskId));
int32_t numOfTask = taosArrayGetSize(execNodeList.pTaskList);
int32_t numOfVgroups = taosArrayGetSize(pNodeSnapshot);
for(int32_t i = 0; i < numOfTask; ++i) {
STaskId* pId = taosArrayGet(execNodeList.pTaskList, i);
STaskStatusEntry* pEntry = taosHashGet(execNodeList.pTaskMap, pId, sizeof(*pId));
bool existed = false;
for(int32_t j = 0; j < numOfVgroups; ++j) {
SNodeEntry* pNodeEntry = taosArrayGet(pNodeSnapshot, j);
if (pNodeEntry->nodeId == pEntry->nodeId) {
existed = true;
break;
}
}
bool existed = taskNodeExists(pNodeSnapshot, pEntry->nodeId);
if (!existed) {
taosArrayPush(pRemoveTaskList, pId);
}
@ -2263,15 +2238,18 @@ static int32_t removeInvalidStreamTask(SArray *pNodeSnapshot) {
doRemoveFromTask(&execNodeList, pId);
}
mDebug("remove invalid stream tasks:%d, remain:%d", (int32_t)taosArrayGetSize(pRemoveTaskList),
(int32_t) taosArrayGetSize(execNodeList.pTaskList));
int32_t size = taosArrayGetSize(pNodeSnapshot);
SArray* pValidNodeEntryList = taosArrayInit(4, sizeof(SNodeEntry));
for(int32_t i = 0; i < taosArrayGetSize(execNodeList.pNodeEntryList); ++i) {
SNodeEntry* pExisted = taosArrayGet(execNodeList.pNodeEntryList, i);
SNodeEntry* p = taosArrayGet(execNodeList.pNodeEntryList, i);
for(int32_t j = 0; j < size; ++j) {
SNodeEntry* pEntry = taosArrayGet(pNodeSnapshot, j);
if (pEntry->nodeId == pExisted->nodeId) {
taosArrayPush(pValidNodeEntryList, pExisted);
if (pEntry->nodeId == p->nodeId) {
taosArrayPush(pValidNodeEntryList, p);
break;
}
}
@ -2302,7 +2280,7 @@ static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg) {
execNodeList.pNodeEntryList = taosArrayDestroy(execNodeList.pNodeEntryList);
}
execNodeList.pNodeEntryList = doExtractNodeListFromStream(pMnode);
execNodeList.pNodeEntryList = extractNodeListFromStream(pMnode);
}
if (taosArrayGetSize(execNodeList.pNodeEntryList) == 0) {
@ -2315,7 +2293,7 @@ static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg) {
SArray *pNodeSnapshot = mndTakeVgroupSnapshot(pMnode);
taosThreadMutexLock(&execNodeList.lock);
removeInvalidStreamTask(pNodeSnapshot);
removeExpirednodeEntryAndTask(pNodeSnapshot);
SVgroupChangeInfo changeInfo = mndFindChangedNodeInfo(pMnode, execNodeList.pNodeEntryList, pNodeSnapshot);
if (taosArrayGetSize(changeInfo.pUpdateNodeList) > 0) {
@ -2343,9 +2321,13 @@ static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg) {
}
typedef struct SMStreamNodeCheckMsg {
int8_t holder; // // to fix windows compile error, define place holder
int8_t placeHolder; // // to fix windows compile error, define place holder
} SMStreamNodeCheckMsg;
typedef struct SMStreamTaskResetMsg {
int8_t placeHolder;
} SMStreamTaskResetMsg;
static int32_t mndProcessNodeCheck(SRpcMsg *pReq) {
SMnode *pMnode = pReq->info.node;
SSdb *pSdb = pMnode->pSdb;
@ -2363,6 +2345,7 @@ static int32_t mndProcessNodeCheck(SRpcMsg *pReq) {
void keepStreamTasksInBuf(SStreamObj *pStream, SStreamExecNodeInfo *pExecNode) {
int32_t level = taosArrayGetSize(pStream->tasks);
for (int32_t i = 0; i < level; i++) {
SArray *pLevel = taosArrayGetP(pStream->tasks, i);
@ -2373,11 +2356,9 @@ void keepStreamTasksInBuf(SStreamObj *pStream, SStreamExecNodeInfo *pExecNode) {
STaskId id = {.streamId = pTask->id.streamId, .taskId = pTask->id.taskId};
void *p = taosHashGet(pExecNode->pTaskMap, &id, sizeof(id));
if (p == NULL) {
STaskStatusEntry entry = {.id.streamId = pTask->id.streamId,
.id.taskId = pTask->id.taskId,
.stage = -1,
.nodeId = pTask->info.nodeId,
.status = TASK_STATUS__STOP};
STaskStatusEntry entry = {0};
streamTaskStatusInit(&entry, pTask);
taosHashPut(pExecNode->pTaskMap, &id, sizeof(id), &entry, sizeof(entry));
taosArrayPush(pExecNode->pTaskList, &id);
mInfo("s-task:0x%x add into task buffer, total:%d", (int32_t)entry.id.taskId,
@ -2418,10 +2399,150 @@ void removeStreamTasksInBuf(SStreamObj* pStream, SStreamExecNodeInfo * pExecNode
ASSERT(taosHashGetSize(pExecNode->pTaskMap) == taosArrayGetSize(pExecNode->pTaskList));
}
static STrans* doCreateTrans(SMnode* pMnode, SStreamObj* pStream, const char* name) {
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB_INSIDE, NULL, name);
if (pTrans == NULL) {
mError("failed to build trans:%s, reason: %s", name, tstrerror(TSDB_CODE_OUT_OF_MEMORY));
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
mDebug("start to build stream:0x%" PRIx64 " task DAG update", pStream->uid);
mndTransSetDbName(pTrans, pStream->sourceDb, pStream->targetDb);
if (mndTransCheckConflict(pMnode, pTrans) != 0) {
terrno = TSDB_CODE_MND_TRANS_CONFLICT;
mError("failed to build trans:%s for stream:0x%" PRIx64 " code:%s", name, pStream->uid, tstrerror(terrno));
mndTransDrop(pTrans);
return NULL;
}
terrno = 0;
return pTrans;
}
int32_t createStreamResetStatusTrans(SMnode* pMnode, SStreamObj* pStream) {
STrans *pTrans = doCreateTrans(pMnode, pStream, "stream-task-reset");
if (pTrans == NULL) {
return terrno;
}
taosWLockLatch(&pStream->lock);
int32_t numOfLevels = taosArrayGetSize(pStream->tasks);
for (int32_t j = 0; j < numOfLevels; ++j) {
SArray *pLevel = taosArrayGetP(pStream->tasks, j);
int32_t numOfTasks = taosArrayGetSize(pLevel);
for (int32_t k = 0; k < numOfTasks; ++k) {
SStreamTask *pTask = taosArrayGetP(pLevel, k);
// todo extract method, with pause stream task
SVResetStreamTaskReq* pReq = taosMemoryCalloc(1, sizeof(SVResetStreamTaskReq));
if (pReq == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
mError("failed to malloc in reset stream, size:%" PRIzu ", code:%s", sizeof(SVResetStreamTaskReq),
tstrerror(TSDB_CODE_OUT_OF_MEMORY));
return terrno;
}
pReq->head.vgId = htonl(pTask->info.nodeId);
pReq->taskId = pTask->id.taskId;
pReq->streamId = pTask->id.streamId;
STransAction action = {0};
initTransAction(&action, pReq, sizeof(SVResetStreamTaskReq), TDMT_VND_STREAM_TASK_RESET, &pTask->info.epSet);
if (mndTransAppendRedoAction(pTrans, &action) != 0) {
taosMemoryFree(pReq);
taosWUnLockLatch(&pStream->lock);
mndTransDrop(pTrans);
return terrno;
}
}
}
taosWUnLockLatch(&pStream->lock);
int32_t code = mndPersistTransLog(pStream, pTrans);
if (code != TSDB_CODE_SUCCESS) {
sdbRelease(pMnode->pSdb, pStream);
return -1;
}
if (mndTransPrepare(pMnode, pTrans) != 0) {
mError("trans:%d, failed to prepare update stream trans since %s", pTrans->id, terrstr());
sdbRelease(pMnode->pSdb, pStream);
mndTransDrop(pTrans);
return -1;
}
sdbRelease(pMnode->pSdb, pStream);
mndTransDrop(pTrans);
return TSDB_CODE_ACTION_IN_PROGRESS;
}
int32_t mndResetFromCheckpoint(SMnode* pMnode) {
// find the checkpoint trans id
int32_t transId = 0;
{
SSdb *pSdb = pMnode->pSdb;
STrans *pTrans = NULL;
void* pIter = NULL;
while (1) {
pIter = sdbFetch(pSdb, SDB_TRANS, pIter, (void **)&pTrans);
if (pIter == NULL) {
break;
}
if (strncmp(pTrans->opername, MND_STREAM_CHECKPOINT_NAME, tListLen(pTrans->opername) - 1) == 0) {
transId = pTrans->id;
sdbRelease(pSdb, pTrans);
sdbCancelFetch(pSdb, pIter);
break;
}
sdbRelease(pSdb, pTrans);
}
}
if (transId == 0) {
mError("failed to find the checkpoint trans, reset not executed");
return TSDB_CODE_SUCCESS;
}
STrans* pTrans = mndAcquireTrans(pMnode, transId);
mndKillTrans(pMnode, pTrans);
// set all tasks status to be normal, refactor later to be stream level, instead of vnode level.
SSdb *pSdb = pMnode->pSdb;
SStreamObj *pStream = NULL;
void *pIter = NULL;
while (1) {
pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream);
if (pIter == NULL) {
break;
}
mDebug("stream:%s (0x%" PRIx64 ") reset checkpoint procedure, create reset trans", pStream->name, pStream->uid);
int32_t code = createStreamResetStatusTrans(pMnode, pStream);
if (code != TSDB_CODE_SUCCESS) {
sdbCancelFetch(pSdb, pIter);
return code;
}
}
return 0;
}
int32_t mndProcessStreamHb(SRpcMsg *pReq) {
SMnode *pMnode = pReq->info.node;
SStreamHbMsg req = {0};
bool checkpointFailed = false;
int64_t activeCheckpointId = 0;
SDecoder decoder = {0};
tDecoderInit(&decoder, pReq->pCont, pReq->contLen);
@ -2442,9 +2563,9 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) {
for (int32_t i = 0; i < req.numOfTasks; ++i) {
STaskStatusEntry *p = taosArrayGet(req.pTaskStatus, i);
STaskStatusEntry* pEntry = taosHashGet(execNodeList.pTaskMap, &p->id, sizeof(p->id));
STaskStatusEntry *pEntry = taosHashGet(execNodeList.pTaskMap, &p->id, sizeof(p->id));
if (pEntry == NULL) {
mError("s-task:0x%"PRIx64" not found in mnode task list", p->id.taskId);
mError("s-task:0x%" PRIx64 " not found in mnode task list", p->id.taskId);
continue;
}
@ -2462,16 +2583,18 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) {
}
}
} else {
pEntry->stage = p->stage;
pEntry->inputQUsed = p->inputQUsed;
pEntry->inputRate = p->inputRate;
// pEntry->outputQUsed = p->outputQUsed;
// pEntry->outputRate = p->outputRate;
pEntry->offset = p->offset;
pEntry->verStart = p->verStart;
pEntry->verEnd = p->verEnd;
pEntry->sinkQuota = p->sinkQuota;
pEntry->sinkDataSize = p->sinkDataSize;
streamTaskStatusCopy(pEntry, p);
if (p->activeCheckpointId != 0) {
if (activeCheckpointId != 0) {
ASSERT(activeCheckpointId == p->activeCheckpointId);
} else {
activeCheckpointId = p->activeCheckpointId;
}
if (p->checkpointFailed) {
checkpointFailed = p->checkpointFailed;
}
}
}
pEntry->status = p->status;
@ -2480,6 +2603,18 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) {
}
}
// current checkpoint is failed, rollback from the checkpoint trans
// kill the checkpoint trans and then set all tasks status to be normal
if (checkpointFailed && activeCheckpointId != 0) {
if (execNodeList.activeCheckpoint != activeCheckpointId) {
mInfo("checkpointId:%"PRId64" failed, issue task-reset trans to reset all tasks status", activeCheckpointId);
execNodeList.activeCheckpoint = activeCheckpointId;
mndResetFromCheckpoint(pMnode);
} else {
mDebug("checkpoint:%"PRId64" reset has issued already, ignore it", activeCheckpointId);
}
}
taosThreadMutexUnlock(&execNodeList.lock);
taosArrayDestroy(req.pTaskStatus);

View File

@ -1695,7 +1695,6 @@ static int32_t mndRetrieveTrans(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBl
int32_t numOfRows = 0;
STrans *pTrans = NULL;
int32_t cols = 0;
char *pWrite;
while (numOfRows < rows) {
pShow->pIter = sdbFetch(pSdb, SDB_TRANS, pShow->pIter, (void **)&pTrans);

View File

@ -177,9 +177,6 @@ int32_t sndProcessTaskDeployReq(SSnode *pSnode, char *msg, int32_t msgLen) {
qDebug("snode:%d s-task:%s is deployed on snode and add into meta, status:%s, numOfTasks:%d", SNODE_HANDLE, pTask->id.idStr,
streamGetTaskStatusStr(pTask->status.taskStatus), numOfTasks);
// send msg to update the nextProcessedVer attribute for this task if it is a stream task
streamBuildAndSendVerUpdateMsg(pTask->pMsgCb, pSnode->pMeta->vgId, &pTask->id, 0);
streamTaskCheckDownstream(pTask);
return 0;
}

View File

@ -227,7 +227,7 @@ int tqScanWalAsync(STQ* pTq, bool ckPause);
int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp);
int32_t tqProcessTaskCheckpointReadyMsg(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessTaskDataVerUpdateReq(STQ* pTq, char* pMsg, int32_t msgLen);
int32_t tqProcessTaskResetReq(STQ* pTq, SRpcMsg* pMsg);
int32_t tqCheckAndRunStreamTaskAsync(STQ* pTq);
int tqCommit(STQ*);

View File

@ -1020,16 +1020,6 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t sversion, char* msg, int32_t ms
// only handled in the leader node
if (vnodeIsRoleLeader(pTq->pVnode)) {
tqDebug("vgId:%d s-task:0x%x is deployed and add into meta, numOfTasks:%d", vgId, taskId, numOfTasks);
#if 0
if (pTq->pVnode->restored) {
SStreamTask* p = streamMetaAcquireTask(pStreamMeta, streamId, taskId);
if (p != NULL) {
// send msg to update the nextProcessedVer attribute for this task if it is a stream task
streamBuildAndSendVerUpdateMsg(p->pMsgCb, vgId, &p->id, sversion);
streamMetaReleaseTask(pStreamMeta, p);
}
}
#endif
SStreamTask* p = streamMetaAcquireTask(pStreamMeta, streamId, taskId);
bool restored = pTq->pVnode->restored;
@ -1670,7 +1660,7 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp)
int32_t len = pMsg->contLen - sizeof(SMsgHead);
int32_t code = 0;
// disable auto rsp to source
// disable auto rsp to mnode
pRsp->info.handle = NULL;
// todo: add counter to make sure other tasks would not be trapped in checkpoint state
@ -1714,9 +1704,11 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp)
return TSDB_CODE_SUCCESS;
}
// todo: handle the partial failure cases
// downstream not ready, current the stream tasks are not all ready. Ignore this checkpoint req.
if (pTask->status.downstreamReady != 1) {
pTask->chkInfo.failedId = req.checkpointId; // record the latest failed checkpoint id
pTask->checkpointingId = req.checkpointId;
qError("s-task:%s not ready for checkpoint, since downstream not ready, ignore this checkpoint:%" PRId64
", set it failure", pTask->id.idStr, req.checkpointId);
streamMetaReleaseTask(pMeta, pTask);
@ -1932,34 +1924,25 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) {
return rsp.code;
}
int32_t tqProcessTaskDataVerUpdateReq(STQ* pTq, char* pMsg, int32_t msgLen) {
int32_t tqProcessTaskResetReq(STQ* pTq, SRpcMsg* pMsg) {
SVPauseStreamTaskReq* pReq = (SVPauseStreamTaskReq*) pMsg->pCont;
SStreamMeta* pMeta = pTq->pStreamMeta;
int32_t vgId = pMeta->vgId;
SVStreamTaskVerUpdateReq* pReq = (SVStreamTaskVerUpdateReq*) pMsg;
tqDebug("vgId:%d receive msg to update task dataVer, task:0x%x dataVer:%" PRId64, vgId, pReq->taskId, pReq->dataVer);
SStreamTask* pTask = streamMetaAcquireTask(pMeta, pReq->streamId, pReq->taskId);
if (pTask == NULL) {
tqError("vgId:%d process dataVer msg, failed to find task:0x%x, it may have been destroyed", vgId, pReq->taskId);
return -1;
tqError("vgId:%d process task-reset req, failed to acquire task:0x%x, it may have been dropped already", pMeta->vgId,
pReq->taskId);
return TSDB_CODE_SUCCESS;
}
// commit the dataVer update
streamTaskUpdateDataVer(pTask, pReq->dataVer);
tqDebug("s-task:%s receive task-reset msg from mnode, reset status and ready for data processing", pTask->id.idStr);
if (vnodeIsLeader(pTq->pVnode)) {
if (pTq->pVnode->restored) {
ASSERT(pTask->execInfo.init == 0);
pTask->execInfo.init = taosGetTimestampMs();
tqDebug("s-task:%s set the init ts:%" PRId64, pTask->id.idStr, pTask->execInfo.init);
streamTaskCheckDownstream(pTask);
} else {
tqWarn("s-task:%s not launched since vnode (vgId:%d) not ready", pTask->id.idStr, vgId);
}
// clear flag set during do checkpoint, and open inputQ for all upstream tasks
if (pTask->status.taskStatus == TASK_STATUS__CK) {
streamTaskClearCheckInfo(pTask);
streamSetStatusNormal(pTask);
}
streamMetaReleaseTask(pMeta, pTask);
return 0;
}
return TSDB_CODE_SUCCESS;
}

View File

@ -309,7 +309,7 @@ int32_t extractMsgFromWal(SWalReader* pReader, void** pItem, int64_t maxVer, con
SWalCont* pCont = &pReader->pHead->head;
int64_t ver = pCont->version;
if (ver > maxVer) {
tqDebug("maxVer in WAL:%" PRId64 " reached current:%" PRId64 ", do not scan wal anymore, %s", maxVer, ver, id);
tqDebug("maxVer in WAL:%" PRId64 " reached, current:%" PRId64 ", do not scan wal anymore, %s", maxVer, ver, id);
return TSDB_CODE_SUCCESS;
}

View File

@ -583,9 +583,11 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t ver, SRpcMsg
goto _err;
}
} break;
case TDMT_STREAM_TASK_VERUPDATE:
tqProcessTaskDataVerUpdateReq(pVnode->pTq, pMsg->pCont, pMsg->contLen);
break;
case TDMT_VND_STREAM_TASK_RESET: {
if (pVnode->restored/* && vnodeIsLeader(pVnode)*/) {
tqProcessTaskResetReq(pVnode->pTq, pMsg);
}
} break;
case TDMT_VND_ALTER_CONFIRM:
needCommit = pVnode->config.hashChange;
if (vnodeProcessAlterConfirmReq(pVnode, ver, pReq, len, pRsp) < 0) {

View File

@ -1317,8 +1317,9 @@ static STimeWindow getSlidingWindow(TSKEY* startTsCol, TSKEY* endTsCol, uint64_t
}
static SSDataBlock* doRangeScan(SStreamScanInfo* pInfo, SSDataBlock* pSDB, int32_t tsColIndex, int32_t* pRowIndex) {
qInfo("do stream range scan. windows index:%d", *pRowIndex);
qDebug("do stream range scan. windows index:%d", *pRowIndex);
bool prepareRes = true;
while (1) {
SSDataBlock* pResult = NULL;
pResult = doTableScan(pInfo->pTableScanOp);

View File

@ -143,8 +143,8 @@ int32_t streamProcessCheckpointSourceReq(SStreamTask* pTask, SStreamCheckpointSo
pTask->execInfo.checkpoint += 1;
// 2. let's dispatch checkpoint msg to downstream task directly and do nothing else. put the checkpoint block into
// inputQ, to make sure all blocks with less version have been handled by this task already.
// 2. Put the checkpoint block into inputQ, to make sure all blocks with less version have been handled by this task
// already.
int32_t code = appendCheckpointIntoInputQ(pTask, STREAM_INPUT__CHECKPOINT_TRIGGER);
return code;
}
@ -264,6 +264,16 @@ int32_t streamProcessCheckpointReadyMsg(SStreamTask* pTask) {
return 0;
}
void streamTaskClearCheckInfo(SStreamTask* pTask) {
pTask->checkpointingId = 0; // clear the checkpoint id
pTask->chkInfo.failedId = 0;
pTask->chkInfo.startTs = 0; // clear the recorded start time
pTask->checkpointNotReadyTasks = 0;
pTask->checkpointAlignCnt = 0;
taosArrayClear(pTask->pReadyMsgList);
streamTaskOpenAllUpstreamInput(pTask); // open inputQ for all upstream tasks
}
int32_t streamSaveAllTaskStatus(SStreamMeta* pMeta, int64_t checkpointId) {
taosWLockLatch(&pMeta->lock);
@ -283,11 +293,11 @@ int32_t streamSaveAllTaskStatus(SStreamMeta* pMeta, int64_t checkpointId) {
ASSERT(p->chkInfo.checkpointId < p->checkpointingId && p->checkpointingId == checkpointId);
p->chkInfo.checkpointId = p->checkpointingId;
streamTaskClearCheckInfo(p);
streamSetStatusNormal(p);
// save the task
streamMetaSaveTask(pMeta, p);
streamTaskOpenAllUpstreamInput(p); // open inputQ for all upstream tasks
stDebug(
"vgId:%d s-task:%s level:%d open upstream inputQ, commit task status after checkpoint completed, "
@ -318,8 +328,6 @@ int32_t streamTaskBuildCheckpoint(SStreamTask* pTask) {
ASSERT(remain >= 0);
double el = (taosGetTimestampMs() - pTask->chkInfo.startTs) / 1000.0;
pTask->chkInfo.startTs = 0; // clear the recorded start time
if (remain == 0) { // all tasks are ready
stDebug("s-task:%s all downstreams are ready, ready for do checkpoint", pTask->id.idStr);
streamBackendDoCheckpoint(pMeta, pTask->checkpointingId);

View File

@ -778,9 +778,11 @@ int32_t tEncodeStreamHbMsg(SEncoder* pEncoder, const SStreamHbMsg* pReq) {
if (tEncodeDouble(pEncoder, ps->inputRate) < 0) return -1;
if (tEncodeDouble(pEncoder, ps->sinkQuota) < 0) return -1;
if (tEncodeDouble(pEncoder, ps->sinkDataSize) < 0) return -1;
if (tEncodeI64(pEncoder, ps->offset) < 0) return -1;
if (tEncodeI64(pEncoder, ps->processedVer) < 0) return -1;
if (tEncodeI64(pEncoder, ps->verStart) < 0) return -1;
if (tEncodeI64(pEncoder, ps->verEnd) < 0) return -1;
if (tEncodeI64(pEncoder, ps->activeCheckpointId) < 0) return -1;
if (tEncodeI8(pEncoder, ps->checkpointFailed) < 0) return -1;
}
tEndEncode(pEncoder);
return pEncoder->pos;
@ -805,9 +807,11 @@ int32_t tDecodeStreamHbMsg(SDecoder* pDecoder, SStreamHbMsg* pReq) {
if (tDecodeDouble(pDecoder, &entry.inputRate) < 0) return -1;
if (tDecodeDouble(pDecoder, &entry.sinkQuota) < 0) return -1;
if (tDecodeDouble(pDecoder, &entry.sinkDataSize) < 0) return -1;
if (tDecodeI64(pDecoder, &entry.offset) < 0) return -1;
if (tDecodeI64(pDecoder, &entry.processedVer) < 0) return -1;
if (tDecodeI64(pDecoder, &entry.verStart) < 0) return -1;
if (tDecodeI64(pDecoder, &entry.verEnd) < 0) return -1;
if (tDecodeI64(pDecoder, &entry.activeCheckpointId) < 0) return -1;
if (tDecodeI8(pDecoder, (int8_t*)&entry.checkpointFailed) < 0) return -1;
entry.id.taskId = taskId;
taosArrayPush(pReq->pTaskStatus, &entry);
@ -895,8 +899,13 @@ void metaHbToMnode(void* param, void* tmrId) {
entry.sinkDataSize = SIZE_IN_MiB((*pTask)->execInfo.sink.dataSize);
}
if ((*pTask)->checkpointingId != 0) {
entry.checkpointFailed = ((*pTask)->chkInfo.failedId >= (*pTask)->checkpointingId);
entry.activeCheckpointId = (*pTask)->checkpointingId;
}
if ((*pTask)->exec.pWalReader != NULL) {
entry.offset = (*pTask)->chkInfo.nextProcessVer;
entry.processedVer = (*pTask)->chkInfo.nextProcessVer - 1;
walReaderValidVersionRange((*pTask)->exec.pWalReader, &entry.verStart, &entry.verEnd);
}

View File

@ -305,7 +305,6 @@ static void doProcessDownstreamReadyRsp(SStreamTask* pTask, int32_t numOfReqs) {
streamLaunchFillHistoryTask(pTask);
}
// todo handle error
int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRsp) {
ASSERT(pTask->id.taskId == pRsp->upstreamTaskId);
const char* id = pTask->id.idStr;

View File

@ -700,63 +700,6 @@ int32_t streamBuildAndSendDropTaskMsg(SMsgCb* pMsgCb, int32_t vgId, SStreamTaskI
return code;
}
int32_t streamBuildAndSendVerUpdateMsg(SMsgCb* pMsgCb, int32_t vgId, SStreamTaskId* pTaskId, int64_t ver) {
SVStreamTaskVerUpdateReq* pReq = rpcMallocCont(sizeof(SVStreamTaskVerUpdateReq));
if (pReq == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
pReq->head.vgId = vgId;
pReq->taskId = pTaskId->taskId;
pReq->streamId = pTaskId->streamId;
pReq->dataVer = ver;
SRpcMsg msg = {.msgType = TDMT_STREAM_TASK_VERUPDATE, .pCont = pReq, .contLen = sizeof(SVStreamTaskVerUpdateReq)};
int32_t code = tmsgPutToQueue(pMsgCb, WRITE_QUEUE, &msg);
if (code != TSDB_CODE_SUCCESS) {
stError("vgId:%d failed to send update task:0x%x dataVer msg, code:%s", vgId, pTaskId->taskId, tstrerror(code));
return code;
}
stDebug("vgId:%d build and send update table:0x%x dataVer:%"PRId64" msg", vgId, pTaskId->taskId, ver);
return code;
}
int32_t streamTaskUpdateDataVer(SStreamTask* pTask, int64_t ver) {
SStreamMeta* pMeta = pTask->pMeta;
// commit the dataVer update
int64_t prevVer = 0;
taosThreadMutexLock(&pTask->lock);
if (pTask->chkInfo.checkpointId == 0) {
prevVer = pTask->chkInfo.nextProcessVer;
pTask->chkInfo.nextProcessVer = ver;
taosThreadMutexUnlock(&pTask->lock);
taosWLockLatch(&pMeta->lock);
if (streamMetaSaveTask(pMeta, pTask) < 0) {
// return -1;
}
if (streamMetaCommit(pMeta) < 0) {
// persist to disk
}
stDebug("s-task:%s nextProcessedVer is update from %" PRId64 " to %" PRId64 " checkpointId:%" PRId64
" checkpointVer:%" PRId64,
pTask->id.idStr, prevVer, ver, pTask->chkInfo.checkpointId, pTask->chkInfo.checkpointVer);
taosWUnLockLatch(&pMeta->lock);
} else {
stDebug("s-task:%s not update the dataVer, existed:%" PRId64 ", checkpointId:%" PRId64 " checkpointVer:%" PRId64,
pTask->id.idStr, pTask->chkInfo.nextProcessVer, pTask->chkInfo.checkpointId, pTask->chkInfo.checkpointVer);
taosThreadMutexUnlock(&pTask->lock);
}
return TSDB_CODE_SUCCESS;
}
STaskId streamTaskExtractKey(const SStreamTask* pTask) {
STaskId id = {.streamId = pTask->id.streamId, .taskId = pTask->id.taskId};
return id;
@ -788,4 +731,25 @@ const char* streamGetTaskStatusStr(int32_t status) {
case TASK_STATUS__UNINIT: return "uninitialized";
default:return "";
}
}
void streamTaskStatusInit(STaskStatusEntry* pEntry, const SStreamTask* pTask) {
pEntry->id.streamId = pTask->id.streamId;
pEntry->id.taskId = pTask->id.taskId;
pEntry->stage = -1;
pEntry->nodeId = pTask->info.nodeId;
pEntry->status = TASK_STATUS__STOP;
}
void streamTaskStatusCopy(STaskStatusEntry* pDst, const STaskStatusEntry* pSrc) {
pDst->stage = pSrc->stage;
pDst->inputQUsed = pSrc->inputQUsed;
pDst->inputRate = pSrc->inputRate;
pDst->processedVer = pSrc->processedVer;
pDst->verStart = pSrc->verStart;
pDst->verEnd = pSrc->verEnd;
pDst->sinkQuota = pSrc->sinkQuota;
pDst->sinkDataSize = pSrc->sinkDataSize;
pDst->activeCheckpointId = pSrc->activeCheckpointId;
pDst->checkpointFailed = pSrc->checkpointFailed;
}