Merge pull request #24207 from taosdata/fix/3_liaohj

fix(stream): add async call restart, instead of sync wait, and fix several deadlock.
This commit is contained in:
Haojun Liao 2023-12-25 13:56:42 +08:00 committed by GitHub
commit bcc57e0644
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
24 changed files with 693 additions and 449 deletions

View File

@ -30,7 +30,8 @@ int32_t tqStreamTaskProcessCheckpointReadyMsg(SStreamMeta* pMeta, SRpcMsg* pMsg)
int32_t tqStreamTaskProcessDeployReq(SStreamMeta* pMeta, int64_t sversion, char* msg, int32_t msgLen, bool isLeader, bool restored); int32_t tqStreamTaskProcessDeployReq(SStreamMeta* pMeta, int64_t sversion, char* msg, int32_t msgLen, bool isLeader, bool restored);
int32_t tqStreamTaskProcessDropReq(SStreamMeta* pMeta, char* msg, int32_t msgLen); int32_t tqStreamTaskProcessDropReq(SStreamMeta* pMeta, char* msg, int32_t msgLen);
int32_t tqStreamTaskProcessRunReq(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLeader); int32_t tqStreamTaskProcessRunReq(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLeader);
int32_t startStreamTasks(SStreamMeta* pMeta); int32_t tqStreamTaskResetStatus(SStreamMeta* pMeta);
int32_t resetStreamTaskStatus(SStreamMeta* pMeta); int32_t tqStartTaskCompleteCallback(SStreamMeta* pMeta);
int32_t tqStreamTaskProcessTaskResetReq(SStreamMeta* pMeta, SRpcMsg* pMsg);
#endif // TDENGINE_TQ_COMMON_H #endif // TDENGINE_TQ_COMMON_H

View File

@ -53,6 +53,7 @@ extern "C" {
#define STREAM_EXEC_EXTRACT_DATA_IN_WAL_ID (-1) #define STREAM_EXEC_EXTRACT_DATA_IN_WAL_ID (-1)
#define STREAM_EXEC_START_ALL_TASKS_ID (-2) #define STREAM_EXEC_START_ALL_TASKS_ID (-2)
#define STREAM_EXEC_RESTART_ALL_TASKS_ID (-3) #define STREAM_EXEC_RESTART_ALL_TASKS_ID (-3)
#define STREAM_EXEC_STOP_ALL_TASKS_ID (-4)
typedef struct SStreamTask SStreamTask; typedef struct SStreamTask SStreamTask;
typedef struct SStreamQueue SStreamQueue; typedef struct SStreamQueue SStreamQueue;
@ -314,6 +315,7 @@ typedef struct SCheckpointInfo {
int32_t checkpointNotReadyTasks; int32_t checkpointNotReadyTasks;
bool dispatchCheckpointTrigger; bool dispatchCheckpointTrigger;
int64_t msgVer; int64_t msgVer;
int32_t transId;
} SCheckpointInfo; } SCheckpointInfo;
typedef struct SStreamStatus { typedef struct SStreamStatus {
@ -324,6 +326,7 @@ typedef struct SStreamStatus {
int8_t keepTaskStatus; int8_t keepTaskStatus;
bool appendTranstateBlock; // has append the transfer state data block already, todo: remove it bool appendTranstateBlock; // has append the transfer state data block already, todo: remove it
int32_t timerActive; // timer is active int32_t timerActive; // timer is active
int8_t allowedAddInTimer; // allowed to add into timer
int32_t inScanHistorySentinel; int32_t inScanHistorySentinel;
} SStreamStatus; } SStreamStatus;
@ -460,14 +463,18 @@ struct SStreamTask {
char reserve[256]; char reserve[256];
}; };
typedef int32_t (*startComplete_fn_t)(struct SStreamMeta*);
typedef struct STaskStartInfo { typedef struct STaskStartInfo {
int64_t startTs; int64_t startTs;
int64_t readyTs; int64_t readyTs;
int32_t tasksWillRestart; int32_t tasksWillRestart;
int32_t taskStarting; // restart flag, sentinel to guard the restart procedure. int32_t taskStarting; // restart flag, sentinel to guard the restart procedure.
SHashObj* pReadyTaskSet; // tasks that are all ready for running stream processing SHashObj* pReadyTaskSet; // tasks that are all ready for running stream processing
SHashObj* pFailedTaskSet; // tasks that are done the check downstream process, may be successful or failed SHashObj* pFailedTaskSet; // tasks that are done the check downstream process, may be successful or failed
int64_t elapsedTime; int64_t elapsedTime;
int32_t restartCount; // restart task counter
startComplete_fn_t completeFn; // complete callback function
} STaskStartInfo; } STaskStartInfo;
typedef struct STaskUpdateInfo { typedef struct STaskUpdateInfo {
@ -489,8 +496,9 @@ typedef struct SStreamMeta {
int32_t vgId; int32_t vgId;
int64_t stage; int64_t stage;
int32_t role; int32_t role;
bool sendMsgBeforeClosing; // send hb to mnode before close all tasks when switch to follower.
STaskStartInfo startInfo; STaskStartInfo startInfo;
SRWLatch lock; TdThreadRwlock lock;
int32_t walScanCounter; int32_t walScanCounter;
void* streamBackend; void* streamBackend;
int64_t streamBackendRid; int64_t streamBackendRid;
@ -629,6 +637,7 @@ typedef struct {
int32_t nodeId; int32_t nodeId;
SEpSet mgmtEps; SEpSet mgmtEps;
int32_t mnodeId; int32_t mnodeId;
int32_t transId;
int64_t expireTime; int64_t expireTime;
} SStreamCheckpointSourceReq; } SStreamCheckpointSourceReq;
@ -671,8 +680,8 @@ typedef struct STaskStatusEntry {
int64_t verStart; // start version in WAL, only valid for source task int64_t verStart; // start version in WAL, only valid for source task
int64_t verEnd; // end version in WAL, only valid for source task int64_t verEnd; // end version in WAL, only valid for source task
int64_t processedVer; // only valid for source task int64_t processedVer; // only valid for source task
int32_t relatedHTask; // has related fill-history task
int64_t activeCheckpointId; // current active checkpoint id int64_t activeCheckpointId; // current active checkpoint id
int32_t chkpointTransId; // checkpoint trans id
bool checkpointFailed; // denote if the checkpoint is failed or not bool checkpointFailed; // denote if the checkpoint is failed or not
bool inputQChanging; // inputQ is changing or not bool inputQChanging; // inputQ is changing or not
int64_t inputQUnchangeCounter; int64_t inputQUnchangeCounter;
@ -811,6 +820,7 @@ int32_t streamTaskReleaseState(SStreamTask* pTask);
int32_t streamTaskReloadState(SStreamTask* pTask); int32_t streamTaskReloadState(SStreamTask* pTask);
void streamTaskCloseUpstreamInput(SStreamTask* pTask, int32_t taskId); void streamTaskCloseUpstreamInput(SStreamTask* pTask, int32_t taskId);
void streamTaskOpenAllUpstreamInput(SStreamTask* pTask); void streamTaskOpenAllUpstreamInput(SStreamTask* pTask);
int32_t streamTaskSetDb(SStreamMeta* pMeta, void* pTask, char* key);
void streamTaskStatusInit(STaskStatusEntry* pEntry, const SStreamTask* pTask); void streamTaskStatusInit(STaskStatusEntry* pEntry, const SStreamTask* pTask);
void streamTaskStatusCopy(STaskStatusEntry* pDst, const STaskStatusEntry* pSrc); void streamTaskStatusCopy(STaskStatusEntry* pDst, const STaskStatusEntry* pSrc);
@ -828,22 +838,21 @@ int32_t streamProcessScanHistoryFinishRsp(SStreamTask* pTask);
// stream task meta // stream task meta
void streamMetaInit(); void streamMetaInit();
void streamMetaCleanup(); void streamMetaCleanup();
SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandFunc, int32_t vgId, int64_t stage); SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandFunc, int32_t vgId, int64_t stage, startComplete_fn_t fn);
void streamMetaClose(SStreamMeta* streamMeta); void streamMetaClose(SStreamMeta* streamMeta);
int32_t streamMetaSaveTask(SStreamMeta* pMeta, SStreamTask* pTask); // save to stream meta store int32_t streamMetaSaveTask(SStreamMeta* pMeta, SStreamTask* pTask); // save to stream meta store
int32_t streamMetaRemoveTask(SStreamMeta* pMeta, STaskId* pKey); int32_t streamMetaRemoveTask(SStreamMeta* pMeta, STaskId* pKey);
int32_t streamMetaRegisterTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTask, bool* pAdded); int32_t streamMetaRegisterTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTask, bool* pAdded);
int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId); int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId);
int32_t streamMetaGetNumOfTasks(SStreamMeta* pMeta); int32_t streamMetaGetNumOfTasks(SStreamMeta* pMeta);
SStreamTask* streamMetaAcquireTaskNoLock(SStreamMeta* pMeta, int64_t streamId, int32_t taskId);
SStreamTask* streamMetaAcquireTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId); SStreamTask* streamMetaAcquireTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId);
void streamMetaReleaseTask(SStreamMeta* pMeta, SStreamTask* pTask); void streamMetaReleaseTask(SStreamMeta* pMeta, SStreamTask* pTask);
void streamMetaClear(SStreamMeta* pMeta); void streamMetaClear(SStreamMeta* pMeta);
void streamMetaInitBackend(SStreamMeta* pMeta); void streamMetaInitBackend(SStreamMeta* pMeta);
int32_t streamMetaCommit(SStreamMeta* pMeta); int32_t streamMetaCommit(SStreamMeta* pMeta);
int32_t streamMetaLoadAllTasks(SStreamMeta* pMeta);
int64_t streamMetaGetLatestCheckpointId(SStreamMeta* pMeta); int64_t streamMetaGetLatestCheckpointId(SStreamMeta* pMeta);
void streamMetaNotifyClose(SStreamMeta* pMeta); void streamMetaNotifyClose(SStreamMeta* pMeta);
int32_t streamTaskSetDb(SStreamMeta* pMeta, void* pTask, char* key);
void streamMetaStartHb(SStreamMeta* pMeta); void streamMetaStartHb(SStreamMeta* pMeta);
bool streamMetaTaskInTimer(SStreamMeta* pMeta); bool streamMetaTaskInTimer(SStreamMeta* pMeta);
int32_t streamMetaUpdateTaskDownstreamStatus(SStreamMeta* pMeta, int64_t streamId, int32_t taskId, int64_t startTs, int32_t streamMetaUpdateTaskDownstreamStatus(SStreamMeta* pMeta, int64_t streamId, int32_t taskId, int64_t startTs,
@ -853,6 +862,11 @@ void streamMetaRUnLock(SStreamMeta* pMeta);
void streamMetaWLock(SStreamMeta* pMeta); void streamMetaWLock(SStreamMeta* pMeta);
void streamMetaWUnLock(SStreamMeta* pMeta); void streamMetaWUnLock(SStreamMeta* pMeta);
void streamMetaResetStartInfo(STaskStartInfo* pMeta); void streamMetaResetStartInfo(STaskStartInfo* pMeta);
SArray* streamMetaSendMsgBeforeCloseTasks(SStreamMeta* pMeta);
void streamMetaUpdateStageRole(SStreamMeta* pMeta, int64_t stage, bool isLeader);
int32_t streamMetaLoadAllTasks(SStreamMeta* pMeta);
int32_t streamMetaStartAllTasks(SStreamMeta* pMeta);
int32_t streamMetaStopAllTasks(SStreamMeta* pMeta);
// checkpoint // checkpoint
int32_t streamProcessCheckpointSourceReq(SStreamTask* pTask, SStreamCheckpointSourceReq* pReq); int32_t streamProcessCheckpointSourceReq(SStreamTask* pTask, SStreamCheckpointSourceReq* pReq);

View File

@ -90,7 +90,7 @@ SArray *smGetMsgHandles() {
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_SCAN_HISTORY_FINISH, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_SCAN_HISTORY_FINISH, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_SCAN_HISTORY_FINISH_RSP, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_SCAN_HISTORY_FINISH_RSP, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_CHECKPOINT_READY, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_CHECKPOINT_READY, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_RESET, smPutNodeMsgToMgmtQueue, 1) == NULL) goto _OVER;
code = 0; code = 0;
_OVER: _OVER:

View File

@ -44,12 +44,11 @@ typedef struct SStreamTransMgmt {
} SStreamTransMgmt; } SStreamTransMgmt;
typedef struct SStreamExecInfo { typedef struct SStreamExecInfo {
SArray * pNodeList; SArray *pNodeList;
int64_t ts; // snapshot ts int64_t ts; // snapshot ts
SStreamTransMgmt transMgmt; SStreamTransMgmt transMgmt;
int64_t activeCheckpoint; // active check point id SHashObj *pTaskMap;
SHashObj * pTaskMap; SArray *pTaskList;
SArray * pTaskList;
TdThreadMutex lock; TdThreadMutex lock;
} SStreamExecInfo; } SStreamExecInfo;

View File

@ -62,7 +62,7 @@ static void mndCancelGetNextStreamTask(SMnode *pMnode, void *pIter);
static int32_t mndProcessPauseStreamReq(SRpcMsg *pReq); static int32_t mndProcessPauseStreamReq(SRpcMsg *pReq);
static int32_t mndProcessResumeStreamReq(SRpcMsg *pReq); static int32_t mndProcessResumeStreamReq(SRpcMsg *pReq);
static int32_t mndBuildStreamCheckpointSourceReq2(void **pBuf, int32_t *pLen, int32_t nodeId, int64_t checkpointId, static int32_t mndBuildStreamCheckpointSourceReq2(void **pBuf, int32_t *pLen, int32_t nodeId, int64_t checkpointId,
int64_t streamId, int32_t taskId); int64_t streamId, int32_t taskId, int32_t transId);
static int32_t mndProcessNodeCheck(SRpcMsg *pReq); static int32_t mndProcessNodeCheck(SRpcMsg *pReq);
static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg); static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg);
static SArray *extractNodeListFromStream(SMnode *pMnode); static SArray *extractNodeListFromStream(SMnode *pMnode);
@ -685,6 +685,40 @@ _OVER:
return -1; return -1;
} }
static int32_t extractNodeEpset(SMnode *pMnode, SEpSet *pEpSet, bool* hasEpset, int32_t taskId, int32_t nodeId) {
*hasEpset = false;
if (nodeId == SNODE_HANDLE) {
SSnodeObj *pObj = NULL;
void *pIter = NULL;
pIter = sdbFetch(pMnode->pSdb, SDB_SNODE, pIter, (void **)&pObj);
if (pIter != NULL) {
addEpIntoEpSet(pEpSet, pObj->pDnode->fqdn, pObj->pDnode->port);
sdbRelease(pMnode->pSdb, pObj);
sdbCancelFetch(pMnode->pSdb, pIter);
*hasEpset = true;
return TSDB_CODE_SUCCESS;
} else {
mError("failed to acquire snode epset");
return TSDB_CODE_INVALID_PARA;
}
} else {
SVgObj *pVgObj = mndAcquireVgroup(pMnode, nodeId);
if (pVgObj != NULL) {
SEpSet epset = mndGetVgroupEpset(pMnode, pVgObj);
mndReleaseVgroup(pMnode, pVgObj);
epsetAssign(pEpSet, &epset);
*hasEpset = true;
return TSDB_CODE_SUCCESS;
} else {
mDebug("orphaned task:0x%x need to be dropped, nodeId:%d, no redo action", taskId, nodeId);
return TSDB_CODE_SUCCESS;
}
}
}
static int32_t mndPersistTaskDropReq(SMnode *pMnode, STrans *pTrans, SStreamTask *pTask) { static int32_t mndPersistTaskDropReq(SMnode *pMnode, STrans *pTrans, SStreamTask *pTask) {
SVDropStreamTaskReq *pReq = taosMemoryCalloc(1, sizeof(SVDropStreamTaskReq)); SVDropStreamTaskReq *pReq = taosMemoryCalloc(1, sizeof(SVDropStreamTaskReq));
if (pReq == NULL) { if (pReq == NULL) {
@ -698,28 +732,17 @@ static int32_t mndPersistTaskDropReq(SMnode *pMnode, STrans *pTrans, SStreamTask
STransAction action = {0}; STransAction action = {0};
SEpSet epset = {0}; SEpSet epset = {0};
if (pTask->info.nodeId == SNODE_HANDLE) { bool hasEpset = false;
SSnodeObj *pObj = NULL;
void *pIter = NULL;
while (1) {
pIter = sdbFetch(pMnode->pSdb, SDB_SNODE, pIter, (void **)&pObj);
if (pIter == NULL) {
break;
}
addEpIntoEpSet(&epset, pObj->pDnode->fqdn, pObj->pDnode->port); int32_t code = extractNodeEpset(pMnode, &epset, &hasEpset, pTask->id.taskId, pTask->info.nodeId);
sdbRelease(pMnode->pSdb, pObj); if (code != TSDB_CODE_SUCCESS) {
} terrno = code;
} else { return -1;
SVgObj *pVgObj = mndAcquireVgroup(pMnode, pTask->info.nodeId); }
if (pVgObj != NULL) {
epset = mndGetVgroupEpset(pMnode, pVgObj); // no valid epset, return directly without redoAction
mndReleaseVgroup(pMnode, pVgObj); if (!hasEpset) {
} else { return TSDB_CODE_SUCCESS;
mDebug("orphaned task:0x%x need to be dropped, nodeId:%d, no redo action", pTask->id.taskId, pTask->info.nodeId);
taosMemoryFree(pReq);
return 0;
}
} }
// The epset of nodeId of this task may have been expired now, let's use the newest epset from mnode. // The epset of nodeId of this task may have been expired now, let's use the newest epset from mnode.
@ -974,13 +997,14 @@ static int32_t mndProcessStreamRemainChkptTmr(SRpcMsg *pReq) {
} }
static int32_t mndBuildStreamCheckpointSourceReq2(void **pBuf, int32_t *pLen, int32_t nodeId, int64_t checkpointId, static int32_t mndBuildStreamCheckpointSourceReq2(void **pBuf, int32_t *pLen, int32_t nodeId, int64_t checkpointId,
int64_t streamId, int32_t taskId) { int64_t streamId, int32_t taskId, int32_t transId) {
SStreamCheckpointSourceReq req = {0}; SStreamCheckpointSourceReq req = {0};
req.checkpointId = checkpointId; req.checkpointId = checkpointId;
req.nodeId = nodeId; req.nodeId = nodeId;
req.expireTime = -1; req.expireTime = -1;
req.streamId = streamId; // pTask->id.streamId; req.streamId = streamId; // pTask->id.streamId;
req.taskId = taskId; // pTask->id.taskId; req.taskId = taskId; // pTask->id.taskId;
req.transId = transId;
int32_t code; int32_t code;
int32_t blen; int32_t blen;
@ -1070,7 +1094,7 @@ static int32_t mndProcessStreamCheckpointTrans(SMnode *pMnode, SStreamObj *pStre
void * buf; void * buf;
int32_t tlen; int32_t tlen;
if (mndBuildStreamCheckpointSourceReq2(&buf, &tlen, pTask->info.nodeId, checkpointId, pTask->id.streamId, if (mndBuildStreamCheckpointSourceReq2(&buf, &tlen, pTask->info.nodeId, checkpointId, pTask->id.streamId,
pTask->id.taskId) < 0) { pTask->id.taskId, pTrans->id) < 0) {
mndReleaseVgroup(pMnode, pVgObj); mndReleaseVgroup(pMnode, pVgObj);
taosWUnLockLatch(&pStream->lock); taosWUnLockLatch(&pStream->lock);
goto _ERR; goto _ERR;
@ -1139,7 +1163,7 @@ static int32_t mndAddStreamCheckpointToTrans(STrans *pTrans, SStreamObj *pStream
void * buf; void * buf;
int32_t tlen; int32_t tlen;
if (mndBuildStreamCheckpointSourceReq2(&buf, &tlen, pTask->info.nodeId, chkptId, pTask->id.streamId, if (mndBuildStreamCheckpointSourceReq2(&buf, &tlen, pTask->info.nodeId, chkptId, pTask->id.streamId,
pTask->id.taskId) < 0) { pTask->id.taskId, pTrans->id) < 0) {
mndReleaseVgroup(pMnode, pVgObj); mndReleaseVgroup(pMnode, pVgObj);
taosWUnLockLatch(&pStream->lock); taosWUnLockLatch(&pStream->lock);
return -1; return -1;
@ -1776,9 +1800,20 @@ static int32_t mndPauseStreamTask(SMnode *pMnode, STrans *pTrans, SStreamTask *p
pReq->taskId = pTask->id.taskId; pReq->taskId = pTask->id.taskId;
pReq->streamId = pTask->id.streamId; pReq->streamId = pTask->id.streamId;
SVgObj *pVgObj = mndAcquireVgroup(pMnode, pTask->info.nodeId); SEpSet epset;
SEpSet epset = mndGetVgroupEpset(pMnode, pVgObj); bool hasEpset = false;
mndReleaseVgroup(pMnode, pVgObj); int32_t code = extractNodeEpset(pMnode, &epset, &hasEpset, pTask->id.taskId, pTask->info.nodeId);
if (code != TSDB_CODE_SUCCESS) {
terrno = code;
taosMemoryFree(pReq);
return -1;
}
// no valid epset, return directly without redoAction
if (!hasEpset) {
taosMemoryFree(pReq);
return TSDB_CODE_SUCCESS;
}
STransAction action = {0}; STransAction action = {0};
initTransAction(&action, pReq, sizeof(SVPauseStreamTaskReq), TDMT_STREAM_TASK_PAUSE, &epset, 0); initTransAction(&action, pReq, sizeof(SVPauseStreamTaskReq), TDMT_STREAM_TASK_PAUSE, &epset, 0);
@ -1920,17 +1955,25 @@ static int32_t mndProcessPauseStreamReq(SRpcMsg *pReq) {
static int32_t mndResumeStreamTask(STrans *pTrans, SMnode *pMnode, SStreamTask *pTask, int8_t igUntreated) { static int32_t mndResumeStreamTask(STrans *pTrans, SMnode *pMnode, SStreamTask *pTask, int8_t igUntreated) {
SVResumeStreamTaskReq *pReq = taosMemoryCalloc(1, sizeof(SVResumeStreamTaskReq)); SVResumeStreamTaskReq *pReq = taosMemoryCalloc(1, sizeof(SVResumeStreamTaskReq));
if (pReq == NULL) { if (pReq == NULL) {
mError("failed to malloc in resume stream, size:%" PRIzu ", code:%s", sizeof(SVResumeStreamTaskReq),
tstrerror(TSDB_CODE_OUT_OF_MEMORY));
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1; return -1;
} }
pReq->head.vgId = htonl(pTask->info.nodeId); pReq->head.vgId = htonl(pTask->info.nodeId);
pReq->taskId = pTask->id.taskId; pReq->taskId = pTask->id.taskId;
pReq->streamId = pTask->id.streamId; pReq->streamId = pTask->id.streamId;
pReq->igUntreated = igUntreated; pReq->igUntreated = igUntreated;
SVgObj *pVgObj = mndAcquireVgroup(pMnode, pTask->info.nodeId); SEpSet epset;
SEpSet epset = mndGetVgroupEpset(pMnode, pVgObj); bool hasEpset = false;
mndReleaseVgroup(pMnode, pVgObj); int32_t code = extractNodeEpset(pMnode, &epset, &hasEpset, pTask->id.taskId, pTask->info.nodeId);
if (code != TSDB_CODE_SUCCESS) {
terrno = code;
taosMemoryFree(pReq);
return -1;
}
STransAction action = {0}; STransAction action = {0};
initTransAction(&action, pReq, sizeof(SVResumeStreamTaskReq), TDMT_STREAM_TASK_RESUME, &epset, 0); initTransAction(&action, pReq, sizeof(SVResumeStreamTaskReq), TDMT_STREAM_TASK_RESUME, &epset, 0);
@ -2208,6 +2251,8 @@ static SVgroupChangeInfo mndFindChangedNodeInfo(SMnode *pMnode, const SArray *pP
epsetAssign(&updateInfo.newEp, &pCurrent->epset); epsetAssign(&updateInfo.newEp, &pCurrent->epset);
taosArrayPush(info.pUpdateNodeList, &updateInfo); taosArrayPush(info.pUpdateNodeList, &updateInfo);
} }
// todo handle the snode info
if (pCurrent->nodeId != SNODE_HANDLE) { if (pCurrent->nodeId != SNODE_HANDLE) {
SVgObj *pVgroup = mndAcquireVgroup(pMnode, pCurrent->nodeId); SVgObj *pVgroup = mndAcquireVgroup(pMnode, pCurrent->nodeId);
taosHashPut(info.pDBMap, pVgroup->dbName, strlen(pVgroup->dbName), NULL, 0); taosHashPut(info.pDBMap, pVgroup->dbName, strlen(pVgroup->dbName), NULL, 0);
@ -2286,12 +2331,28 @@ static SArray *mndTakeVgroupSnapshot(SMnode *pMnode, bool *allReady) {
} }
static int32_t mndProcessVgroupChange(SMnode *pMnode, SVgroupChangeInfo *pChangeInfo) { static int32_t mndProcessVgroupChange(SMnode *pMnode, SVgroupChangeInfo *pChangeInfo) {
SSdb *pSdb = pMnode->pSdb; SSdb *pSdb = pMnode->pSdb;
// check all streams that involved this vnode should update the epset info
SStreamObj *pStream = NULL; SStreamObj *pStream = NULL;
void * pIter = NULL; void *pIter = NULL;
STrans * pTrans = NULL; STrans *pTrans = NULL;
// conflict check for nodeUpdate trans, here we randomly chose one stream to add into the trans pool
while(1) {
pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream);
if (pIter == NULL) {
break;
}
bool conflict = streamTransConflictOtherTrans(pMnode, pStream->uid, MND_STREAM_TASK_UPDATE_NAME, false);
sdbRelease(pSdb, pStream);
if (conflict) {
mWarn("nodeUpdate trans in progress, current nodeUpdate ignored");
sdbCancelFetch(pSdb, pIter);
return -1;
}
}
while (1) { while (1) {
pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream); pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream);
@ -2307,6 +2368,8 @@ static int32_t mndProcessVgroupChange(SMnode *pMnode, SVgroupChangeInfo *pChange
sdbCancelFetch(pSdb, pIter); sdbCancelFetch(pSdb, pIter);
return terrno; return terrno;
} }
mndStreamRegisterTrans(pTrans, MND_STREAM_TASK_RESET_NAME, pStream->uid);
} }
void *p = taosHashGet(pChangeInfo->pDBMap, pStream->targetDb, strlen(pStream->targetDb)); void *p = taosHashGet(pChangeInfo->pDBMap, pStream->targetDb, strlen(pStream->targetDb));
@ -2553,7 +2616,7 @@ static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg) {
code = mndProcessVgroupChange(pMnode, &changeInfo); code = mndProcessVgroupChange(pMnode, &changeInfo);
// keep the new vnode snapshot // keep the new vnode snapshot if success
if (code == TSDB_CODE_SUCCESS || code == TSDB_CODE_ACTION_IN_PROGRESS) { if (code == TSDB_CODE_SUCCESS || code == TSDB_CODE_ACTION_IN_PROGRESS) {
mDebug("create trans successfully, update cached node list"); mDebug("create trans successfully, update cached node list");
taosArrayDestroy(execInfo.pNodeList); taosArrayDestroy(execInfo.pNodeList);
@ -2704,9 +2767,18 @@ int32_t createStreamResetStatusTrans(SMnode *pMnode, SStreamObj *pStream) {
pReq->taskId = pTask->id.taskId; pReq->taskId = pTask->id.taskId;
pReq->streamId = pTask->id.streamId; pReq->streamId = pTask->id.streamId;
SVgObj *pVgObj = mndAcquireVgroup(pMnode, pTask->info.nodeId); SEpSet epset;
SEpSet epset = mndGetVgroupEpset(pMnode, pVgObj); bool hasEpset = false;
mndReleaseVgroup(pMnode, pVgObj); int32_t code = extractNodeEpset(pMnode, &epset, &hasEpset, pTask->id.taskId, pTask->info.nodeId);
if (code != TSDB_CODE_SUCCESS) {
taosMemoryFree(pReq);
continue;
}
if (!hasEpset) {
taosMemoryFree(pReq);
continue;
}
STransAction action = {0}; STransAction action = {0};
initTransAction(&action, pReq, sizeof(SVResetStreamTaskReq), TDMT_VND_STREAM_TASK_RESET, &epset, 0); initTransAction(&action, pReq, sizeof(SVResetStreamTaskReq), TDMT_VND_STREAM_TASK_RESET, &epset, 0);
@ -2766,39 +2838,36 @@ int32_t killActiveCheckpointTrans(SMnode *pMnode, const char *pDBName, size_t le
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static int32_t mndResetStatusFromCheckpoint(SMnode *pMnode, int32_t transId) { static int32_t mndResetStatusFromCheckpoint(SMnode *pMnode, int64_t streamId, int32_t transId) {
int32_t code = TSDB_CODE_SUCCESS;
STrans *pTrans = mndAcquireTrans(pMnode, transId); STrans *pTrans = mndAcquireTrans(pMnode, transId);
if (pTrans != NULL) { if (pTrans != NULL) {
mInfo("kill checkpoint transId:%d to reset task status", transId); mInfo("kill checkpoint transId:%d to reset task status", transId);
mndKillTrans(pMnode, pTrans); mndKillTrans(pMnode, pTrans);
mndReleaseTrans(pMnode, pTrans); mndReleaseTrans(pMnode, pTrans);
} else {
mError("failed to acquire checkpoint trans:%d", transId);
} }
// set all tasks status to be normal, refactor later to be stream level, instead of vnode level. SStreamObj *pStream = mndGetStreamObj(pMnode, streamId);
SSdb * pSdb = pMnode->pSdb; if (pStream == NULL) {
SStreamObj *pStream = NULL; code = TSDB_CODE_STREAM_TASK_NOT_EXIST;
void * pIter = NULL; mError("failed to acquire the streamObj:0x%" PRIx64 " to reset checkpoint, may have been dropped", pStream->uid);
while (1) { } else {
pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream);
if (pIter == NULL) {
break;
}
bool conflict = streamTransConflictOtherTrans(pMnode, pStream->uid, MND_STREAM_TASK_RESET_NAME, false); bool conflict = streamTransConflictOtherTrans(pMnode, pStream->uid, MND_STREAM_TASK_RESET_NAME, false);
if (conflict) { if (conflict) {
mError("stream:%s other trans exists in DB:%s & %s failed to start reset-status trans", pStream->name, mError("stream:%s other trans exists in DB:%s, dstTable:%s failed to start reset-status trans", pStream->name,
pStream->sourceDb, pStream->targetDb); pStream->sourceDb, pStream->targetSTbName);
continue; } else {
} mDebug("stream:%s (0x%" PRIx64 ") reset checkpoint procedure, transId:%d, create reset trans", pStream->name,
pStream->uid, transId);
mDebug("stream:%s (0x%" PRIx64 ") reset checkpoint procedure, create reset trans", pStream->name, pStream->uid); code = createStreamResetStatusTrans(pMnode, pStream);
int32_t code = createStreamResetStatusTrans(pMnode, pStream); mndReleaseStream(pMnode, pStream);
if (code != TSDB_CODE_SUCCESS) {
sdbCancelFetch(pSdb, pIter);
return code;
} }
} }
return 0;
return code;
} }
static SStreamTask *mndGetStreamTask(STaskId *pId, SStreamObj *pStream) { static SStreamTask *mndGetStreamTask(STaskId *pId, SStreamObj *pStream) {
@ -2878,11 +2947,17 @@ static int32_t mndDropRelatedFillhistoryTask(SMnode *pMnode, STaskStatusEntry *p
mDebug("build and send drop related fill-history task for task:0x%x", pTask->id.taskId); mDebug("build and send drop related fill-history task for task:0x%x", pTask->id.taskId);
SVgObj *pVgObj = mndAcquireVgroup(pMnode, pTask->info.nodeId); SEpSet epset;
SEpSet epset = mndGetVgroupEpset(pMnode, pVgObj); bool hasEpset = false;
mndReleaseVgroup(pMnode, pVgObj); int32_t code = extractNodeEpset(pMnode, &epset, &hasEpset, pTask->id.taskId, pTask->info.nodeId);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
if (hasEpset) {
tmsgSendReq(&epset, &msg);
}
tmsgSendReq(&epset, &msg);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
@ -2930,6 +3005,8 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) {
bool checkpointFailed = false; bool checkpointFailed = false;
int64_t activeCheckpointId = 0; int64_t activeCheckpointId = 0;
int64_t streamId = 0;
int32_t transId = 0;
SDecoder decoder = {0}; SDecoder decoder = {0};
tDecoderInit(&decoder, pReq->pCont, pReq->contLen); tDecoderInit(&decoder, pReq->pCont, pReq->contLen);
@ -2972,7 +3049,9 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) {
if (pTaskEntry->stage != p->stage && pTaskEntry->stage != -1) { if (pTaskEntry->stage != p->stage && pTaskEntry->stage != -1) {
updateStageInfo(pTaskEntry, p->stage); updateStageInfo(pTaskEntry, p->stage);
if (pTaskEntry->nodeId == SNODE_HANDLE) snodeChanged = true; if (pTaskEntry->nodeId == SNODE_HANDLE) {
snodeChanged = true;
}
} else { } else {
// task is idle for more than 50 sec. // task is idle for more than 50 sec.
if (fabs(pTaskEntry->inputQUsed - p->inputQUsed) <= DBL_EPSILON) { if (fabs(pTaskEntry->inputQUsed - p->inputQUsed) <= DBL_EPSILON) {
@ -2996,6 +3075,8 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) {
if (p->checkpointFailed) { if (p->checkpointFailed) {
checkpointFailed = p->checkpointFailed; checkpointFailed = p->checkpointFailed;
streamId = p->id.streamId;
transId = p->chkpointTransId;
} }
} }
} }
@ -3034,9 +3115,8 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) {
if (allReady || snodeChanged) { if (allReady || snodeChanged) {
// if the execInfo.activeCheckpoint == 0, the checkpoint is restoring from wal // if the execInfo.activeCheckpoint == 0, the checkpoint is restoring from wal
mInfo("checkpointId:%" PRId64 " failed, issue task-reset trans to reset all tasks status", mInfo("checkpointId:%" PRId64 " failed, issue task-reset trans to reset all tasks status", activeCheckpointId);
execInfo.activeCheckpoint); mndResetStatusFromCheckpoint(pMnode, streamId, transId);
mndResetStatusFromCheckpoint(pMnode, activeCheckpointId);
} else { } else {
mInfo("not all vgroups are ready, wait for next HB from stream tasks"); mInfo("not all vgroups are ready, wait for next HB from stream tasks");
} }

View File

@ -90,16 +90,20 @@ bool streamTransConflictOtherTrans(SMnode* pMnode, int64_t streamUid, const char
if (strcmp(tInfo.name, MND_STREAM_CHECKPOINT_NAME) == 0) { if (strcmp(tInfo.name, MND_STREAM_CHECKPOINT_NAME) == 0) {
if (strcmp(pTransName, MND_STREAM_DROP_NAME) != 0) { if (strcmp(pTransName, MND_STREAM_DROP_NAME) != 0) {
mWarn("conflict with other transId:%d streamUid:%" PRIx64 ", trans:%s", tInfo.transId, tInfo.streamUid, mWarn("conflict with other transId:%d streamUid:0x%" PRIx64 ", trans:%s", tInfo.transId, tInfo.streamUid,
tInfo.name); tInfo.name);
return true; return true;
} else {
mDebug("not conflict with checkpoint trans, name:%s, continue create trans", pTransName);
} }
} else if ((strcmp(tInfo.name, MND_STREAM_CREATE_NAME) == 0) || } else if ((strcmp(tInfo.name, MND_STREAM_CREATE_NAME) == 0) || (strcmp(tInfo.name, MND_STREAM_DROP_NAME) == 0) ||
(strcmp(tInfo.name, MND_STREAM_DROP_NAME) == 0)) { (strcmp(tInfo.name, MND_STREAM_TASK_RESET_NAME) == 0)) {
mWarn("conflict with other transId:%d streamUid:%" PRIx64 ", trans:%s", tInfo.transId, tInfo.streamUid, mWarn("conflict with other transId:%d streamUid:0x%" PRIx64 ", trans:%s", tInfo.transId, tInfo.streamUid,
tInfo.name); tInfo.name);
return true; return true;
} }
} else {
mDebug("stream:0x%"PRIx64" no conflict trans existed, continue create trans", streamUid);
} }
if (lock) { if (lock) {

View File

@ -125,7 +125,7 @@ SSnode *sndOpen(const char *path, const SSnodeOpt *pOption) {
} }
pSnode->msgCb = pOption->msgCb; pSnode->msgCb = pOption->msgCb;
pSnode->pMeta = streamMetaOpen(path, pSnode, (FTaskExpand *)sndExpandTask, SNODE_HANDLE, taosGetTimestampMs()); pSnode->pMeta = streamMetaOpen(path, pSnode, (FTaskExpand *)sndExpandTask, SNODE_HANDLE, taosGetTimestampMs(), tqStartTaskCompleteCallback);
if (pSnode->pMeta == NULL) { if (pSnode->pMeta == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
goto FAIL; goto FAIL;
@ -147,8 +147,8 @@ FAIL:
} }
int32_t sndInit(SSnode *pSnode) { int32_t sndInit(SSnode *pSnode) {
resetStreamTaskStatus(pSnode->pMeta); tqStreamTaskResetStatus(pSnode->pMeta);
startStreamTasks(pSnode->pMeta); streamMetaStartAllTasks(pSnode->pMeta);
return 0; return 0;
} }
@ -202,6 +202,8 @@ int32_t sndProcessWriteMsg(SSnode *pSnode, SRpcMsg *pMsg, SRpcMsg *pRsp) {
return tqStreamTaskProcessDropReq(pSnode->pMeta, pMsg->pCont, pMsg->contLen); return tqStreamTaskProcessDropReq(pSnode->pMeta, pMsg->pCont, pMsg->contLen);
case TDMT_VND_STREAM_TASK_UPDATE: case TDMT_VND_STREAM_TASK_UPDATE:
return tqStreamTaskProcessUpdateReq(pSnode->pMeta, &pSnode->msgCb, pMsg, true); return tqStreamTaskProcessUpdateReq(pSnode->pMeta, &pSnode->msgCb, pMsg, true);
case TDMT_VND_STREAM_TASK_RESET:
return tqStreamTaskProcessTaskResetReq(pSnode->pMeta, pMsg);
default: default:
ASSERT(0); ASSERT(0);
} }

View File

@ -152,9 +152,6 @@ void tqSinkDataIntoDstTable(SStreamTask* pTask, void* vnode, void* data);
char* tqOffsetBuildFName(const char* path, int32_t fVer); char* tqOffsetBuildFName(const char* path, int32_t fVer);
int32_t tqOffsetRestoreFromFile(STqOffsetStore* pStore, const char* fname); int32_t tqOffsetRestoreFromFile(STqOffsetStore* pStore, const char* fname);
// tqStream
int32_t tqStopStreamTasks(STQ* pTq);
// tq util // tq util
int32_t extractDelDataBlock(const void* pData, int32_t len, int64_t ver, void** pRefBlock, int32_t type); int32_t extractDelDataBlock(const void* pData, int32_t len, int64_t ver, void** pRefBlock, int32_t type);
int32_t tqExtractDataForMq(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest, SRpcMsg* pMsg); int32_t tqExtractDataForMq(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest, SRpcMsg* pMsg);

View File

@ -232,6 +232,7 @@ int tqPushMsg(STQ*, tmsg_t msgType);
int tqRegisterPushHandle(STQ* pTq, void* handle, SRpcMsg* pMsg); int tqRegisterPushHandle(STQ* pTq, void* handle, SRpcMsg* pMsg);
int tqUnregisterPushHandle(STQ* pTq, void* pHandle); int tqUnregisterPushHandle(STQ* pTq, void* pHandle);
int tqScanWalAsync(STQ* pTq, bool ckPause); int tqScanWalAsync(STQ* pTq, bool ckPause);
int32_t tqStopStreamTasksAsync(STQ* pTq);
int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp); int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp);
int32_t tqProcessTaskCheckpointReadyMsg(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessTaskCheckpointReadyMsg(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg);

View File

@ -240,23 +240,23 @@ int32_t tdFetchTbUidList(SSma *pSma, STbUidStore **ppStore, tb_uid_t suid, tb_ui
static void tdRSmaTaskInit(SStreamMeta *pMeta, SRSmaInfoItem *pItem, SStreamTaskId *pId) { static void tdRSmaTaskInit(SStreamMeta *pMeta, SRSmaInfoItem *pItem, SStreamTaskId *pId) {
STaskId id = {.streamId = pId->streamId, .taskId = pId->taskId}; STaskId id = {.streamId = pId->streamId, .taskId = pId->taskId};
taosRLockLatch(&pMeta->lock); streamMetaRLock(pMeta);
SStreamTask **ppTask = (SStreamTask **)taosHashGet(pMeta->pTasksMap, &id, sizeof(id)); SStreamTask **ppTask = (SStreamTask **)taosHashGet(pMeta->pTasksMap, &id, sizeof(id));
if (ppTask && *ppTask) { if (ppTask && *ppTask) {
pItem->submitReqVer = (*ppTask)->chkInfo.checkpointVer; pItem->submitReqVer = (*ppTask)->chkInfo.checkpointVer;
pItem->fetchResultVer = (*ppTask)->info.triggerParam; pItem->fetchResultVer = (*ppTask)->info.triggerParam;
} }
taosRUnLockLatch(&pMeta->lock); streamMetaRUnLock(pMeta);
} }
static void tdRSmaTaskRemove(SStreamMeta *pMeta, int64_t streamId, int32_t taskId) { static void tdRSmaTaskRemove(SStreamMeta *pMeta, int64_t streamId, int32_t taskId) {
streamMetaUnregisterTask(pMeta, streamId, taskId); streamMetaUnregisterTask(pMeta, streamId, taskId);
taosWLockLatch(&pMeta->lock); streamMetaWLock(pMeta);
int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta); int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta);
if (streamMetaCommit(pMeta) < 0) { if (streamMetaCommit(pMeta) < 0) {
// persist to disk // persist to disk
} }
taosWUnLockLatch(&pMeta->lock); streamMetaWUnLock(pMeta);
smaDebug("vgId:%d, rsma task:%" PRIi64 ",%d dropped, remain tasks:%d", pMeta->vgId, streamId, taskId, numOfTasks); smaDebug("vgId:%d, rsma task:%" PRIi64 ",%d dropped, remain tasks:%d", pMeta->vgId, streamId, taskId, numOfTasks);
} }
@ -1301,14 +1301,14 @@ _checkpoint:
checkpointBuilt = true; checkpointBuilt = true;
} }
taosWLockLatch(&pMeta->lock); streamMetaWLock(pMeta);
if (streamMetaSaveTask(pMeta, pTask)) { if (streamMetaSaveTask(pMeta, pTask)) {
taosWUnLockLatch(&pMeta->lock); streamMetaWUnLock(pMeta);
code = terrno ? terrno : TSDB_CODE_OUT_OF_MEMORY; code = terrno ? terrno : TSDB_CODE_OUT_OF_MEMORY;
taosHashCancelIterate(pInfoHash, infoHash); taosHashCancelIterate(pInfoHash, infoHash);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
} }
taosWUnLockLatch(&pMeta->lock); streamMetaWUnLock(pMeta);
smaDebug("vgId:%d, rsma commit, succeed to commit task:%p, submitReqVer:%" PRIi64 ", fetchResultVer:%" PRIi64 smaDebug("vgId:%d, rsma commit, succeed to commit task:%p, submitReqVer:%" PRIi64 ", fetchResultVer:%" PRIi64
", table:%" PRIi64 ", level:%d", ", table:%" PRIi64 ", level:%d",
TD_VID(pVnode), pTask, pItem->submitReqVer, pItem->fetchResultVer, pRSmaInfo->suid, i + 1); TD_VID(pVnode), pTask, pItem->submitReqVer, pItem->fetchResultVer, pRSmaInfo->suid, i + 1);
@ -1316,13 +1316,13 @@ _checkpoint:
} }
} }
if (pMeta) { if (pMeta) {
taosWLockLatch(&pMeta->lock); streamMetaWLock(pMeta);
if (streamMetaCommit(pMeta)) { if (streamMetaCommit(pMeta)) {
taosWUnLockLatch(&pMeta->lock); streamMetaWUnLock(pMeta);
code = terrno ? terrno : TSDB_CODE_OUT_OF_MEMORY; code = terrno ? terrno : TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
} }
taosWUnLockLatch(&pMeta->lock); streamMetaWUnLock(pMeta);
} }
if (checkpointBuilt) { if (checkpointBuilt) {
smaInfo("vgId:%d, rsma commit, succeed to commit checkpoint:%" PRIi64, TD_VID(pVnode), checkpointId); smaInfo("vgId:%d, rsma commit, succeed to commit checkpoint:%" PRIi64, TD_VID(pVnode), checkpointId);

View File

@ -96,7 +96,8 @@ int32_t tqInitialize(STQ* pTq) {
return -1; return -1;
} }
pTq->pStreamMeta = streamMetaOpen(pTq->path, pTq, (FTaskExpand*)tqExpandTask, pTq->pVnode->config.vgId, -1); int32_t vgId = TD_VID(pTq->pVnode);
pTq->pStreamMeta = streamMetaOpen(pTq->path, pTq, (FTaskExpand*)tqExpandTask, vgId, -1, tqStartTaskCompleteCallback);
if (pTq->pStreamMeta == NULL) { if (pTq->pStreamMeta == NULL) {
return -1; return -1;
} }
@ -1070,10 +1071,12 @@ int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) {
tqScanWal(pTq); tqScanWal(pTq);
return 0; return 0;
} }
int32_t code = tqStreamTaskProcessRunReq(pTq->pStreamMeta, pMsg, vnodeIsRoleLeader(pTq->pVnode)); int32_t code = tqStreamTaskProcessRunReq(pTq->pStreamMeta, pMsg, vnodeIsRoleLeader(pTq->pVnode));
if(code == 0 && taskId > 0){ if(code == 0 && taskId > 0){
tqScanWalAsync(pTq, false); tqScanWalAsync(pTq, false);
} }
return code; return code;
} }
@ -1256,10 +1259,11 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp)
if (pTask->status.downstreamReady != 1) { if (pTask->status.downstreamReady != 1) {
pTask->chkInfo.failedId = req.checkpointId; // record the latest failed checkpoint id pTask->chkInfo.failedId = req.checkpointId; // record the latest failed checkpoint id
pTask->chkInfo.checkpointingId = req.checkpointId; pTask->chkInfo.checkpointingId = req.checkpointId;
pTask->chkInfo.transId = req.transId;
tqError("s-task:%s not ready for checkpoint, since downstream not ready, ignore this checkpoint:%" PRId64 tqError("s-task:%s not ready for checkpoint, since downstream not ready, ignore this checkpoint:%" PRId64
", set it failure", ", transId:%d set it failed",
pTask->id.idStr, req.checkpointId); pTask->id.idStr, req.checkpointId, req.transId);
streamMetaReleaseTask(pMeta, pTask); streamMetaReleaseTask(pMeta, pTask);
SRpcMsg rsp = {0}; SRpcMsg rsp = {0};
@ -1335,28 +1339,10 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) {
} }
int32_t tqProcessTaskResetReq(STQ* pTq, SRpcMsg* pMsg) { int32_t tqProcessTaskResetReq(STQ* pTq, SRpcMsg* pMsg) {
SVPauseStreamTaskReq* pReq = (SVPauseStreamTaskReq*)pMsg->pCont; return tqStreamTaskProcessTaskResetReq(pTq->pStreamMeta, pMsg);
SStreamMeta* pMeta = pTq->pStreamMeta;
SStreamTask* pTask = streamMetaAcquireTask(pMeta, pReq->streamId, pReq->taskId);
if (pTask == NULL) {
tqError("vgId:%d process task-reset req, failed to acquire task:0x%x, it may have been dropped already",
pMeta->vgId, pReq->taskId);
return TSDB_CODE_SUCCESS;
}
tqDebug("s-task:%s receive task-reset msg from mnode, reset status and ready for data processing", pTask->id.idStr);
// clear flag set during do checkpoint, and open inputQ for all upstream tasks
if (streamTaskGetStatus(pTask, NULL) == TASK_STATUS__CK) {
streamTaskClearCheckInfo(pTask, true);
streamTaskSetStatusReady(pTask);
}
streamMetaReleaseTask(pMeta, pTask);
return TSDB_CODE_SUCCESS;
} }
// NOTE: here we may receive this message more than once, so need to handle this case
int32_t tqProcessTaskDropHTask(STQ* pTq, SRpcMsg* pMsg) { int32_t tqProcessTaskDropHTask(STQ* pTq, SRpcMsg* pMsg) {
SVDropHTaskReq* pReq = (SVDropHTaskReq*)pMsg->pCont; SVDropHTaskReq* pReq = (SVDropHTaskReq*)pMsg->pCont;
@ -1375,14 +1361,17 @@ int32_t tqProcessTaskDropHTask(STQ* pTq, SRpcMsg* pMsg) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
taosThreadMutexLock(&pTask->lock);
ETaskStatus status = streamTaskGetStatus(pTask, NULL); ETaskStatus status = streamTaskGetStatus(pTask, NULL);
ASSERT(status == TASK_STATUS__STREAM_SCAN_HISTORY); if (status == TASK_STATUS__STREAM_SCAN_HISTORY) {
streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_SCANHIST_DONE);
streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_SCANHIST_DONE); }
SStreamTaskId id = {.streamId = pTask->hTaskInfo.id.streamId, .taskId = pTask->hTaskInfo.id.taskId}; SStreamTaskId id = {.streamId = pTask->hTaskInfo.id.streamId, .taskId = pTask->hTaskInfo.id.taskId};
streamBuildAndSendDropTaskMsg(pTask->pMsgCb, pMeta->vgId, &id); streamBuildAndSendDropTaskMsg(pTask->pMsgCb, pMeta->vgId, &id);
taosThreadMutexUnlock(&pTask->lock);
// clear the scheduler status // clear the scheduler status
streamTaskSetSchedStatusInactive(pTask); streamTaskSetSchedStatusInactive(pTask);
tqDebug("s-task:%s set scheduler status:%d after drop fill-history task", pTask->id.idStr, pTask->status.schedStatus); tqDebug("s-task:%s set scheduler status:%d after drop fill-history task", pTask->id.idStr, pTask->status.schedStatus);

View File

@ -109,8 +109,8 @@ int32_t tqScanWalAsync(STQ* pTq, bool ckPause) {
return -1; return -1;
} }
tqDebug("vgId:%d create msg to start wal scan to launch stream tasks, numOfTasks:%d, restored:%d", vgId, numOfTasks, tqDebug("vgId:%d create msg to start wal scan to launch stream tasks, numOfTasks:%d, vnd restored:%d", vgId,
alreadyRestored); numOfTasks, alreadyRestored);
pRunReq->head.vgId = vgId; pRunReq->head.vgId = vgId;
pRunReq->streamId = 0; pRunReq->streamId = 0;
@ -123,33 +123,25 @@ int32_t tqScanWalAsync(STQ* pTq, bool ckPause) {
return 0; return 0;
} }
int32_t tqStopStreamTasks(STQ* pTq) { int32_t tqStopStreamTasksAsync(STQ* pTq) {
SStreamMeta* pMeta = pTq->pStreamMeta; SStreamMeta* pMeta = pTq->pStreamMeta;
int32_t vgId = TD_VID(pTq->pVnode); int32_t vgId = pMeta->vgId;
int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList);
tqDebug("vgId:%d stop all %d stream task(s)", vgId, numOfTasks); SStreamTaskRunReq* pRunReq = rpcMallocCont(sizeof(SStreamTaskRunReq));
if (numOfTasks == 0) { if (pRunReq == NULL) {
return TSDB_CODE_SUCCESS; terrno = TSDB_CODE_OUT_OF_MEMORY;
tqError("vgId:%d failed to create msg to stop tasks, code:%s", vgId, terrstr());
return -1;
} }
SArray* pTaskList = NULL; tqDebug("vgId:%d create msg to stop tasks", vgId);
streamMetaWLock(pMeta);
pTaskList = taosArrayDup(pMeta->pTaskList, NULL);
streamMetaWUnLock(pMeta);
for (int32_t i = 0; i < numOfTasks; ++i) { pRunReq->head.vgId = vgId;
SStreamTaskId* pTaskId = taosArrayGet(pTaskList, i); pRunReq->streamId = 0;
SStreamTask* pTask = streamMetaAcquireTask(pMeta, pTaskId->streamId, pTaskId->taskId); pRunReq->taskId = STREAM_EXEC_STOP_ALL_TASKS_ID;
if (pTask == NULL) {
continue;
}
streamTaskStop(pTask); SRpcMsg msg = {.msgType = TDMT_STREAM_TASK_RUN, .pCont = pRunReq, .contLen = sizeof(SStreamTaskRunReq)};
streamMetaReleaseTask(pMeta, pTask); tmsgPutToQueue(&pTq->pVnode->msgCb, STREAM_QUEUE, &msg);
}
taosArrayDestroy(pTaskList);
return 0; return 0;
} }

View File

@ -196,7 +196,7 @@ int32_t streamTaskSnapWriterClose(SStreamTaskWriter* pWriter, int8_t rollback) {
int32_t code = 0; int32_t code = 0;
STQ* pTq = pWriter->pTq; STQ* pTq = pWriter->pTq;
taosWLockLatch(&pTq->pStreamMeta->lock); streamMetaWLock(pTq->pStreamMeta);
tqDebug("vgId:%d, vnode stream-task snapshot writer closed", TD_VID(pTq->pVnode)); tqDebug("vgId:%d, vnode stream-task snapshot writer closed", TD_VID(pTq->pVnode));
if (rollback) { if (rollback) {
tdbAbort(pTq->pStreamMeta->db, pTq->pStreamMeta->txn); tdbAbort(pTq->pStreamMeta->db, pTq->pStreamMeta->txn);
@ -212,14 +212,14 @@ int32_t streamTaskSnapWriterClose(SStreamTaskWriter* pWriter, int8_t rollback) {
taosMemoryFree(pWriter); taosMemoryFree(pWriter);
goto _err; goto _err;
} }
taosWUnLockLatch(&pTq->pStreamMeta->lock); streamMetaWUnLock(pTq->pStreamMeta);
taosMemoryFree(pWriter); taosMemoryFree(pWriter);
return code; return code;
_err: _err:
tqError("vgId:%d, vnode stream-task snapshot writer failed to close since %s", TD_VID(pWriter->pTq->pVnode), tqError("vgId:%d, vnode stream-task snapshot writer failed to close since %s", TD_VID(pWriter->pTq->pVnode),
tstrerror(code)); tstrerror(code));
taosWUnLockLatch(&pTq->pStreamMeta->lock); streamMetaWUnLock(pTq->pStreamMeta);
return code; return code;
} }
@ -240,13 +240,13 @@ int32_t streamTaskSnapWrite(SStreamTaskWriter* pWriter, uint8_t* pData, uint32_t
tDecoderClear(&decoder); tDecoderClear(&decoder);
int64_t key[2] = {taskId.streamId, taskId.taskId}; int64_t key[2] = {taskId.streamId, taskId.taskId};
taosWLockLatch(&pTq->pStreamMeta->lock); streamMetaWLock(pTq->pStreamMeta);
if (tdbTbUpsert(pTq->pStreamMeta->pTaskDb, key, sizeof(int64_t) << 1, (uint8_t*)pData + sizeof(SSnapDataHdr), if (tdbTbUpsert(pTq->pStreamMeta->pTaskDb, key, sizeof(int64_t) << 1, (uint8_t*)pData + sizeof(SSnapDataHdr),
nData - sizeof(SSnapDataHdr), pTq->pStreamMeta->txn) < 0) { nData - sizeof(SSnapDataHdr), pTq->pStreamMeta->txn) < 0) {
taosWUnLockLatch(&pTq->pStreamMeta->lock); streamMetaWUnLock(pTq->pStreamMeta);
return -1; return -1;
} }
taosWUnLockLatch(&pTq->pStreamMeta->lock); streamMetaWUnLock(pTq->pStreamMeta);
} else if (pHdr->type == SNAP_DATA_STREAM_TASK_CHECKPOINT) { } else if (pHdr->type == SNAP_DATA_STREAM_TASK_CHECKPOINT) {
// do nothing // do nothing
} }

View File

@ -35,20 +35,8 @@ int32_t tqInitDataRsp(SMqDataRsp* pRsp, STqOffsetVal pOffset) {
} }
void tqUpdateNodeStage(STQ* pTq, bool isLeader) { void tqUpdateNodeStage(STQ* pTq, bool isLeader) {
SSyncState state = syncGetState(pTq->pVnode->sync); SSyncState state = syncGetState(pTq->pVnode->sync);
SStreamMeta* pMeta = pTq->pStreamMeta; streamMetaUpdateStageRole(pTq->pStreamMeta, state.term, isLeader);
int64_t stage = pMeta->stage;
pMeta->stage = state.term;
pMeta->role = (isLeader)? NODE_ROLE_LEADER:NODE_ROLE_FOLLOWER;
if (isLeader) {
tqInfo("vgId:%d update meta stage:%" PRId64 ", prev:%" PRId64 " leader:%d, start to send Hb", pMeta->vgId,
state.term, stage, isLeader);
streamMetaStartHb(pMeta);
} else {
tqInfo("vgId:%d update meta stage:%" PRId64 " prev:%" PRId64 " leader:%d", pMeta->vgId, state.term, stage,
isLeader);
}
} }
static int32_t tqInitTaosxRsp(STaosxRsp* pRsp, STqOffsetVal pOffset) { static int32_t tqInitTaosxRsp(STaosxRsp* pRsp, STqOffsetVal pOffset) {

View File

@ -106,14 +106,15 @@ int32_t tqStreamTaskProcessUpdateReq(SStreamMeta* pMeta, SMsgCb* cb, SRpcMsg* pM
return rsp.code; return rsp.code;
} }
streamMetaWUnLock(pMeta); // streamMetaWUnLock(pMeta);
// todo for test purpose
// the following two functions should not be executed within the scope of meta lock to avoid deadlock // the following two functions should not be executed within the scope of meta lock to avoid deadlock
streamTaskUpdateEpsetInfo(pTask, req.pNodeList); streamTaskUpdateEpsetInfo(pTask, req.pNodeList);
streamTaskResetStatus(pTask); streamTaskResetStatus(pTask);
// continue after lock the meta again // continue after lock the meta again
streamMetaWLock(pMeta); // streamMetaWLock(pMeta);
SStreamTask** ppHTask = NULL; SStreamTask** ppHTask = NULL;
if (HAS_RELATED_FILLHISTORY_TASK(pTask)) { if (HAS_RELATED_FILLHISTORY_TASK(pTask)) {
@ -166,65 +167,17 @@ int32_t tqStreamTaskProcessUpdateReq(SStreamMeta* pMeta, SMsgCb* cb, SRpcMsg* pM
streamMetaWUnLock(pMeta); streamMetaWUnLock(pMeta);
} else { } else {
if (!restored) { if (!restored) {
tqDebug("vgId:%d vnode restore not completed, not restart the tasks, clear the start after nodeUpdate flag", tqDebug("vgId:%d vnode restore not completed, not start the tasks, clear the start after nodeUpdate flag", vgId);
vgId);
pMeta->startInfo.tasksWillRestart = 0; pMeta->startInfo.tasksWillRestart = 0;
streamMetaWUnLock(pMeta); streamMetaWUnLock(pMeta);
} else { } else {
tqDebug("vgId:%d all %d task(s) nodeEp updated and closed", vgId, numOfTasks); tqDebug("vgId:%d all %d task(s) nodeEp updated and closed", vgId, numOfTasks);
#if 1 #if 0
// for test purpose, to trigger the leader election
taosMSleep(5000);
#endif
tqStreamTaskStartAsync(pMeta, cb, true); tqStreamTaskStartAsync(pMeta, cb, true);
streamMetaWUnLock(pMeta); streamMetaWUnLock(pMeta);
#else
streamMetaWUnLock(pMeta);
// For debug purpose.
// the following procedure consume many CPU resource, result in the re-election of leader
// with high probability. So we employ it as a test case for the stream processing framework, with
// checkpoint/restart/nodeUpdate etc.
while (1) {
int32_t startVal = atomic_val_compare_exchange_32(&pMeta->startInfo.taskStarting, 0, 1);
if (startVal == 0) {
break;
}
tqDebug("vgId:%d in start stream tasks procedure, wait for 500ms and recheck", vgId);
taosMsleep(500);
}
while (streamMetaTaskInTimer(pMeta)) {
tqDebug("vgId:%d some tasks in timer, wait for 100ms and recheck", pMeta->vgId);
taosMsleep(100);
}
streamMetaWLock(pMeta);
int32_t code = streamMetaReopen(pMeta);
if (code != 0) {
tqError("vgId:%d failed to reopen stream meta", vgId);
streamMetaWUnLock(pMeta);
taosArrayDestroy(req.pNodeList);
return -1;
}
streamMetaInitBackend(pMeta);
if (streamMetaLoadAllTasks(pTq->pStreamMeta) < 0) {
tqError("vgId:%d failed to load stream tasks", vgId);
streamMetaWUnLock(pMeta);
taosArrayDestroy(req.pNodeList);
return -1;
}
if (vnodeIsRoleLeader(pTq->pVnode) && !tsDisableStream) {
tqInfo("vgId:%d start all stream tasks after all being updated", vgId);
resetStreamTaskStatus(pTq->pStreamMeta);
tqStartStreamTaskAsync(pTq, false);
} else {
tqInfo("vgId:%d, follower node not start stream tasks", vgId);
}
streamMetaWUnLock(pMeta);
#endif
} }
} }
@ -645,65 +598,7 @@ int32_t tqStreamTaskProcessDropReq(SStreamMeta* pMeta, char* msg, int32_t msgLen
return 0; return 0;
} }
int32_t startStreamTasks(SStreamMeta* pMeta) { int32_t tqStreamTaskResetStatus(SStreamMeta* pMeta) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t vgId = pMeta->vgId;
int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList);
tqDebug("vgId:%d start to check all %d stream task(s) downstream status", vgId, numOfTasks);
if (numOfTasks == 0) {
return TSDB_CODE_SUCCESS;
}
SArray* pTaskList = NULL;
streamMetaWLock(pMeta);
pTaskList = taosArrayDup(pMeta->pTaskList, NULL);
taosHashClear(pMeta->startInfo.pReadyTaskSet);
taosHashClear(pMeta->startInfo.pFailedTaskSet);
pMeta->startInfo.startTs = taosGetTimestampMs();
streamMetaWUnLock(pMeta);
// broadcast the check downstream tasks msg
for (int32_t i = 0; i < numOfTasks; ++i) {
SStreamTaskId* pTaskId = taosArrayGet(pTaskList, i);
SStreamTask* pTask = streamMetaAcquireTask(pMeta, pTaskId->streamId, pTaskId->taskId);
if (pTask == NULL) {
continue;
}
// fill-history task can only be launched by related stream tasks.
if (pTask->info.fillHistory == 1) {
streamMetaReleaseTask(pMeta, pTask);
continue;
}
if (pTask->status.downstreamReady == 1) {
if (HAS_RELATED_FILLHISTORY_TASK(pTask)) {
tqDebug("s-task:%s downstream ready, no need to check downstream, check only related fill-history task",
pTask->id.idStr);
streamLaunchFillHistoryTask(pTask);
}
streamMetaUpdateTaskDownstreamStatus(pTask->pMeta, pTask->id.streamId, pTask->id.taskId, pTask->execInfo.init,
pTask->execInfo.start, true);
streamMetaReleaseTask(pMeta, pTask);
continue;
}
EStreamTaskEvent event = (HAS_RELATED_FILLHISTORY_TASK(pTask)) ? TASK_EVENT_INIT_STREAM_SCANHIST : TASK_EVENT_INIT;
int32_t ret = streamTaskHandleEvent(pTask->status.pSM, event);
if (ret != TSDB_CODE_SUCCESS) {
code = ret;
}
streamMetaReleaseTask(pMeta, pTask);
}
taosArrayDestroy(pTaskList);
return code;
}
int32_t resetStreamTaskStatus(SStreamMeta* pMeta) {
int32_t vgId = pMeta->vgId; int32_t vgId = pMeta->vgId;
int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList); int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList);
@ -728,16 +623,18 @@ static int32_t restartStreamTasks(SStreamMeta* pMeta, bool isLeader) {
int32_t code = 0; int32_t code = 0;
int64_t st = taosGetTimestampMs(); int64_t st = taosGetTimestampMs();
while (1) { streamMetaWLock(pMeta);
int32_t startVal = atomic_val_compare_exchange_32(&pMeta->startInfo.taskStarting, 0, 1); if (pMeta->startInfo.taskStarting == 1) {
if (startVal == 0) { pMeta->startInfo.restartCount += 1;
break; tqDebug("vgId:%d in start tasks procedure, inc restartCounter by 1, remaining restart:%d", vgId,
} pMeta->startInfo.restartCount);
streamMetaWUnLock(pMeta);
tqDebug("vgId:%d in start stream tasks procedure, wait for 500ms and recheck", vgId); return TSDB_CODE_SUCCESS;
taosMsleep(500);
} }
pMeta->startInfo.taskStarting = 1;
streamMetaWUnLock(pMeta);
terrno = 0; terrno = 0;
tqInfo("vgId:%d tasks are all updated and stopped, restart all tasks, triggered by transId:%d", vgId, tqInfo("vgId:%d tasks are all updated and stopped, restart all tasks, triggered by transId:%d", vgId,
pMeta->updateInfo.transId); pMeta->updateInfo.transId);
@ -762,11 +659,9 @@ static int32_t restartStreamTasks(SStreamMeta* pMeta, bool isLeader) {
} }
if (isLeader && !tsDisableStream) { if (isLeader && !tsDisableStream) {
resetStreamTaskStatus(pMeta); tqStreamTaskResetStatus(pMeta);
streamMetaWUnLock(pMeta); streamMetaWUnLock(pMeta);
tqInfo("vgId:%d restart all stream tasks after all tasks being updated", vgId); streamMetaStartAllTasks(pMeta);
startStreamTasks(pMeta);
} else { } else {
streamMetaResetStartInfo(&pMeta->startInfo); streamMetaResetStartInfo(&pMeta->startInfo);
streamMetaWUnLock(pMeta); streamMetaWUnLock(pMeta);
@ -784,11 +679,14 @@ int32_t tqStreamTaskProcessRunReq(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLead
int32_t vgId = pMeta->vgId; int32_t vgId = pMeta->vgId;
if (taskId == STREAM_EXEC_START_ALL_TASKS_ID) { if (taskId == STREAM_EXEC_START_ALL_TASKS_ID) {
startStreamTasks(pMeta); streamMetaStartAllTasks(pMeta);
return 0; return 0;
} else if (taskId == STREAM_EXEC_RESTART_ALL_TASKS_ID) { } else if (taskId == STREAM_EXEC_RESTART_ALL_TASKS_ID) {
restartStreamTasks(pMeta, isLeader); restartStreamTasks(pMeta, isLeader);
return 0; return 0;
} else if (taskId == STREAM_EXEC_STOP_ALL_TASKS_ID) {
streamMetaStopAllTasks(pMeta);
return 0;
} }
SStreamTask* pTask = streamMetaAcquireTask(pMeta, pReq->streamId, taskId); SStreamTask* pTask = streamMetaAcquireTask(pMeta, pReq->streamId, taskId);
@ -812,3 +710,46 @@ int32_t tqStreamTaskProcessRunReq(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLead
return -1; return -1;
} }
} }
int32_t tqStartTaskCompleteCallback(SStreamMeta* pMeta) {
STaskStartInfo* pStartInfo = &pMeta->startInfo;
streamMetaWLock(pMeta);
if (pStartInfo->restartCount > 0) {
pStartInfo->restartCount -= 1;
ASSERT(pStartInfo->taskStarting == 0);
tqDebug("vgId:%d role:%d need to restart all tasks again, restartCounter:%d", pMeta->vgId, pMeta->role,
pStartInfo->restartCount);
streamMetaWUnLock(pMeta);
restartStreamTasks(pMeta, (pMeta->role == NODE_ROLE_LEADER));
} else {
streamMetaWUnLock(pMeta);
tqDebug("vgId:%d start all tasks completed", pMeta->vgId);
}
return TSDB_CODE_SUCCESS;
}
int32_t tqStreamTaskProcessTaskResetReq(SStreamMeta* pMeta, SRpcMsg* pMsg) {
SVPauseStreamTaskReq* pReq = (SVPauseStreamTaskReq*)pMsg->pCont;
SStreamTask* pTask = streamMetaAcquireTask(pMeta, pReq->streamId, pReq->taskId);
if (pTask == NULL) {
tqError("vgId:%d process task-reset req, failed to acquire task:0x%x, it may have been dropped already",
pMeta->vgId, pReq->taskId);
return TSDB_CODE_SUCCESS;
}
tqDebug("s-task:%s receive task-reset msg from mnode, reset status and ready for data processing", pTask->id.idStr);
// clear flag set during do checkpoint, and open inputQ for all upstream tasks
if (streamTaskGetStatus(pTask, NULL) == TASK_STATUS__CK) {
streamTaskClearCheckInfo(pTask, true);
streamTaskSetStatusReady(pTask);
}
streamMetaReleaseTask(pMeta, pTask);
return TSDB_CODE_SUCCESS;
}

View File

@ -1360,7 +1360,8 @@ static bool tryCopyDistinctRowFromFileBlock(STsdbReader* pReader, SBlockData* pB
static bool nextRowFromSttBlocks(SSttBlockReader* pSttBlockReader, STableBlockScanInfo* pScanInfo, static bool nextRowFromSttBlocks(SSttBlockReader* pSttBlockReader, STableBlockScanInfo* pScanInfo,
SVersionRange* pVerRange) { SVersionRange* pVerRange) {
int32_t step = ASCENDING_TRAVERSE(pSttBlockReader->order) ? 1 : -1; int32_t order = pSttBlockReader->order;
int32_t step = ASCENDING_TRAVERSE(order) ? 1 : -1;
while (1) { while (1) {
bool hasVal = tMergeTreeNext(&pSttBlockReader->mergeTree); bool hasVal = tMergeTreeNext(&pSttBlockReader->mergeTree);
@ -1377,8 +1378,12 @@ static bool nextRowFromSttBlocks(SSttBlockReader* pSttBlockReader, STableBlockSc
pSttBlockReader->currentKey = key; pSttBlockReader->currentKey = key;
pScanInfo->sttKeyInfo.nextProcKey = key; pScanInfo->sttKeyInfo.nextProcKey = key;
if (!hasBeenDropped(pScanInfo->delSkyline, &pScanInfo->sttBlockDelIndex, key, ver, pSttBlockReader->order, if (pScanInfo->delSkyline != NULL && TARRAY_SIZE(pScanInfo->delSkyline) > 0) {
pVerRange)) { if (!hasBeenDropped(pScanInfo->delSkyline, &pScanInfo->sttBlockDelIndex, key, ver, order, pVerRange)) {
pScanInfo->sttKeyInfo.status = STT_FILE_HAS_DATA;
return true;
}
} else {
pScanInfo->sttKeyInfo.status = STT_FILE_HAS_DATA; pScanInfo->sttKeyInfo.status = STT_FILE_HAS_DATA;
return true; return true;
} }
@ -2054,9 +2059,12 @@ static bool isValidFileBlockRow(SBlockData* pBlockData, SFileBlockDumpInfo* pDum
return false; return false;
} }
if (hasBeenDropped(pBlockScanInfo->delSkyline, &pBlockScanInfo->fileDelIndex, ts, ver, pReader->info.order, if (pBlockScanInfo->delSkyline != NULL && TARRAY_SIZE(pBlockScanInfo->delSkyline) > 0) {
&pReader->info.verRange)) { bool dropped = hasBeenDropped(pBlockScanInfo->delSkyline, &pBlockScanInfo->fileDelIndex, ts, ver,
return false; pReader->info.order, &pReader->info.verRange);
if (dropped) {
return false;
}
} }
return true; return true;
@ -3219,7 +3227,7 @@ SVersionRange getQueryVerRange(SVnode* pVnode, SQueryTableDataCond* pCond, int8_
bool hasBeenDropped(const SArray* pDelList, int32_t* index, int64_t key, int64_t ver, int32_t order, bool hasBeenDropped(const SArray* pDelList, int32_t* index, int64_t key, int64_t ver, int32_t order,
SVersionRange* pVerRange) { SVersionRange* pVerRange) {
if (pDelList == NULL || (taosArrayGetSize(pDelList) == 0)) { if (pDelList == NULL || (TARRAY_SIZE(pDelList) == 0)) {
return false; return false;
} }
@ -3327,16 +3335,22 @@ TSDBROW* getValidMemRow(SIterInfo* pIter, const SArray* pDelList, STsdbReader* p
TSDBROW* pRow = tsdbTbDataIterGet(pIter->iter); TSDBROW* pRow = tsdbTbDataIterGet(pIter->iter);
TSDBKEY key = TSDBROW_KEY(pRow); TSDBKEY key = TSDBROW_KEY(pRow);
int32_t order = pReader->info.order;
if (outOfTimeWindow(key.ts, &pReader->info.window)) { if (outOfTimeWindow(key.ts, &pReader->info.window)) {
pIter->hasVal = false; pIter->hasVal = false;
return NULL; return NULL;
} }
// it is a valid data version // it is a valid data version
if ((key.version <= pReader->info.verRange.maxVer && key.version >= pReader->info.verRange.minVer) && if (key.version <= pReader->info.verRange.maxVer && key.version >= pReader->info.verRange.minVer) {
(!hasBeenDropped(pDelList, &pIter->index, key.ts, key.version, pReader->info.order, &pReader->info.verRange))) { if (pDelList == NULL || TARRAY_SIZE(pDelList) == 0) {
return pRow; return pRow;
} else {
bool dropped = hasBeenDropped(pDelList, &pIter->index, key.ts, key.version, order, &pReader->info.verRange);
if (!dropped) {
return pRow;
}
}
} }
while (1) { while (1) {
@ -3353,9 +3367,15 @@ TSDBROW* getValidMemRow(SIterInfo* pIter, const SArray* pDelList, STsdbReader* p
return NULL; return NULL;
} }
if (key.version <= pReader->info.verRange.maxVer && key.version >= pReader->info.verRange.minVer && if (key.version <= pReader->info.verRange.maxVer && key.version >= pReader->info.verRange.minVer) {
(!hasBeenDropped(pDelList, &pIter->index, key.ts, key.version, pReader->info.order, &pReader->info.verRange))) { if (pDelList == NULL || TARRAY_SIZE(pDelList) == 0) {
return pRow; return pRow;
} else {
bool dropped = hasBeenDropped(pDelList, &pIter->index, key.ts, key.version, order, &pReader->info.verRange);
if (!dropped) {
return pRow;
}
}
} }
} }
} }

View File

@ -570,7 +570,7 @@ static void vnodeRestoreFinish(const SSyncFSM *pFsm, const SyncIndex commitIdx)
vInfo("vgId:%d, sync restore finished, not launch stream tasks, since stream tasks are disabled", vgId); vInfo("vgId:%d, sync restore finished, not launch stream tasks, since stream tasks are disabled", vgId);
} else { } else {
vInfo("vgId:%d sync restore finished, start to launch stream tasks", pVnode->config.vgId); vInfo("vgId:%d sync restore finished, start to launch stream tasks", pVnode->config.vgId);
resetStreamTaskStatus(pVnode->pTq->pStreamMeta); tqStreamTaskResetStatus(pVnode->pTq->pStreamMeta);
tqStreamTaskStartAsync(pMeta, &pVnode->msgCb, false); tqStreamTaskStartAsync(pMeta, &pVnode->msgCb, false);
} }
} else { } else {
@ -594,7 +594,7 @@ static void vnodeBecomeFollower(const SSyncFSM *pFsm) {
if (pVnode->pTq) { if (pVnode->pTq) {
tqUpdateNodeStage(pVnode->pTq, false); tqUpdateNodeStage(pVnode->pTq, false);
tqStopStreamTasks(pVnode->pTq); tqStopStreamTasksAsync(pVnode->pTq);
} }
} }

View File

@ -109,13 +109,12 @@ int32_t streamBroadcastToChildren(SStreamTask* pTask, const SSDataBlock* pBlock)
int32_t tEncodeStreamRetrieveReq(SEncoder* pEncoder, const SStreamRetrieveReq* pReq); int32_t tEncodeStreamRetrieveReq(SEncoder* pEncoder, const SStreamRetrieveReq* pReq);
int32_t streamSaveTaskCheckpointInfo(SStreamTask* p, int64_t checkpointId); int32_t streamSaveTaskCheckpointInfo(SStreamTask* p, int64_t checkpointId);
int32_t streamTaskBuildCheckpoint(SStreamTask* pTask);
int32_t streamSaveAllTaskStatus(SStreamMeta* pMeta, int64_t checkpointId);
int32_t streamSendCheckMsg(SStreamTask* pTask, const SStreamTaskCheckReq* pReq, int32_t nodeId, SEpSet* pEpSet); int32_t streamSendCheckMsg(SStreamTask* pTask, const SStreamTaskCheckReq* pReq, int32_t nodeId, SEpSet* pEpSet);
int32_t streamAddCheckpointReadyMsg(SStreamTask* pTask, int32_t srcTaskId, int32_t index, int64_t checkpointId); int32_t streamAddCheckpointReadyMsg(SStreamTask* pTask, int32_t srcTaskId, int32_t index, int64_t checkpointId);
int32_t streamTaskSendCheckpointReadyMsg(SStreamTask* pTask); int32_t streamTaskSendCheckpointReadyMsg(SStreamTask* pTask);
int32_t streamTaskSendCheckpointSourceRsp(SStreamTask* pTask); int32_t streamTaskSendCheckpointSourceRsp(SStreamTask* pTask);
void streamTaskSetCheckpointFailedId(SStreamTask* pTask);
int32_t streamTaskGetNumOfDownstream(const SStreamTask* pTask); int32_t streamTaskGetNumOfDownstream(const SStreamTask* pTask);
int32_t streamTaskInitTokenBucket(STokenBucket* pBucket, int32_t numCap, int32_t numRate, float quotaRate, const char*); int32_t streamTaskInitTokenBucket(STokenBucket* pBucket, int32_t numCap, int32_t numRate, float quotaRate, const char*);
STaskId streamTaskExtractKey(const SStreamTask* pTask); STaskId streamTaskExtractKey(const SStreamTask* pTask);
@ -137,17 +136,6 @@ int32_t streamAddEndScanHistoryMsg(SStreamTask* pTask, SRpcHandleInfo* pRpcInfo,
int32_t streamNotifyUpstreamContinue(SStreamTask* pTask); int32_t streamNotifyUpstreamContinue(SStreamTask* pTask);
int32_t streamTransferStateToStreamTask(SStreamTask* pTask); int32_t streamTransferStateToStreamTask(SStreamTask* pTask);
// <<<<<<< HEAD
// void streamClearChkptReadyMsg(SStreamTask* pTask);
// int32_t streamTaskInitTokenBucket(STokenBucket* pBucket, int32_t numCap, int32_t numRate, float quotaRate, const
// char*); STaskId streamTaskExtractKey(const SStreamTask* pTask); void streamTaskInitForLaunchHTask(SHistoryTaskInfo*
// pInfo); void streamTaskSetRetryInfoForLaunch(SHistoryTaskInfo* pInfo);
// void streamMetaResetStartInfo(STaskStartInfo* pMeta);
// =======
// >>>>>>> 3.0
SStreamQueue* streamQueueOpen(int64_t cap); SStreamQueue* streamQueueOpen(int64_t cap);
void streamQueueClose(SStreamQueue* pQueue, int32_t taskId); void streamQueueClose(SStreamQueue* pQueue, int32_t taskId);
void streamQueueProcessSuccess(SStreamQueue* queue); void streamQueueProcessSuccess(SStreamQueue* queue);

View File

@ -25,6 +25,7 @@ typedef struct {
SStreamTask* pTask; SStreamTask* pTask;
} SAsyncUploadArg; } SAsyncUploadArg;
int32_t tEncodeStreamCheckpointSourceReq(SEncoder* pEncoder, const SStreamCheckpointSourceReq* pReq) { int32_t tEncodeStreamCheckpointSourceReq(SEncoder* pEncoder, const SStreamCheckpointSourceReq* pReq) {
if (tStartEncode(pEncoder) < 0) return -1; if (tStartEncode(pEncoder) < 0) return -1;
if (tEncodeI64(pEncoder, pReq->streamId) < 0) return -1; if (tEncodeI64(pEncoder, pReq->streamId) < 0) return -1;
@ -34,6 +35,7 @@ int32_t tEncodeStreamCheckpointSourceReq(SEncoder* pEncoder, const SStreamCheckp
if (tEncodeSEpSet(pEncoder, &pReq->mgmtEps) < 0) return -1; if (tEncodeSEpSet(pEncoder, &pReq->mgmtEps) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->mnodeId) < 0) return -1; if (tEncodeI32(pEncoder, pReq->mnodeId) < 0) return -1;
if (tEncodeI64(pEncoder, pReq->expireTime) < 0) return -1; if (tEncodeI64(pEncoder, pReq->expireTime) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->transId) < 0) return -1;
tEndEncode(pEncoder); tEndEncode(pEncoder);
return pEncoder->pos; return pEncoder->pos;
} }
@ -47,6 +49,7 @@ int32_t tDecodeStreamCheckpointSourceReq(SDecoder* pDecoder, SStreamCheckpointSo
if (tDecodeSEpSet(pDecoder, &pReq->mgmtEps) < 0) return -1; if (tDecodeSEpSet(pDecoder, &pReq->mgmtEps) < 0) return -1;
if (tDecodeI32(pDecoder, &pReq->mnodeId) < 0) return -1; if (tDecodeI32(pDecoder, &pReq->mnodeId) < 0) return -1;
if (tDecodeI64(pDecoder, &pReq->expireTime) < 0) return -1; if (tDecodeI64(pDecoder, &pReq->expireTime) < 0) return -1;
if (tDecodeI32(pDecoder, &pReq->transId) < 0) return -1;
tEndDecode(pDecoder); tEndDecode(pDecoder);
return 0; return 0;
} }
@ -111,6 +114,7 @@ static int32_t streamAlignCheckpoint(SStreamTask* pTask) {
return atomic_sub_fetch_32(&pTask->chkInfo.downstreamAlignNum, 1); return atomic_sub_fetch_32(&pTask->chkInfo.downstreamAlignNum, 1);
} }
// todo handle down the transId of checkpoint to sink/agg tasks.
static int32_t appendCheckpointIntoInputQ(SStreamTask* pTask, int32_t checkpointType) { static int32_t appendCheckpointIntoInputQ(SStreamTask* pTask, int32_t checkpointType) {
SStreamDataBlock* pChkpoint = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, sizeof(SSDataBlock)); SStreamDataBlock* pChkpoint = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, sizeof(SSDataBlock));
if (pChkpoint == NULL) { if (pChkpoint == NULL) {
@ -149,6 +153,7 @@ int32_t streamProcessCheckpointSourceReq(SStreamTask* pTask, SStreamCheckpointSo
// 1. set task status to be prepared for check point, no data are allowed to put into inputQ. // 1. set task status to be prepared for check point, no data are allowed to put into inputQ.
streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_GEN_CHECKPOINT); streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_GEN_CHECKPOINT);
pTask->chkInfo.transId = pReq->transId;
pTask->chkInfo.checkpointingId = pReq->checkpointId; pTask->chkInfo.checkpointingId = pReq->checkpointId;
pTask->chkInfo.checkpointNotReadyTasks = streamTaskGetNumOfDownstream(pTask); pTask->chkInfo.checkpointNotReadyTasks = streamTaskGetNumOfDownstream(pTask);
pTask->chkInfo.startTs = taosGetTimestampMs(); pTask->chkInfo.startTs = taosGetTimestampMs();
@ -273,8 +278,9 @@ void streamTaskClearCheckInfo(SStreamTask* pTask, bool clearChkpReadyMsg) {
pTask->chkInfo.failedId = 0; pTask->chkInfo.failedId = 0;
pTask->chkInfo.startTs = 0; // clear the recorded start time pTask->chkInfo.startTs = 0; // clear the recorded start time
pTask->chkInfo.checkpointNotReadyTasks = 0; pTask->chkInfo.checkpointNotReadyTasks = 0;
// pTask->chkInfo.checkpointAlignCnt = 0; pTask->chkInfo.transId = 0;
pTask->chkInfo.dispatchCheckpointTrigger = false; pTask->chkInfo.dispatchCheckpointTrigger = false;
streamTaskOpenAllUpstreamInput(pTask); // open inputQ for all upstream tasks streamTaskOpenAllUpstreamInput(pTask); // open inputQ for all upstream tasks
if (clearChkpReadyMsg) { if (clearChkpReadyMsg) {
streamClearChkptReadyMsg(pTask); streamClearChkptReadyMsg(pTask);
@ -341,9 +347,8 @@ int32_t streamSaveTaskCheckpointInfo(SStreamTask* p, int64_t checkpointId) {
return code; return code;
} }
void streamTaskSetFailedId(SStreamTask* pTask) { void streamTaskSetCheckpointFailedId(SStreamTask* pTask) {
pTask->chkInfo.failedId = pTask->chkInfo.checkpointingId; pTask->chkInfo.failedId = pTask->chkInfo.checkpointingId;
pTask->chkInfo.checkpointId = pTask->chkInfo.checkpointingId;
} }
int32_t getChkpMeta(char* id, char* path, SArray* list) { int32_t getChkpMeta(char* id, char* path, SArray* list) {
@ -485,7 +490,7 @@ int32_t streamTaskBuildCheckpoint(SStreamTask* pTask) {
code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_CHECKPOINT_DONE); code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_CHECKPOINT_DONE);
taosThreadMutexUnlock(&pTask->lock); taosThreadMutexUnlock(&pTask->lock);
streamTaskSetFailedId(pTask); streamTaskSetCheckpointFailedId(pTask);
stDebug("s-task:%s clear checkpoint flag since gen checkpoint failed, checkpointId:%" PRId64, pTask->id.idStr, stDebug("s-task:%s clear checkpoint flag since gen checkpoint failed, checkpointId:%" PRId64, pTask->id.idStr,
ckId); ckId);
} }

View File

@ -352,6 +352,7 @@ static int32_t doBuildDispatchMsg(SStreamTask* pTask, const SStreamDataBlock* pD
return code; return code;
} }
// it's a new vnode to receive dispatch msg, so add one
if (pReqs[j].blockNum == 0) { if (pReqs[j].blockNum == 0) {
atomic_add_fetch_32(&pTask->outputInfo.shuffleDispatcher.waitingRspCnt, 1); atomic_add_fetch_32(&pTask->outputInfo.shuffleDispatcher.waitingRspCnt, 1);
} }
@ -372,8 +373,15 @@ static int32_t doBuildDispatchMsg(SStreamTask* pTask, const SStreamDataBlock* pD
pTask->msgInfo.pData = pReqs; pTask->msgInfo.pData = pReqs;
} }
stDebug("s-task:%s build dispatch msg success, msgId:%d, stage:%" PRId64, pTask->id.idStr, pTask->execInfo.dispatch, if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) {
pTask->pMeta->stage); stDebug("s-task:%s build dispatch msg success, msgId:%d, stage:%" PRId64 " %p", pTask->id.idStr,
pTask->execInfo.dispatch, pTask->pMeta->stage, pTask->msgInfo.pData);
} else {
stDebug("s-task:%s build dispatch msg success, msgId:%d, stage:%" PRId64 " dstVgNum:%d %p", pTask->id.idStr,
pTask->execInfo.dispatch, pTask->pMeta->stage, pTask->outputInfo.shuffleDispatcher.waitingRspCnt,
pTask->msgInfo.pData);
}
return code; return code;
} }
@ -395,9 +403,11 @@ static int32_t sendDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pDispatch
SArray* vgInfo = pTask->outputInfo.shuffleDispatcher.dbInfo.pVgroupInfos; SArray* vgInfo = pTask->outputInfo.shuffleDispatcher.dbInfo.pVgroupInfos;
int32_t numOfVgroups = taosArrayGetSize(vgInfo); int32_t numOfVgroups = taosArrayGetSize(vgInfo);
stDebug("s-task:%s (child taskId:%d) start to shuffle-dispatch blocks to %d vgroup(s), msgId:%d", id, int32_t actualVgroups = pTask->outputInfo.shuffleDispatcher.waitingRspCnt;
pTask->info.selfChildId, numOfVgroups, msgId); stDebug("s-task:%s (child taskId:%d) start to shuffle-dispatch blocks to %d/%d vgroup(s), msgId:%d", id,
pTask->info.selfChildId, actualVgroups, numOfVgroups, msgId);
int32_t numOfSend = 0;
for (int32_t i = 0; i < numOfVgroups; i++) { for (int32_t i = 0; i < numOfVgroups; i++) {
if (pDispatchMsg[i].blockNum > 0) { if (pDispatchMsg[i].blockNum > 0) {
SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, i); SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, i);
@ -408,6 +418,11 @@ static int32_t sendDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pDispatch
if (code < 0) { if (code < 0) {
break; break;
} }
// no need to try remain, all already send.
if (++numOfSend == actualVgroups) {
break;
}
} }
} }
@ -1081,6 +1096,7 @@ int32_t streamNotifyUpstreamContinue(SStreamTask* pTask) {
// this message has been sent successfully, let's try next one. // this message has been sent successfully, let's try next one.
static int32_t handleDispatchSuccessRsp(SStreamTask* pTask, int32_t downstreamId) { static int32_t handleDispatchSuccessRsp(SStreamTask* pTask, int32_t downstreamId) {
stDebug("s-task:%s destroy dispatch msg:%p", pTask->id.idStr, pTask->msgInfo.pData);
destroyDispatchMsg(pTask->msgInfo.pData, getNumOfDispatchBranch(pTask)); destroyDispatchMsg(pTask->msgInfo.pData, getNumOfDispatchBranch(pTask));
bool delayDispatch = (pTask->msgInfo.dispatchMsgType == STREAM_INPUT__CHECKPOINT_TRIGGER); bool delayDispatch = (pTask->msgInfo.dispatchMsgType == STREAM_INPUT__CHECKPOINT_TRIGGER);

View File

@ -266,11 +266,12 @@ int32_t streamTaskSetDb(SStreamMeta* pMeta, void* arg, char* key) {
if (pBackend == NULL) { if (pBackend == NULL) {
taosThreadMutexUnlock(&pMeta->backendMutex); taosThreadMutexUnlock(&pMeta->backendMutex);
taosMsleep(1000); taosMsleep(1000);
stDebug("backed holded by other task, restart later, path: %s, key: %s", pMeta->path, key); stDebug("backend held by other task, restart later, path:%s, key:%s", pMeta->path, key);
} else { } else {
taosThreadMutexUnlock(&pMeta->backendMutex); taosThreadMutexUnlock(&pMeta->backendMutex);
break; break;
} }
taosThreadMutexLock(&pMeta->backendMutex); taosThreadMutexLock(&pMeta->backendMutex);
pBackend = taskDbOpen(pMeta->path, key, chkpId); pBackend = taskDbOpen(pMeta->path, key, chkpId);
} }
@ -297,7 +298,9 @@ void streamMetaRemoveDB(void* arg, char* key) {
taosThreadMutexUnlock(&pMeta->backendMutex); taosThreadMutexUnlock(&pMeta->backendMutex);
} }
SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandFunc, int32_t vgId, int64_t stage) {
SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandFunc, int32_t vgId, int64_t stage,
startComplete_fn_t fn) {
int32_t code = -1; int32_t code = -1;
SStreamMeta* pMeta = taosMemoryCalloc(1, sizeof(SStreamMeta)); SStreamMeta* pMeta = taosMemoryCalloc(1, sizeof(SStreamMeta));
if (pMeta == NULL) { if (pMeta == NULL) {
@ -363,20 +366,9 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF
pMeta->stage = stage; pMeta->stage = stage;
pMeta->role = (vgId == SNODE_HANDLE) ? NODE_ROLE_LEADER : NODE_ROLE_UNINIT; pMeta->role = (vgId == SNODE_HANDLE) ? NODE_ROLE_LEADER : NODE_ROLE_UNINIT;
pMeta->startInfo.completeFn = fn;
pMeta->pTaskDbUnique = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK); pMeta->pTaskDbUnique = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
// pMeta->chkpId = streamGetLatestCheckpointId(pMeta);
// pMeta->streamBackend = streamBackendInit(pMeta->path, pMeta->chkpId);
// while (pMeta->streamBackend == NULL) {
// qError("vgId:%d failed to init stream backend", pMeta->vgId);
// taosMsleep(2 * 1000);
// qInfo("vgId:%d retry to init stream backend", pMeta->vgId);
// pMeta->streamBackend = streamBackendInit(pMeta->path, pMeta->chkpId);
// if (pMeta->streamBackend == NULL) {
// }
// }
// pMeta->streamBackendRid = taosAddRef(streamBackendId, pMeta->streamBackend);
pMeta->numOfPausedTasks = 0; pMeta->numOfPausedTasks = 0;
pMeta->numOfStreamTasks = 0; pMeta->numOfStreamTasks = 0;
stInfo("vgId:%d open stream meta successfully, latest checkpoint:%" PRId64 ", stage:%" PRId64, vgId, pMeta->chkpId, stInfo("vgId:%d open stream meta successfully, latest checkpoint:%" PRId64 ", stage:%" PRId64, vgId, pMeta->chkpId,
@ -384,6 +376,17 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF
pMeta->rid = taosAddRef(streamMetaId, pMeta); pMeta->rid = taosAddRef(streamMetaId, pMeta);
// set the attribute when running on Linux OS
#if defined LINUX
TdThreadRwlockAttr attr;
taosThreadRwlockAttrInit(&attr);
pthread_rwlockattr_setkind_np(&attr, PTHREAD_RWLOCK_PREFER_WRITER_NONRECURSIVE_NP);
taosThreadRwlockInit(&pMeta->lock, &attr);
taosThreadRwlockAttrDestroy(&attr);
#endif
int64_t* pRid = taosMemoryMalloc(sizeof(int64_t)); int64_t* pRid = taosMemoryMalloc(sizeof(int64_t));
memcpy(pRid, &pMeta->rid, sizeof(pMeta->rid)); memcpy(pRid, &pMeta->rid, sizeof(pMeta->rid));
metaRefMgtAdd(pMeta->vgId, pRid); metaRefMgtAdd(pMeta->vgId, pRid);
@ -458,7 +461,6 @@ void streamMetaClear(SStreamMeta* pMeta) {
taosRemoveRef(streamBackendId, pMeta->streamBackendRid); taosRemoveRef(streamBackendId, pMeta->streamBackendRid);
taosHashClear(pMeta->pTasksMap); taosHashClear(pMeta->pTasksMap);
taosHashClear(pMeta->pTaskDbUnique);
taosArrayClear(pMeta->pTaskList); taosArrayClear(pMeta->pTaskList);
taosArrayClear(pMeta->chkpSaved); taosArrayClear(pMeta->chkpSaved);
@ -617,22 +619,23 @@ int32_t streamMetaGetNumOfTasks(SStreamMeta* pMeta) {
return (int32_t)size; return (int32_t)size;
} }
SStreamTask* streamMetaAcquireTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId) { SStreamTask* streamMetaAcquireTaskNoLock(SStreamMeta* pMeta, int64_t streamId, int32_t taskId) {
streamMetaRLock(pMeta);
STaskId id = {.streamId = streamId, .taskId = taskId}; STaskId id = {.streamId = streamId, .taskId = taskId};
SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, &id, sizeof(id)); SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, &id, sizeof(id));
if (ppTask != NULL) { if (ppTask == NULL || streamTaskShouldStop(*ppTask)) {
if (!streamTaskShouldStop(*ppTask)) { return NULL;
int32_t ref = atomic_add_fetch_32(&(*ppTask)->refCnt, 1);
streamMetaRUnLock(pMeta);
stTrace("s-task:%s acquire task, ref:%d", (*ppTask)->id.idStr, ref);
return *ppTask;
}
} }
int32_t ref = atomic_add_fetch_32(&(*ppTask)->refCnt, 1);
stTrace("s-task:%s acquire task, ref:%d", (*ppTask)->id.idStr, ref);
return *ppTask;
}
SStreamTask* streamMetaAcquireTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId) {
streamMetaRLock(pMeta);
SStreamTask* p = streamMetaAcquireTaskNoLock(pMeta, streamId, taskId);
streamMetaRUnLock(pMeta); streamMetaRUnLock(pMeta);
return NULL; return p;
} }
void streamMetaReleaseTask(SStreamMeta* UNUSED_PARAM(pMeta), SStreamTask* pTask) { void streamMetaReleaseTask(SStreamMeta* UNUSED_PARAM(pMeta), SStreamTask* pTask) {
@ -950,6 +953,7 @@ int32_t tEncodeStreamHbMsg(SEncoder* pEncoder, const SStreamHbMsg* pReq) {
if (tEncodeI64(pEncoder, ps->verEnd) < 0) return -1; if (tEncodeI64(pEncoder, ps->verEnd) < 0) return -1;
if (tEncodeI64(pEncoder, ps->activeCheckpointId) < 0) return -1; if (tEncodeI64(pEncoder, ps->activeCheckpointId) < 0) return -1;
if (tEncodeI8(pEncoder, ps->checkpointFailed) < 0) return -1; if (tEncodeI8(pEncoder, ps->checkpointFailed) < 0) return -1;
if (tEncodeI32(pEncoder, ps->chkpointTransId) < 0) return -1;
} }
int32_t numOfVgs = taosArrayGetSize(pReq->pUpdateNodes); int32_t numOfVgs = taosArrayGetSize(pReq->pUpdateNodes);
@ -988,6 +992,7 @@ int32_t tDecodeStreamHbMsg(SDecoder* pDecoder, SStreamHbMsg* pReq) {
if (tDecodeI64(pDecoder, &entry.verEnd) < 0) return -1; if (tDecodeI64(pDecoder, &entry.verEnd) < 0) return -1;
if (tDecodeI64(pDecoder, &entry.activeCheckpointId) < 0) return -1; if (tDecodeI64(pDecoder, &entry.activeCheckpointId) < 0) return -1;
if (tDecodeI8(pDecoder, (int8_t*)&entry.checkpointFailed) < 0) return -1; if (tDecodeI8(pDecoder, (int8_t*)&entry.checkpointFailed) < 0) return -1;
if (tDecodeI32(pDecoder, &entry.chkpointTransId) < 0) return -1;
entry.id.taskId = taskId; entry.id.taskId = taskId;
taosArrayPush(pReq->pTaskStatus, &entry); taosArrayPush(pReq->pTaskStatus, &entry);
@ -1061,68 +1066,20 @@ static void addUpdateNodeIntoHbMsg(SStreamTask* pTask, SStreamHbMsg* pMsg) {
taosThreadMutexUnlock(&pTask->lock); taosThreadMutexUnlock(&pTask->lock);
} }
void metaHbToMnode(void* param, void* tmrId) { static int32_t metaHeartbeatToMnodeImpl(SStreamMeta* pMeta) {
int64_t rid = *(int64_t*)param;
SStreamMeta* pMeta = taosAcquireRef(streamMetaId, rid);
if (pMeta == NULL) {
return;
}
// need to stop, stop now
if (pMeta->pHbInfo->stopFlag == STREAM_META_WILL_STOP) {
pMeta->pHbInfo->stopFlag = STREAM_META_OK_TO_STOP;
stDebug("vgId:%d jump out of meta timer", pMeta->vgId);
taosReleaseRef(streamMetaId, rid);
return;
}
// not leader not send msg
if (pMeta->role != NODE_ROLE_LEADER) {
stInfo("vgId:%d role:%d not leader not send hb to mnode", pMeta->vgId, pMeta->role);
taosReleaseRef(streamMetaId, rid);
pMeta->pHbInfo->hbStart = 0;
return;
}
// set the hb start time
if (pMeta->pHbInfo->hbStart == 0) {
pMeta->pHbInfo->hbStart = taosGetTimestampMs();
}
if (!waitForEnoughDuration(pMeta->pHbInfo)) {
taosTmrReset(metaHbToMnode, META_HB_CHECK_INTERVAL, param, streamTimer, &pMeta->pHbInfo->hbTmr);
taosReleaseRef(streamMetaId, rid);
return;
}
stDebug("vgId:%d build stream task hb, leader:%d", pMeta->vgId, (pMeta->role == NODE_ROLE_LEADER));
SStreamHbMsg hbMsg = {0}; SStreamHbMsg hbMsg = {0};
SEpSet epset = {0}; SEpSet epset = {0};
bool hasMnodeEpset = false; bool hasMnodeEpset = false;
int64_t stage = 0; int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta);
streamMetaRLock(pMeta);
int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta);
hbMsg.vgId = pMeta->vgId; hbMsg.vgId = pMeta->vgId;
stage = pMeta->stage;
SArray* pIdList = taosArrayDup(pMeta->pTaskList, NULL);
streamMetaRUnLock(pMeta);
hbMsg.pTaskStatus = taosArrayInit(numOfTasks, sizeof(STaskStatusEntry)); hbMsg.pTaskStatus = taosArrayInit(numOfTasks, sizeof(STaskStatusEntry));
hbMsg.pUpdateNodes = taosArrayInit(numOfTasks, sizeof(int32_t)); hbMsg.pUpdateNodes = taosArrayInit(numOfTasks, sizeof(int32_t));
for (int32_t i = 0; i < numOfTasks; ++i) { for (int32_t i = 0; i < numOfTasks; ++i) {
STaskId* pId = taosArrayGet(pIdList, i); STaskId* pId = taosArrayGet(pMeta->pTaskList, i);
streamMetaRLock(pMeta);
SStreamTask** pTask = taosHashGet(pMeta->pTasksMap, pId, sizeof(*pId)); SStreamTask** pTask = taosHashGet(pMeta->pTasksMap, pId, sizeof(*pId));
streamMetaRUnLock(pMeta);
if (pTask == NULL) { if (pTask == NULL) {
continue; continue;
} }
@ -1136,7 +1093,7 @@ void metaHbToMnode(void* param, void* tmrId) {
.id = *pId, .id = *pId,
.status = streamTaskGetStatus(*pTask, NULL), .status = streamTaskGetStatus(*pTask, NULL),
.nodeId = hbMsg.vgId, .nodeId = hbMsg.vgId,
.stage = stage, .stage = pMeta->stage,
.inputQUsed = SIZE_IN_MiB(streamQueueGetItemSize((*pTask)->inputq.queue)), .inputQUsed = SIZE_IN_MiB(streamQueueGetItemSize((*pTask)->inputq.queue)),
}; };
@ -1149,6 +1106,11 @@ void metaHbToMnode(void* param, void* tmrId) {
if ((*pTask)->chkInfo.checkpointingId != 0) { if ((*pTask)->chkInfo.checkpointingId != 0) {
entry.checkpointFailed = ((*pTask)->chkInfo.failedId >= (*pTask)->chkInfo.checkpointingId); entry.checkpointFailed = ((*pTask)->chkInfo.failedId >= (*pTask)->chkInfo.checkpointingId);
entry.activeCheckpointId = (*pTask)->chkInfo.checkpointingId; entry.activeCheckpointId = (*pTask)->chkInfo.checkpointingId;
entry.chkpointTransId = (*pTask)->chkInfo.transId;
if (entry.checkpointFailed) {
stInfo("s-task:%s send kill checkpoint trans info, transId:%d", (*pTask)->id.idStr, (*pTask)->chkInfo.transId);
}
} }
if ((*pTask)->exec.pWalReader != NULL) { if ((*pTask)->exec.pWalReader != NULL) {
@ -1191,30 +1153,70 @@ void metaHbToMnode(void* param, void* tmrId) {
} }
tEncoderClear(&encoder); tEncoderClear(&encoder);
SRpcMsg msg = { SRpcMsg msg = {.info.noResp = 1};
.info.noResp = 1,
};
initRpcMsg(&msg, TDMT_MND_STREAM_HEARTBEAT, buf, tlen); initRpcMsg(&msg, TDMT_MND_STREAM_HEARTBEAT, buf, tlen);
pMeta->pHbInfo->hbCount += 1; pMeta->pHbInfo->hbCount += 1;
stDebug("vgId:%d build and send hb to mnode, numOfTasks:%d total:%d", pMeta->vgId, hbMsg.numOfTasks, stDebug("vgId:%d build and send hb to mnode, numOfTasks:%d total:%d", pMeta->vgId, hbMsg.numOfTasks,
pMeta->pHbInfo->hbCount); pMeta->pHbInfo->hbCount);
tmsgSendReq(&epset, &msg); tmsgSendReq(&epset, &msg);
} else { } else {
stDebug("vgId:%d no tasks and no mnd epset, not send stream hb to mnode", pMeta->vgId); stDebug("vgId:%d no tasks and no mnd epset, not send stream hb to mnode", pMeta->vgId);
} }
_end: _end:
streamMetaClearHbMsg(&hbMsg); streamMetaClearHbMsg(&hbMsg);
taosArrayDestroy(pIdList); return TSDB_CODE_SUCCESS;
}
void metaHbToMnode(void* param, void* tmrId) {
int64_t rid = *(int64_t*)param;
SStreamMeta* pMeta = taosAcquireRef(streamMetaId, rid);
if (pMeta == NULL) {
return;
}
// need to stop, stop now
if (pMeta->pHbInfo->stopFlag == STREAM_META_WILL_STOP) {
pMeta->pHbInfo->stopFlag = STREAM_META_OK_TO_STOP;
stDebug("vgId:%d jump out of meta timer", pMeta->vgId);
taosReleaseRef(streamMetaId, rid);
return;
}
// not leader not send msg
if (pMeta->role != NODE_ROLE_LEADER) {
stInfo("vgId:%d role:%d not leader not send hb to mnode", pMeta->vgId, pMeta->role);
taosReleaseRef(streamMetaId, rid);
pMeta->pHbInfo->hbStart = 0;
return;
}
// set the hb start time
if (pMeta->pHbInfo->hbStart == 0) {
pMeta->pHbInfo->hbStart = taosGetTimestampMs();
}
if (!waitForEnoughDuration(pMeta->pHbInfo)) {
taosTmrReset(metaHbToMnode, META_HB_CHECK_INTERVAL, param, streamTimer, &pMeta->pHbInfo->hbTmr);
taosReleaseRef(streamMetaId, rid);
return;
}
stDebug("vgId:%d build stream task hb, leader:%d", pMeta->vgId, (pMeta->role == NODE_ROLE_LEADER));
streamMetaRLock(pMeta);
metaHeartbeatToMnodeImpl(pMeta);
streamMetaRUnLock(pMeta);
taosTmrReset(metaHbToMnode, META_HB_CHECK_INTERVAL, param, streamTimer, &pMeta->pHbInfo->hbTmr); taosTmrReset(metaHbToMnode, META_HB_CHECK_INTERVAL, param, streamTimer, &pMeta->pHbInfo->hbTmr);
taosReleaseRef(streamMetaId, rid); taosReleaseRef(streamMetaId, rid);
} }
bool streamMetaTaskInTimer(SStreamMeta* pMeta) { bool streamMetaTaskInTimer(SStreamMeta* pMeta) {
bool inTimer = false; bool inTimer = false;
streamMetaWLock(pMeta); streamMetaRLock(pMeta);
void* pIter = NULL; void* pIter = NULL;
while (1) { while (1) {
@ -1225,11 +1227,12 @@ bool streamMetaTaskInTimer(SStreamMeta* pMeta) {
SStreamTask* pTask = *(SStreamTask**)pIter; SStreamTask* pTask = *(SStreamTask**)pIter;
if (pTask->status.timerActive >= 1) { if (pTask->status.timerActive >= 1) {
stDebug("s-task:%s in timer, blocking tasks in vgId:%d restart", pTask->id.idStr, pMeta->vgId);
inTimer = true; inTimer = true;
} }
} }
streamMetaWUnLock(pMeta); streamMetaRUnLock(pMeta);
return inTimer; return inTimer;
} }
@ -1288,26 +1291,37 @@ void streamMetaResetStartInfo(STaskStartInfo* pStartInfo) {
taosHashClear(pStartInfo->pFailedTaskSet); taosHashClear(pStartInfo->pFailedTaskSet);
pStartInfo->tasksWillRestart = 0; pStartInfo->tasksWillRestart = 0;
pStartInfo->readyTs = 0; pStartInfo->readyTs = 0;
// reset the sentinel flag value to be 0 // reset the sentinel flag value to be 0
atomic_store_32(&pStartInfo->taskStarting, 0); pStartInfo->taskStarting = 0;
} }
void streamMetaRLock(SStreamMeta* pMeta) { void streamMetaRLock(SStreamMeta* pMeta) {
stTrace("vgId:%d meta-rlock", pMeta->vgId); stTrace("vgId:%d meta-rlock", pMeta->vgId);
taosRLockLatch(&pMeta->lock); taosThreadRwlockRdlock(&pMeta->lock);
} }
void streamMetaRUnLock(SStreamMeta* pMeta) { void streamMetaRUnLock(SStreamMeta* pMeta) {
stTrace("vgId:%d meta-runlock", pMeta->vgId); stTrace("vgId:%d meta-runlock", pMeta->vgId);
taosRUnLockLatch(&pMeta->lock); int32_t code = taosThreadRwlockUnlock(&pMeta->lock);
if (code != TSDB_CODE_SUCCESS) {
stError("vgId:%d meta-runlock failed, code:%d", pMeta->vgId, code);
} else {
stDebug("vgId:%d meta-runlock completed", pMeta->vgId);
}
} }
void streamMetaWLock(SStreamMeta* pMeta) { void streamMetaWLock(SStreamMeta* pMeta) {
stTrace("vgId:%d meta-wlock", pMeta->vgId); stTrace("vgId:%d meta-wlock", pMeta->vgId);
taosWLockLatch(&pMeta->lock); taosThreadRwlockWrlock(&pMeta->lock);
stTrace("vgId:%d meta-wlock completed", pMeta->vgId);
} }
void streamMetaWUnLock(SStreamMeta* pMeta) { void streamMetaWUnLock(SStreamMeta* pMeta) {
stTrace("vgId:%d meta-wunlock", pMeta->vgId); stTrace("vgId:%d meta-wunlock", pMeta->vgId);
taosWUnLockLatch(&pMeta->lock); taosThreadRwlockUnlock(&pMeta->lock);
} }
static void execHelper(struct SSchedMsg* pSchedMsg) { static void execHelper(struct SSchedMsg* pSchedMsg) {
__async_exec_fn_t execFn = (__async_exec_fn_t)pSchedMsg->ahandle; __async_exec_fn_t execFn = (__async_exec_fn_t)pSchedMsg->ahandle;
int32_t code = execFn(pSchedMsg->thandle); int32_t code = execFn(pSchedMsg->thandle);
@ -1324,3 +1338,165 @@ int32_t streamMetaAsyncExec(SStreamMeta* pMeta, __stream_async_exec_fn_t fn, voi
schedMsg.msg = code; schedMsg.msg = code;
return taosScheduleTask(pMeta->qHandle, &schedMsg); return taosScheduleTask(pMeta->qHandle, &schedMsg);
} }
SArray* streamMetaSendMsgBeforeCloseTasks(SStreamMeta* pMeta) {
SArray* pTaskList = taosArrayDup(pMeta->pTaskList, NULL);
bool sendMsg = pMeta->sendMsgBeforeClosing;
if (!sendMsg) {
stDebug("vgId:%d no need to send msg to mnode before closing tasks", pMeta->vgId);
return pTaskList;
}
stDebug("vgId:%d send msg to mnode before closing all tasks", pMeta->vgId);
// send hb msg to mnode before closing all tasks.
int32_t numOfTasks = taosArrayGetSize(pTaskList);
for (int32_t i = 0; i < numOfTasks; ++i) {
SStreamTaskId* pTaskId = taosArrayGet(pTaskList, i);
SStreamTask* pTask = streamMetaAcquireTaskNoLock(pMeta, pTaskId->streamId, pTaskId->taskId);
if (pTask == NULL) {
continue;
}
taosThreadMutexLock(&pTask->lock);
char* p = NULL;
ETaskStatus s = streamTaskGetStatus(pTask, &p);
if (s == TASK_STATUS__CK) {
streamTaskSetCheckpointFailedId(pTask);
stDebug("s-task:%s mark the checkpoint:%"PRId64" failed", pTask->id.idStr, pTask->chkInfo.checkpointingId);
} else {
stDebug("s-task:%s status:%s not reset the checkpoint", pTask->id.idStr, p);
}
taosThreadMutexUnlock(&pTask->lock);
streamMetaReleaseTask(pMeta, pTask);
}
metaHeartbeatToMnodeImpl(pMeta);
pMeta->sendMsgBeforeClosing = false;
return pTaskList;
}
void streamMetaUpdateStageRole(SStreamMeta* pMeta, int64_t stage, bool isLeader) {
streamMetaWLock(pMeta);
int64_t prevStage = pMeta->stage;
pMeta->stage = stage;
// mark the sign to send msg before close all tasks
if ((!isLeader) && (pMeta->role == NODE_ROLE_LEADER)) {
pMeta->sendMsgBeforeClosing = true;
}
pMeta->role = (isLeader)? NODE_ROLE_LEADER:NODE_ROLE_FOLLOWER;
streamMetaWUnLock(pMeta);
if (isLeader) {
stInfo("vgId:%d update meta stage:%" PRId64 ", prev:%" PRId64 " leader:%d, start to send Hb", pMeta->vgId,
prevStage, stage, isLeader);
streamMetaStartHb(pMeta);
} else {
stInfo("vgId:%d update meta stage:%" PRId64 " prev:%" PRId64 " leader:%d sendMsg beforeClosing:%d", pMeta->vgId,
prevStage, stage, isLeader, pMeta->sendMsgBeforeClosing);
}
}
int32_t streamMetaStopAllTasks(SStreamMeta* pMeta) {
streamMetaRLock(pMeta);
int32_t num = taosArrayGetSize(pMeta->pTaskList);
stDebug("vgId:%d stop all %d stream task(s)", pMeta->vgId, num);
if (num == 0) {
streamMetaRUnLock(pMeta);
return TSDB_CODE_SUCCESS;
}
// send hb msg to mnode before closing all tasks.
SArray* pTaskList = streamMetaSendMsgBeforeCloseTasks(pMeta);
int32_t numOfTasks = taosArrayGetSize(pTaskList);
for (int32_t i = 0; i < numOfTasks; ++i) {
SStreamTaskId* pTaskId = taosArrayGet(pTaskList, i);
SStreamTask* pTask = streamMetaAcquireTaskNoLock(pMeta, pTaskId->streamId, pTaskId->taskId);
if (pTask == NULL) {
continue;
}
streamTaskStop(pTask);
streamMetaReleaseTask(pMeta, pTask);
}
taosArrayDestroy(pTaskList);
streamMetaRUnLock(pMeta);
return 0;
}
int32_t streamMetaStartAllTasks(SStreamMeta* pMeta) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t vgId = pMeta->vgId;
int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList);
stInfo("vgId:%d start to check all %d stream task(s) downstream status", vgId, numOfTasks);
if (numOfTasks == 0) {
stInfo("vgId:%d start tasks completed", pMeta->vgId);
return TSDB_CODE_SUCCESS;
}
SArray* pTaskList = NULL;
streamMetaWLock(pMeta);
pTaskList = taosArrayDup(pMeta->pTaskList, NULL);
taosHashClear(pMeta->startInfo.pReadyTaskSet);
taosHashClear(pMeta->startInfo.pFailedTaskSet);
pMeta->startInfo.startTs = taosGetTimestampMs();
streamMetaWUnLock(pMeta);
// broadcast the check downstream tasks msg
int64_t now = taosGetTimestampMs();
for (int32_t i = 0; i < numOfTasks; ++i) {
SStreamTaskId* pTaskId = taosArrayGet(pTaskList, i);
SStreamTask* pTask = streamMetaAcquireTask(pMeta, pTaskId->streamId, pTaskId->taskId);
if (pTask == NULL) {
stError("vgId:%d failed to acquire task:0x%x during start tasks", pMeta->vgId, pTaskId->taskId);
streamMetaUpdateTaskDownstreamStatus(pMeta, pTaskId->streamId, pTaskId->taskId, 0, now, false);
continue;
}
// fill-history task can only be launched by related stream tasks.
STaskExecStatisInfo* pInfo = &pTask->execInfo;
if (pTask->info.fillHistory == 1) {
streamMetaReleaseTask(pMeta, pTask);
continue;
}
if (pTask->status.downstreamReady == 1) {
if (HAS_RELATED_FILLHISTORY_TASK(pTask)) {
stDebug("s-task:%s downstream ready, no need to check downstream, check only related fill-history task",
pTask->id.idStr);
streamLaunchFillHistoryTask(pTask);
}
streamMetaUpdateTaskDownstreamStatus(pMeta, pTaskId->streamId, pTaskId->taskId, pInfo->init, pInfo->start, true);
streamMetaReleaseTask(pMeta, pTask);
continue;
}
EStreamTaskEvent event = (HAS_RELATED_FILLHISTORY_TASK(pTask)) ? TASK_EVENT_INIT_STREAM_SCANHIST : TASK_EVENT_INIT;
int32_t ret = streamTaskHandleEvent(pTask->status.pSM, event);
if (ret != TSDB_CODE_SUCCESS) {
stError("vgId:%d failed to handle event:%d", pMeta->vgId, event);
code = ret;
streamMetaUpdateTaskDownstreamStatus(pMeta, pTaskId->streamId, pTaskId->taskId, pInfo->init, pInfo->start, false);
}
streamMetaReleaseTask(pMeta, pTask);
}
stInfo("vgId:%d start tasks completed", pMeta->vgId);
taosArrayDestroy(pTaskList);
return code;
}

View File

@ -48,9 +48,9 @@ static void tryLaunchHistoryTask(void* param, void* tmrId);
static void doProcessDownstreamReadyRsp(SStreamTask* pTask); static void doProcessDownstreamReadyRsp(SStreamTask* pTask);
int32_t streamTaskSetReady(SStreamTask* pTask) { int32_t streamTaskSetReady(SStreamTask* pTask) {
char* p = NULL; char* p = NULL;
int32_t numOfDowns = streamTaskGetNumOfDownstream(pTask); int32_t numOfDowns = streamTaskGetNumOfDownstream(pTask);
ETaskStatus status = streamTaskGetStatus(pTask, &p); ETaskStatus status = streamTaskGetStatus(pTask, &p);
if ((status == TASK_STATUS__SCAN_HISTORY || status == TASK_STATUS__STREAM_SCAN_HISTORY) && if ((status == TASK_STATUS__SCAN_HISTORY || status == TASK_STATUS__STREAM_SCAN_HISTORY) &&
pTask->info.taskLevel != TASK_LEVEL__SOURCE) { pTask->info.taskLevel != TASK_LEVEL__SOURCE) {
@ -66,9 +66,6 @@ int32_t streamTaskSetReady(SStreamTask* pTask) {
int64_t el = (pTask->execInfo.start - pTask->execInfo.init); int64_t el = (pTask->execInfo.start - pTask->execInfo.init);
stDebug("s-task:%s all %d downstream ready, init completed, elapsed time:%" PRId64 "ms, task status:%s", stDebug("s-task:%s all %d downstream ready, init completed, elapsed time:%" PRId64 "ms, task status:%s",
pTask->id.idStr, numOfDowns, el, p); pTask->id.idStr, numOfDowns, el, p);
streamMetaUpdateTaskDownstreamStatus(pTask->pMeta, pTask->id.streamId, pTask->id.taskId, pTask->execInfo.init,
pTask->execInfo.start, true);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
@ -209,7 +206,11 @@ void streamTaskCheckDownstream(SStreamTask* pTask) {
int32_t numOfVgs = taosArrayGetSize(vgInfo); int32_t numOfVgs = taosArrayGetSize(vgInfo);
pTask->notReadyTasks = numOfVgs; pTask->notReadyTasks = numOfVgs;
pTask->checkReqIds = taosArrayInit(numOfVgs, sizeof(int64_t)); if (pTask->checkReqIds == NULL) {
pTask->checkReqIds = taosArrayInit(numOfVgs, sizeof(int64_t));
} else {
taosArrayClear(pTask->checkReqIds);
}
stDebug("s-task:%s check %d downstream tasks, ver:%" PRId64 "-%" PRId64 " window:%" PRId64 "-%" PRId64, stDebug("s-task:%s check %d downstream tasks, ver:%" PRId64 "-%" PRId64 " window:%" PRId64 "-%" PRId64,
pTask->id.idStr, numOfVgs, pRange->range.minVer, pRange->range.maxVer, pWindow->skey, pWindow->ekey); pTask->id.idStr, numOfVgs, pRange->range.minVer, pRange->range.maxVer, pWindow->skey, pWindow->ekey);
@ -312,9 +313,24 @@ int32_t streamTaskCheckStatus(SStreamTask* pTask, int32_t upstreamTaskId, int32_
stError("s-task:%s receive check msg from upstream task:0x%x(vgId:%d), new stage received:%" PRId64 stError("s-task:%s receive check msg from upstream task:0x%x(vgId:%d), new stage received:%" PRId64
", prev:%" PRId64, ", prev:%" PRId64,
id, upstreamTaskId, vgId, stage, pInfo->stage); id, upstreamTaskId, vgId, stage, pInfo->stage);
// record the checkpoint failure id and sent to mnode
taosThreadMutexLock(&pTask->lock);
ETaskStatus status = streamTaskGetStatus(pTask, NULL);
if (status == TASK_STATUS__CK) {
streamTaskSetCheckpointFailedId(pTask);
}
taosThreadMutexUnlock(&pTask->lock);
} }
if (pInfo->stage != stage) { if (pInfo->stage != stage) {
taosThreadMutexLock(&pTask->lock);
ETaskStatus status = streamTaskGetStatus(pTask, NULL);
if (status == TASK_STATUS__CK) {
streamTaskSetCheckpointFailedId(pTask);
}
taosThreadMutexUnlock(&pTask->lock);
return TASK_UPSTREAM_NEW_STAGE; return TASK_UPSTREAM_NEW_STAGE;
} else if (pTask->status.downstreamReady != 1) { } else if (pTask->status.downstreamReady != 1) {
stDebug("s-task:%s vgId:%d leader:%d, downstream not ready", id, vgId, (pTask->pMeta->role == NODE_ROLE_LEADER)); stDebug("s-task:%s vgId:%d leader:%d, downstream not ready", id, vgId, (pTask->pMeta->role == NODE_ROLE_LEADER));
@ -363,10 +379,11 @@ int32_t streamTaskOnScanhistoryTaskReady(SStreamTask* pTask) {
stDebug("s-task:%s enter into scan-history data stage, status:%s", id, p); stDebug("s-task:%s enter into scan-history data stage, status:%s", id, p);
streamTaskStartScanHistory(pTask); streamTaskStartScanHistory(pTask);
// start the related fill-history task, when current task is ready // NOTE: there will be an deadlock if launch fill history here.
if (HAS_RELATED_FILLHISTORY_TASK(pTask)) { // // start the related fill-history task, when current task is ready
streamLaunchFillHistoryTask(pTask); // if (HAS_RELATED_FILLHISTORY_TASK(pTask)) {
} // streamLaunchFillHistoryTask(pTask);
// }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
@ -380,6 +397,17 @@ void doProcessDownstreamReadyRsp(SStreamTask* pTask) {
} }
streamTaskOnHandleEventSuccess(pTask->status.pSM, event); streamTaskOnHandleEventSuccess(pTask->status.pSM, event);
int64_t initTs = pTask->execInfo.init;
int64_t startTs = pTask->execInfo.start;
streamMetaUpdateTaskDownstreamStatus(pTask->pMeta, pTask->id.streamId, pTask->id.taskId, initTs, startTs, true);
// start the related fill-history task, when current task is ready
// not invoke in success callback due to the deadlock.
if (HAS_RELATED_FILLHISTORY_TASK(pTask)) {
stDebug("s-task:%s try to launch related fill-history task", pTask->id.idStr);
streamLaunchFillHistoryTask(pTask);
}
} }
static void addIntoNodeUpdateList(SStreamTask* pTask, int32_t nodeId) { static void addIntoNodeUpdateList(SStreamTask* pTask, int32_t nodeId) {
@ -436,8 +464,7 @@ int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRs
ASSERT(left >= 0); ASSERT(left >= 0);
if (left == 0) { if (left == 0) {
taosArrayDestroy(pTask->checkReqIds); pTask->checkReqIds = taosArrayDestroy(pTask->checkReqIds);;
pTask->checkReqIds = NULL;
doProcessDownstreamReadyRsp(pTask); doProcessDownstreamReadyRsp(pTask);
} else { } else {
@ -458,8 +485,7 @@ int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRs
if (pRsp->status == TASK_UPSTREAM_NEW_STAGE) { if (pRsp->status == TASK_UPSTREAM_NEW_STAGE) {
stError("s-task:%s vgId:%d self vnode-transfer/leader-change/restart detected, old stage:%" PRId64 stError("s-task:%s vgId:%d self vnode-transfer/leader-change/restart detected, old stage:%" PRId64
", current stage:%" PRId64 ", current stage:%" PRId64
", " ", not check wait for downstream task nodeUpdate, and all tasks restart",
"not check wait for downstream task nodeUpdate, and all tasks restart",
id, pRsp->upstreamNodeId, pRsp->oldStage, pTask->pMeta->stage); id, pRsp->upstreamNodeId, pRsp->oldStage, pTask->pMeta->stage);
addIntoNodeUpdateList(pTask, pRsp->upstreamNodeId); addIntoNodeUpdateList(pTask, pRsp->upstreamNodeId);
} else { } else {
@ -1096,19 +1122,23 @@ int32_t streamMetaUpdateTaskDownstreamStatus(SStreamMeta* pMeta, int64_t streamI
pStartInfo->readyTs = taosGetTimestampMs(); pStartInfo->readyTs = taosGetTimestampMs();
pStartInfo->elapsedTime = (pStartInfo->startTs != 0) ? pStartInfo->readyTs - pStartInfo->startTs : 0; pStartInfo->elapsedTime = (pStartInfo->startTs != 0) ? pStartInfo->readyTs - pStartInfo->startTs : 0;
stDebug("vgId:%d all %d task(s) check downstream completed, last completed task:0x%x startTs:%" PRId64 stDebug("vgId:%d all %d task(s) check downstream completed, last completed task:0x%x (succ:%d) startTs:%" PRId64
", readyTs:%" PRId64 " total elapsed time:%.2fs", ", readyTs:%" PRId64 " total elapsed time:%.2fs",
pMeta->vgId, numOfTotal, taskId, pStartInfo->startTs, pStartInfo->readyTs, pMeta->vgId, numOfTotal, taskId, ready, pStartInfo->startTs, pStartInfo->readyTs,
pStartInfo->elapsedTime / 1000.0); pStartInfo->elapsedTime / 1000.0);
// print the initialization elapsed time and info // print the initialization elapsed time and info
displayStatusInfo(pMeta, pStartInfo->pReadyTaskSet, true); displayStatusInfo(pMeta, pStartInfo->pReadyTaskSet, true);
displayStatusInfo(pMeta, pStartInfo->pFailedTaskSet, false); displayStatusInfo(pMeta, pStartInfo->pFailedTaskSet, false);
streamMetaResetStartInfo(pStartInfo); streamMetaResetStartInfo(pStartInfo);
streamMetaWUnLock(pMeta);
pStartInfo->completeFn(pMeta);
} else { } else {
stDebug("vgId:%d recv check down results:%d, total:%d", pMeta->vgId, numOfRecv, numOfTotal); streamMetaWUnLock(pMeta);
stDebug("vgId:%d recv check downstream results, s-task:0x%x succ:%d, received:%d, total:%d", pMeta->vgId, taskId,
ready, numOfRecv, numOfTotal);
} }
streamMetaWUnLock(pMeta);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }

View File

@ -796,5 +796,6 @@ void streamTaskStatusCopy(STaskStatusEntry* pDst, const STaskStatusEntry* pSrc)
pDst->sinkDataSize = pSrc->sinkDataSize; pDst->sinkDataSize = pSrc->sinkDataSize;
pDst->activeCheckpointId = pSrc->activeCheckpointId; pDst->activeCheckpointId = pSrc->activeCheckpointId;
pDst->checkpointFailed = pSrc->checkpointFailed; pDst->checkpointFailed = pSrc->checkpointFailed;
pDst->chkpointTransId = pSrc->chkpointTransId;
} }

View File

@ -72,7 +72,7 @@ TEST(osTest, osSystem) {
const int sysLen = 64; const int sysLen = 64;
char osSysName[sysLen]; char osSysName[sysLen];
int ret = taosGetOsReleaseName(osSysName, NULL, NULL, sysLen); int ret = taosGetOsReleaseName(osSysName, NULL, NULL, sysLen);
printf("os systeme name:%s\n", osSysName); printf("os system name:%s\n", osSysName);
ASSERT_EQ(ret, 0); ASSERT_EQ(ret, 0);
} }