Merge pull request #22737 from taosdata/fix/3_liaohj
fix(stream): fix error in timer Active counter

commit 097e2b9b10

@@ -41,16 +41,16 @@ enum {
   STREAM_STATUS__PAUSE,
 };
 
-enum {
+typedef enum ETaskStatus {
   TASK_STATUS__NORMAL = 0,
   TASK_STATUS__DROPPING,
-  TASK_STATUS__FAIL,
+  TASK_STATUS__UNINIT,  // not used, an placeholder
   TASK_STATUS__STOP,
   TASK_STATUS__SCAN_HISTORY,  // stream task scan history data by using tsdbread in the stream scanner
   TASK_STATUS__HALT,          // pause, but not be manipulated by user command
   TASK_STATUS__PAUSE,         // pause
   TASK_STATUS__CK,            // stream task is in checkpoint status, no data are allowed to put into inputQ anymore
-};
+} ETaskStatus;
 
 enum {
   TASK_SCHED_STATUS__INACTIVE = 1,
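
Note: the hunk above turns the anonymous task-status enum into the named type ETaskStatus, so a status value can be carried and compared with an explicit type. Below is a minimal, self-contained sketch of that pattern; the enum members are copied for illustration only, and the string helper is hypothetical rather than part of the TDengine headers.

    #include <stdio.h>

    // Hypothetical mirror of the pattern above: a named enum plus a lookup helper.
    typedef enum ETaskStatus {
      TASK_STATUS__NORMAL = 0,
      TASK_STATUS__DROPPING,
      TASK_STATUS__UNINIT,
      TASK_STATUS__STOP,
    } ETaskStatus;

    static const char* taskStatusStr(ETaskStatus s) {
      switch (s) {
        case TASK_STATUS__NORMAL:   return "normal";
        case TASK_STATUS__DROPPING: return "dropping";
        case TASK_STATUS__UNINIT:   return "uninit";
        case TASK_STATUS__STOP:     return "stop";
        default:                    return "unknown";
      }
    }

    int main(void) {
      ETaskStatus s = TASK_STATUS__UNINIT;  // the placeholder state that replaces TASK_STATUS__FAIL
      printf("status=%s\n", taskStatusStr(s));
      return 0;
    }
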
@@ -868,6 +868,7 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) {
   mndTransDrop(pTrans);
 
   taosThreadMutexLock(&execNodeList.lock);
+  mDebug("register to stream task node list");
   keepStreamTasksInBuf(&streamObj, &execNodeList);
   taosThreadMutexUnlock(&execNodeList.lock);
 
@@ -876,8 +877,8 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) {
   char detail[2000] = {0};
   sprintf(detail,
           "checkpointFreq:%" PRId64 ", createStb:%d, deleteMark:%" PRId64
-          ", fillHistory:%d, igExists:%d, igExpired:%d, igUpdate:%d, lastTs:%" PRId64
-          ", maxDelay:%" PRId64 ", numOfTags:%d, sourceDB:%s, targetStbFullName:%s, triggerType:%d, watermark:%" PRId64,
+          ", fillHistory:%d, igExists:%d, igExpired:%d, igUpdate:%d, lastTs:%" PRId64 ", maxDelay:%" PRId64
+          ", numOfTags:%d, sourceDB:%s, targetStbFullName:%s, triggerType:%d, watermark:%" PRId64,
           createStreamReq.checkpointFreq, createStreamReq.createStb, createStreamReq.deleteMark,
           createStreamReq.fillHistory, createStreamReq.igExists, createStreamReq.igExpired, createStreamReq.igUpdate,
           createStreamReq.lastTs, createStreamReq.maxDelay, createStreamReq.numOfTags, createStreamReq.sourceDB,
@@ -1574,8 +1575,8 @@ static int32_t mndRetrieveStreamTask(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock
     } else if (taskStatus == TASK_STATUS__DROPPING) {
       memcpy(varDataVal(status), "dropping", 8);
       varDataSetLen(status, 8);
-    } else if (taskStatus == TASK_STATUS__FAIL) {
-      memcpy(varDataVal(status), "fail", 4);
+    } else if (taskStatus == TASK_STATUS__UNINIT) {
+      memcpy(varDataVal(status), "uninit", 6);
       varDataSetLen(status, 4);
     } else if (taskStatus == TASK_STATUS__STOP) {
       memcpy(varDataVal(status), "stop", 4);
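
Note: the hunk above changes the displayed status string from "fail" to "uninit", written into a length-prefixed (var-data) column value. A small stand-alone sketch of that write pattern follows; the varData* macros here are simplified stand-ins defined locally for the sketch, not the real TDengine definitions, and the length is derived with strlen rather than hard-coded.

    #include <stdint.h>
    #include <stdio.h>
    #include <string.h>

    // Assumed layout for the sketch: a 2-byte length prefix followed by the payload.
    #define VARSTR_HDR           sizeof(uint16_t)
    #define varDataVal(v)        ((char*)(v) + VARSTR_HDR)
    #define varDataSetLen(v, n)  (*(uint16_t*)(v) = (uint16_t)(n))
    #define varDataLen(v)        (*(uint16_t*)(v))

    // Write a status string once, deriving the length from the string itself.
    static void setStatusStr(char* buf, const char* s) {
      size_t n = strlen(s);
      memcpy(varDataVal(buf), s, n);
      varDataSetLen(buf, n);
    }

    int main(void) {
      char status[32] = {0};
      setStatusStr(status, "uninit");
      printf("len=%u val=%.*s\n", (unsigned)varDataLen(status), (int)varDataLen(status), varDataVal(status));
      return 0;
    }
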
@@ -2016,14 +2017,11 @@ static int32_t createStreamUpdateTrans(SMnode *pMnode, SStreamObj *pStream, SVgr
 
 static bool isNodeEpsetChanged(const SEpSet *pPrevEpset, const SEpSet *pCurrent) {
   const SEp *pEp = GET_ACTIVE_EP(pPrevEpset);
+  const SEp *p = GET_ACTIVE_EP(pCurrent);
 
-  for (int32_t i = 0; i < pCurrent->numOfEps; ++i) {
-    const SEp *p = &(pCurrent->eps[i]);
-    if (pEp->port == p->port && strncmp(pEp->fqdn, p->fqdn, TSDB_FQDN_LEN) == 0) {
-      return false;
-    }
-  }
-
+  if (pEp->port == p->port && strncmp(pEp->fqdn, p->fqdn, TSDB_FQDN_LEN) == 0) {
+    return false;
+  }
   return true;
 }
 
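
Note: the rewrite above compares only the active endpoint of each epset instead of scanning every entry of the current epset. A self-contained sketch of that comparison follows; the Ep/EpSet types and the ACTIVE_EP macro are simplified assumptions that only mirror the shape of the real structs.

    #include <stdbool.h>
    #include <stdint.h>
    #include <stdio.h>
    #include <string.h>

    #define FQDN_LEN 128  // assumed limit, standing in for TSDB_FQDN_LEN

    // Simplified stand-ins used only for this sketch.
    typedef struct { char fqdn[FQDN_LEN]; uint16_t port; } Ep;
    typedef struct { int numOfEps; int inUse; Ep eps[4]; } EpSet;

    #define ACTIVE_EP(p) (&(p)->eps[(p)->inUse])

    // Returns true only when the active endpoint actually moved.
    static bool isActiveEpChanged(const EpSet* prev, const EpSet* cur) {
      const Ep* a = ACTIVE_EP(prev);
      const Ep* b = ACTIVE_EP(cur);
      if (a->port == b->port && strncmp(a->fqdn, b->fqdn, FQDN_LEN) == 0) {
        return false;
      }
      return true;
    }

    int main(void) {
      EpSet prev = {.numOfEps = 1, .inUse = 0, .eps = {{.fqdn = "node-a", .port = 6030}}};
      EpSet cur  = {.numOfEps = 2, .inUse = 1,
                    .eps = {{.fqdn = "node-a", .port = 6030}, {.fqdn = "node-b", .port = 6030}}};
      printf("active endpoint changed: %d\n", isActiveEpChanged(&prev, &cur));
      return 0;
    }
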
@@ -2120,6 +2118,7 @@ static int32_t mndProcessVgroupChange(SMnode *pMnode, SVgroupChangeInfo *pChange
       mDebug("stream:0x%" PRIx64 " involved node changed, create update trans", pStream->uid);
       int32_t code = createStreamUpdateTrans(pMnode, pStream, pChangeInfo);
       if (code != TSDB_CODE_SUCCESS) {
+        sdbCancelFetch(pSdb, pIter);
         return code;
       }
     }
@@ -2223,18 +2222,22 @@ static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg) {
   SVgroupChangeInfo changeInfo = mndFindChangedNodeInfo(pMnode, execNodeList.pNodeEntryList, pNodeSnapshot);
   if (taosArrayGetSize(changeInfo.pUpdateNodeList) > 0) {
     code = mndProcessVgroupChange(pMnode, &changeInfo);
 
+    // keep the new vnode snapshot
+    if (code == TSDB_CODE_SUCCESS || code == TSDB_CODE_ACTION_IN_PROGRESS) {
+      mDebug("create trans successfully, update cached node list");
+      taosArrayDestroy(execNodeList.pNodeEntryList);
+      execNodeList.pNodeEntryList = pNodeSnapshot;
+      execNodeList.ts = ts;
+    }
+  } else {
+    mDebug("no update found in nodeList");
+    taosArrayDestroy(pNodeSnapshot);
   }
 
   taosArrayDestroy(changeInfo.pUpdateNodeList);
   taosHashCleanup(changeInfo.pDBMap);
 
-  // keep the new vnode snapshot
-  if (code == TSDB_CODE_SUCCESS || code == TSDB_CODE_ACTION_IN_PROGRESS) {
-    taosArrayDestroy(execNodeList.pNodeEntryList);
-    execNodeList.pNodeEntryList = pNodeSnapshot;
-    execNodeList.ts = ts;
-  }
-
   mDebug("end to do stream task node change checking");
   atomic_store_32(&mndNodeCheckSentinel, 0);
   return 0;
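
Note: the reordering above hands the freshly built node snapshot to the cache only when the update transaction was created (or is already in progress); otherwise the snapshot is destroyed and the old cache is kept. A minimal sketch of that ownership hand-off, using hypothetical types and plain malloc/free in place of taosArray, is shown below.

    #include <stdio.h>
    #include <stdlib.h>

    // Minimal stand-ins: an owned snapshot buffer and a cache that may adopt it.
    typedef struct { int numOfNodes; } Snapshot;
    typedef struct { Snapshot* cached; long long ts; } NodeCache;

    #define CODE_SUCCESS      0
    #define CODE_IN_PROGRESS  1
    #define CODE_FAILED      -1

    // Adopt the new snapshot on success, free it on failure; never leak either one.
    static void updateCache(NodeCache* cache, Snapshot* fresh, int code, long long ts) {
      if (code == CODE_SUCCESS || code == CODE_IN_PROGRESS) {
        free(cache->cached);    // drop the stale snapshot
        cache->cached = fresh;  // the cache takes ownership of the new one
        cache->ts = ts;
      } else {
        free(fresh);            // keep the old cache, discard the new snapshot
      }
    }

    int main(void) {
      NodeCache cache = {.cached = calloc(1, sizeof(Snapshot)), .ts = 0};
      Snapshot* fresh = calloc(1, sizeof(Snapshot));
      fresh->numOfNodes = 3;
      updateCache(&cache, fresh, CODE_SUCCESS, 1700000000LL);
      printf("cached nodes:%d ts:%lld\n", cache.cached->numOfNodes, cache.ts);
      free(cache.cached);
      return 0;
    }
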
@@ -2284,7 +2287,6 @@ static void keepStreamTasksInBuf(SStreamObj *pStream, SStreamVnodeRevertIndex *p
 // todo: this process should be executed by the write queue worker of the mnode
 int32_t mndProcessStreamHb(SRpcMsg *pReq) {
   SMnode *pMnode = pReq->info.node;
-
   SStreamHbMsg req = {0};
   int32_t code = TSDB_CODE_SUCCESS;
 
@@ -2309,8 +2311,7 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) {
 
   for (int32_t i = 0; i < req.numOfTasks; ++i) {
     STaskStatusEntry *p = taosArrayGet(req.pTaskStatus, i);
     int64_t k[2] = {p->streamId, p->taskId};
-
     int32_t *index = taosHashGet(execNodeList.pTaskMap, &k, sizeof(k));
     if (index == NULL) {
       continue;
@@ -165,6 +165,7 @@ int32_t tqOffsetRestoreFromFile(STqOffsetStore* pStore, const char* fname);
 int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver);
 int32_t tqScanWal(STQ* pTq);
 int32_t tqCheckAndRunStreamTask(STQ* pTq);
+int32_t tqStartStreamTasks(STQ* pTq);
 int32_t tqStopStreamTasks(STQ* pTq);
 
 // tq util
@@ -1416,7 +1416,7 @@ int32_t tqProcessTaskResumeImpl(STQ* pTq, SStreamTask* pTask, int64_t sversion,
   }
 
   int8_t status = pTask->status.taskStatus;
-  if (status == TASK_STATUS__NORMAL || status == TASK_STATUS__SCAN_HISTORY) {
+  if (status == TASK_STATUS__NORMAL || status == TASK_STATUS__SCAN_HISTORY || status == TASK_STATUS__CK) {
     // no lock needs to secure the access of the version
     if (igUntreated && level == TASK_LEVEL__SOURCE && !pTask->info.fillHistory) {
       // discard all the data when the stream task is suspended.
@@ -1700,20 +1700,47 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) {
 
   tqDebug("s-task:%s receive task nodeEp update msg from mnode", pTask->id.idStr);
   streamTaskUpdateEpsetInfo(pTask, req.pNodeList);
+  streamSetStatusNormal(pTask);
 
+  SStreamTask** ppHTask = NULL;
+  if (pTask->historyTaskId.taskId != 0) {
+    keys[0] = pTask->historyTaskId.streamId;
+    keys[1] = pTask->historyTaskId.taskId;
+
+    ppHTask = (SStreamTask**)taosHashGet(pMeta->pTasks, keys, sizeof(keys));
+    if (ppHTask == NULL || *ppHTask == NULL) {
+      tqError("vgId:%d failed to acquire fill-history task:0x%x when handling update, it may have been dropped already",
+              pMeta->vgId, req.taskId);
+    } else {
+      tqDebug("s-task:%s fill-history task update nodeEp along with stream task", (*ppHTask)->id.idStr);
+      streamTaskUpdateEpsetInfo(*ppHTask, req.pNodeList);
+    }
+  }
+
   {
-    streamSetStatusNormal(pTask);
     streamMetaSaveTask(pMeta, pTask);
+    if (ppHTask != NULL) {
+      streamMetaSaveTask(pMeta, *ppHTask);
+    }
+
     if (streamMetaCommit(pMeta) < 0) {
       // persist to disk
     }
   }
 
   streamTaskStop(pTask);
+  if (ppHTask != NULL) {
+    streamTaskStop(*ppHTask);
+  }
 
   tqDebug("s-task:%s task nodeEp update completed", pTask->id.idStr);
 
   pMeta->closedTask += 1;
+  if (ppHTask != NULL) {
+    pMeta->closedTask += 1;
+  }
 
+  // possibly only handle the stream task.
   int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta);
   bool allStopped = (pMeta->closedTask == numOfTasks);
   if (allStopped) {
@@ -1752,6 +1779,7 @@ _end:
   taosWUnLockLatch(&pMeta->lock);
   if (vnodeIsRoleLeader(pTq->pVnode) && !tsDisableStream) {
     vInfo("vgId:%d, restart all stream tasks", vgId);
+    tqStartStreamTasks(pTq);
     tqCheckAndRunStreamTaskAsync(pTq);
   }
 }
@@ -224,6 +224,35 @@ int32_t tqStopStreamTasks(STQ* pTq) {
   return 0;
 }
 
+int32_t tqStartStreamTasks(STQ* pTq) {
+  SStreamMeta* pMeta = pTq->pStreamMeta;
+  int32_t vgId = TD_VID(pTq->pVnode);
+  int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList);
+
+  tqDebug("vgId:%d start to stop all %d stream task(s)", vgId, numOfTasks);
+
+  if (numOfTasks == 0) {
+    return TSDB_CODE_SUCCESS;
+  }
+
+  taosWLockLatch(&pMeta->lock);
+
+  for (int32_t i = 0; i < numOfTasks; ++i) {
+    SStreamTaskId* pTaskId = taosArrayGet(pMeta->pTaskList, i);
+
+    int64_t key[2] = {pTaskId->streamId, pTaskId->taskId};
+    SStreamTask** pTask = taosHashGet(pMeta->pTasks, key, sizeof(key));
+
+    int8_t status = (*pTask)->status.taskStatus;
+    if (status == TASK_STATUS__STOP) {
+      streamSetStatusNormal(*pTask);
+    }
+  }
+
+  taosWUnLockLatch(&pMeta->lock);
+  return 0;
+}
+
 int32_t setWalReaderStartOffset(SStreamTask* pTask, int32_t vgId) {
   // seek the stored version and extract data from WAL
   int64_t firstVer = walReaderGetValidFirstVer(pTask->exec.pWalReader);
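
Note: the new tqStartStreamTasks above walks the task list under the meta write lock and flips any task still in STOP back to normal. The stand-alone sketch below shows the same walk-under-lock pattern with assumed, simplified types and a pthread rwlock standing in for the latch.

    #include <pthread.h>
    #include <stdio.h>

    typedef enum { ST_NORMAL = 0, ST_STOP } Status;
    typedef struct { int id; Status status; } Task;

    typedef struct {
      pthread_rwlock_t lock;
      Task tasks[3];
      int numOfTasks;
    } TaskMeta;

    // Reset every stopped task back to normal, mirroring the loop above.
    static int startAllTasks(TaskMeta* meta) {
      pthread_rwlock_wrlock(&meta->lock);
      for (int i = 0; i < meta->numOfTasks; ++i) {
        if (meta->tasks[i].status == ST_STOP) {
          meta->tasks[i].status = ST_NORMAL;
          printf("task %d: stop -> normal\n", meta->tasks[i].id);
        }
      }
      pthread_rwlock_unlock(&meta->lock);
      return 0;
    }

    int main(void) {
      TaskMeta meta = {.numOfTasks = 3, .tasks = {{1, ST_NORMAL}, {2, ST_STOP}, {3, ST_STOP}}};
      pthread_rwlock_init(&meta.lock, NULL);
      startAllTasks(&meta);
      pthread_rwlock_destroy(&meta.lock);
      return 0;
    }
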
@@ -560,6 +560,7 @@ static void vnodeRestoreFinish(const SSyncFSM *pFsm, const SyncIndex commitIdx)
         vInfo("vgId:%d, not launch stream tasks, since stream tasks are disabled", vgId);
       } else {
         vInfo("vgId:%d start to launch stream tasks", pVnode->config.vgId);
+        tqStartStreamTasks(pVnode->pTq);
         tqCheckAndRunStreamTaskAsync(pVnode->pTq);
       }
     } else {
@@ -391,8 +391,8 @@ static void doRetryDispatchData(void* param, void* tmrId) {
   SStreamTask* pTask = param;
 
   if (streamTaskShouldStop(&pTask->status)) {
-    atomic_sub_fetch_8(&pTask->status.timerActive, 1);
-    qDebug("s-task:%s should stop, abort from timer", pTask->id.idStr);
+    int8_t ref = atomic_sub_fetch_8(&pTask->status.timerActive, 1);
+    qDebug("s-task:%s should stop, abort from timer, ref:%d", pTask->id.idStr, ref);
     return;
   }
 
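
Note: this is the core of the fix. The return value of the atomic add/sub on status.timerActive is captured and logged, so the number of outstanding timer references shows up in the trace instead of being discarded. A self-contained sketch of that pattern with C11 atomics follows; the TaskStatus struct is a simplified assumption, and fetch_add/fetch_sub are adjusted to report the post-operation value, matching the add_fetch/sub_fetch semantics used above.

    #include <stdatomic.h>
    #include <stdio.h>

    // Simplified stand-in: only the timer reference counter from the task status.
    typedef struct {
      atomic_schar timerActive;
    } TaskStatus;

    // Log the value returned by the atomic op itself, not a separate read,
    // so the trace shows a consistent counter even under concurrency.
    static void timerRefAcquire(TaskStatus* s, const char* who) {
      int ref = atomic_fetch_add(&s->timerActive, 1) + 1;  // post-add value, like add_fetch
      printf("%s: timer ref acquired, ref:%d\n", who, ref);
    }

    static void timerRefRelease(TaskStatus* s, const char* who) {
      int ref = atomic_fetch_sub(&s->timerActive, 1) - 1;  // post-sub value, like sub_fetch
      printf("%s: timer ref released, ref:%d\n", who, ref);
    }

    int main(void) {
      TaskStatus st = {.timerActive = 0};
      timerRefAcquire(&st, "dispatch-retry");  // e.g. before arming the retry timer
      timerRefRelease(&st, "dispatch-retry");  // e.g. when the timer callback exits
      return 0;
    }
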
@@ -409,17 +409,22 @@ static void doRetryDispatchData(void* param, void* tmrId) {
         streamRetryDispatchStreamBlock(pTask, DISPATCH_RETRY_INTERVAL_MS);
       }
     } else {
-      atomic_sub_fetch_8(&pTask->status.timerActive, 1);
-      qDebug("s-task:%s should stop, abort from timer", pTask->id.idStr);
+      int32_t ref = atomic_sub_fetch_8(&pTask->status.timerActive, 1);
+      qDebug("s-task:%s should stop, abort from timer, ref:%d", pTask->id.idStr, ref);
     }
   } else {
-    atomic_sub_fetch_8(&pTask->status.timerActive, 1);
+    int8_t ref = atomic_sub_fetch_8(&pTask->status.timerActive, 1);
+    qDebug("s-task:%s send success, jump out of timer, ref:%d", pTask->id.idStr, ref);
   }
 }
 
 void streamRetryDispatchStreamBlock(SStreamTask* pTask, int64_t waitDuration) {
-  qError("s-task:%s dispatch data in %" PRId64 "ms", pTask->id.idStr, waitDuration);
-  taosTmrReset(doRetryDispatchData, waitDuration, pTask, streamEnv.timer, &pTask->launchTaskTimer);
+  qWarn("s-task:%s dispatch data in %" PRId64 "ms, in timer", pTask->id.idStr, waitDuration);
+  if (pTask->launchTaskTimer != NULL) {
+    taosTmrReset(doRetryDispatchData, waitDuration, pTask, streamEnv.timer, &pTask->launchTaskTimer);
+  } else {
+    pTask->launchTaskTimer = taosTmrStart(doRetryDispatchData, waitDuration, pTask, streamEnv.timer);
+  }
 }
 
 int32_t streamSearchAndAddBlock(SStreamTask* pTask, SStreamDispatchReq* pReqs, SSDataBlock* pDataBlock, int32_t vgSz,
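
Note: the hunk above stops unconditionally resetting pTask->launchTaskTimer; an existing handle is reset, and a new timer is started only when no handle exists yet. The sketch below shows that reuse-or-create guard with hypothetical stub functions standing in for taosTmrStart/taosTmrReset.

    #include <stdio.h>

    // Hypothetical timer API stubs, just enough to show the guard added above.
    typedef struct Timer { int period; } Timer;
    typedef void (*TimerFp)(void* param, void* tmrId);

    static Timer gTimer;  // single pre-allocated handle for the sketch

    static Timer* tmrStart(TimerFp fp, int ms, void* param) {
      (void)fp; (void)param;
      gTimer.period = ms;
      printf("start new timer, %dms\n", ms);
      return &gTimer;
    }

    static void tmrReset(TimerFp fp, int ms, void* param, Timer** pHandle) {
      (void)fp; (void)param;
      (*pHandle)->period = ms;
      printf("reset existing timer, %dms\n", ms);
    }

    static void onRetry(void* param, void* tmrId) { (void)param; (void)tmrId; }

    // Reuse the handle when it already exists; only the first retry allocates one.
    static void scheduleRetry(Timer** pHandle, int ms) {
      if (*pHandle != NULL) {
        tmrReset(onRetry, ms, NULL, pHandle);
      } else {
        *pHandle = tmrStart(onRetry, ms, NULL);
      }
    }

    int main(void) {
      Timer* handle = NULL;
      scheduleRetry(&handle, 300);  // first call starts the timer
      scheduleRetry(&handle, 300);  // later calls reset it
      return 0;
    }
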
@@ -540,8 +545,10 @@ int32_t streamDispatchStreamBlock(SStreamTask* pTask) {
     }
 
     if (++retryCount > MAX_CONTINUE_RETRY_COUNT) {  // add to timer to retry
-      qDebug("s-task:%s failed to dispatch msg to downstream for %d times, code:%s, add timer to retry in %dms",
-             pTask->id.idStr, retryCount, tstrerror(terrno), DISPATCH_RETRY_INTERVAL_MS);
+      int8_t ref = atomic_add_fetch_8(&pTask->status.timerActive, 1);
+
+      qDebug("s-task:%s failed to dispatch msg to downstream for %d times, code:%s, add timer to retry in %dms, ref:%d",
+             pTask->id.idStr, retryCount, tstrerror(terrno), DISPATCH_RETRY_INTERVAL_MS, ref);
       streamRetryDispatchStreamBlock(pTask, DISPATCH_RETRY_INTERVAL_MS);
       break;
     }
|
||||||
if (pRsp->inputStatus == TASK_INPUT_STATUS__BLOCKED) {
|
if (pRsp->inputStatus == TASK_INPUT_STATUS__BLOCKED) {
|
||||||
pTask->inputInfo.status = TASK_INPUT_STATUS__BLOCKED; // block the input of current task, to push pressure to upstream
|
pTask->inputInfo.status = TASK_INPUT_STATUS__BLOCKED; // block the input of current task, to push pressure to upstream
|
||||||
pTask->msgInfo.blockingTs = taosGetTimestampMs(); // record the blocking start time
|
pTask->msgInfo.blockingTs = taosGetTimestampMs(); // record the blocking start time
|
||||||
qError("s-task:%s inputQ of downstream task:0x%x is full, time:%" PRId64 " wait for %dms and retry dispatch data",
|
int8_t ref = atomic_add_fetch_8(&pTask->status.timerActive, 1);
|
||||||
id, pRsp->downstreamTaskId, pTask->msgInfo.blockingTs, DISPATCH_RETRY_INTERVAL_MS);
|
qError("s-task:%s inputQ of downstream task:0x%x is full, time:%" PRId64 " wait for %dms and retry dispatch data, ref:%d",
|
||||||
|
id, pRsp->downstreamTaskId, pTask->msgInfo.blockingTs, DISPATCH_RETRY_INTERVAL_MS, ref);
|
||||||
streamRetryDispatchStreamBlock(pTask, DISPATCH_RETRY_INTERVAL_MS);
|
streamRetryDispatchStreamBlock(pTask, DISPATCH_RETRY_INTERVAL_MS);
|
||||||
} else { // pipeline send data in output queue
|
} else { // pipeline send data in output queue
|
||||||
// this message has been sent successfully, let's try next one.
|
// this message has been sent successfully, let's try next one.
|
||||||
|
|
|
@@ -666,6 +666,8 @@ int32_t streamMetaLoadAllTasks(SStreamMeta* pMeta) {
     int64_t keys[2] = {pTask->id.streamId, pTask->id.taskId};
     void* p = taosHashGet(pMeta->pTasks, keys, sizeof(keys));
     if (p == NULL) {
+      // pTask->chkInfo.checkpointVer may be 0, when a follower is become a leader
+      // In this case, we try not to start fill-history task anymore.
       if (pMeta->expandFunc(pMeta->ahandle, pTask, pTask->chkInfo.checkpointVer) < 0) {
         doClear(pKey, pVal, pCur, pRecycleList);
         tFreeStreamTask(pTask);
@@ -235,7 +235,13 @@ static void doProcessDownstreamReadyRsp(SStreamTask* pTask, int32_t numOfReqs) {
     qDebug("s-task:%s enter into scan-history data stage, status:%s", id, str);
     streamTaskLaunchScanHistory(pTask);
   } else {
-    qDebug("s-task:%s downstream tasks are ready, now ready for data from wal, status:%s", id, str);
+    if (pTask->info.fillHistory == 1) {
+      qDebug("s-task:%s fill-history is set normal when start it, try to remove it,set it task to be dropping", id);
+      pTask->status.taskStatus = TASK_STATUS__DROPPING;
+      ASSERT(pTask->historyTaskId.taskId == 0);
+    } else {
+      qDebug("s-task:%s downstream tasks are ready, now ready for data from wal, status:%s", id, str);
+    }
   }
 
   // when current stream task is ready, check the related fill history task.
@@ -579,19 +585,17 @@ static void tryLaunchHistoryTask(void* param, void* tmrId) {
 // todo fix the bug: 2. race condition
 // an fill history task needs to be started.
 int32_t streamLaunchFillHistoryTask(SStreamTask* pTask) {
-  int32_t tId = pTask->historyTaskId.taskId;
-  if (tId == 0) {
+  SStreamMeta* pMeta = pTask->pMeta;
+  int32_t hTaskId = pTask->historyTaskId.taskId;
+  if (hTaskId == 0) {
     return TSDB_CODE_SUCCESS;
   }
 
   ASSERT(pTask->status.downstreamReady == 1);
   qDebug("s-task:%s start to launch related fill-history task:0x%" PRIx64 "-0x%x", pTask->id.idStr,
-         pTask->historyTaskId.streamId, tId);
+         pTask->historyTaskId.streamId, hTaskId);
 
-  SStreamMeta* pMeta = pTask->pMeta;
-  int32_t hTaskId = pTask->historyTaskId.taskId;
-
-  int64_t keys[2] = {pTask->historyTaskId.streamId, pTask->historyTaskId.taskId};
+  int64_t keys[2] = {pTask->historyTaskId.streamId, hTaskId};
 
   // Set the execute conditions, including the query time window and the version range
   SStreamTask** pHTask = taosHashGet(pMeta->pTasks, keys, sizeof(keys));
@@ -610,11 +614,12 @@ int32_t streamLaunchFillHistoryTask(SStreamTask* pTask) {
       // todo failed to create timer
       taosMemoryFree(pInfo);
     } else {
-      atomic_add_fetch_8(&pTask->status.timerActive, 1);  // timer is active
+      int32_t ref = atomic_add_fetch_8(&pTask->status.timerActive, 1);  // timer is active
+      ASSERT(ref == 1);
       qDebug("s-task:%s set timer active flag", pTask->id.idStr);
     }
   } else {  // timer exists
-    ASSERT(pTask->status.timerActive > 0);
+    ASSERT(pTask->status.timerActive == 1);
     qDebug("s-task:%s set timer active flag, task timer not null", pTask->id.idStr);
     taosTmrReset(tryLaunchHistoryTask, 100, pInfo, streamEnv.timer, &pTask->launchTaskTimer);
   }
@@ -918,6 +923,13 @@ void streamTaskHalt(SStreamTask* pTask) {
     return;
   }
 
+  // wait for checkpoint completed
+  while (pTask->status.taskStatus == TASK_STATUS__CK) {
+    qDebug("s-task:%s status:%s during generating checkpoint, wait for 1sec and retry set status:halt", pTask->id.idStr,
+           streamGetTaskStatusStr(TASK_STATUS__CK));
+    taosMsleep(1000);
+  }
+
   // upgrade to halt status
   if (status == TASK_STATUS__PAUSE) {
     qDebug("s-task:%s upgrade status to %s from %s", pTask->id.idStr, streamGetTaskStatusStr(TASK_STATUS__HALT),
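
Note: the added loop above keeps the task out of halt while a checkpoint is being generated, sleeping one second between checks. A stand-alone sketch of that poll-and-sleep pattern follows, with an atomic status flag and a worker thread standing in for the checkpoint; names and timings are illustrative only.

    #include <pthread.h>
    #include <stdatomic.h>
    #include <stdio.h>
    #include <unistd.h>

    // Simplified stand-in for the task status byte; CK means "checkpoint in progress".
    enum { ST_NORMAL = 0, ST_CK = 1, ST_HALT = 2 };
    static atomic_int gStatus = ST_CK;

    static void* checkpointWorker(void* arg) {
      (void)arg;
      usleep(200 * 1000);                // pretend the checkpoint takes 200ms
      atomic_store(&gStatus, ST_NORMAL);
      return NULL;
    }

    int main(void) {
      pthread_t tid;
      pthread_create(&tid, NULL, checkpointWorker, NULL);

      // Same shape as the added loop: wait out the checkpoint before halting.
      while (atomic_load(&gStatus) == ST_CK) {
        printf("status is checkpoint, wait and retry set status:halt\n");
        usleep(100 * 1000);              // the real code sleeps 1000ms
      }
      atomic_store(&gStatus, ST_HALT);
      printf("status set to halt\n");

      pthread_join(tid, NULL);
      return 0;
    }
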