Merge pull request #22681 from taosdata/fix/3_liaohj
refactor: do some internal refactoring.
commit 5dcbae3594

@@ -41,16 +41,16 @@ enum {
   STREAM_STATUS__PAUSE,
 };
 
-enum {
+typedef enum ETaskStatus {
   TASK_STATUS__NORMAL = 0,
   TASK_STATUS__DROPPING,
   TASK_STATUS__FAIL,
   TASK_STATUS__UNINIT,        // not used, an placeholder
   TASK_STATUS__STOP,
   TASK_STATUS__SCAN_HISTORY,  // stream task scan history data by using tsdbread in the stream scanner
   TASK_STATUS__HALT,          // pause, but not be manipulated by user command
   TASK_STATUS__PAUSE,         // pause
   TASK_STATUS__CK,            // stream task is in checkpoint status, no data are allowed to put into inputQ anymore
-};
+} ETaskStatus;
 
 enum {
   TASK_SCHED_STATUS__INACTIVE = 1,

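With this change the task-status enum is no longer anonymous: status values can be carried around as ETaskStatus instead of a bare integer. A minimal illustration of what the named type enables (a hypothetical helper, not part of this commit):

// Hypothetical helper, assuming the ETaskStatus definition above.
static const char* taskStatusName(ETaskStatus status) {
  switch (status) {
    case TASK_STATUS__NORMAL:       return "normal";
    case TASK_STATUS__SCAN_HISTORY: return "scan-history";
    case TASK_STATUS__HALT:         return "halt";
    case TASK_STATUS__PAUSE:        return "pause";
    case TASK_STATUS__CK:           return "checkpoint";
    default:                        return "other";
  }
}
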
@@ -868,6 +868,7 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) {
     mndTransDrop(pTrans);
 
     taosThreadMutexLock(&execNodeList.lock);
+    mDebug("register to stream task node list");
     keepStreamTasksInBuf(&streamObj, &execNodeList);
     taosThreadMutexUnlock(&execNodeList.lock);
 

@@ -876,8 +877,8 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) {
   char detail[2000] = {0};
   sprintf(detail,
           "checkpointFreq:%" PRId64 ", createStb:%d, deleteMark:%" PRId64
-          ", fillHistory:%d, igExists:%d, igExpired:%d, igUpdate:%d, lastTs:%" PRId64
-          ", maxDelay:%" PRId64 ", numOfTags:%d, sourceDB:%s, targetStbFullName:%s, triggerType:%d, watermark:%" PRId64,
+          ", fillHistory:%d, igExists:%d, igExpired:%d, igUpdate:%d, lastTs:%" PRId64 ", maxDelay:%" PRId64
+          ", numOfTags:%d, sourceDB:%s, targetStbFullName:%s, triggerType:%d, watermark:%" PRId64,
           createStreamReq.checkpointFreq, createStreamReq.createStb, createStreamReq.deleteMark,
           createStreamReq.fillHistory, createStreamReq.igExists, createStreamReq.igExpired, createStreamReq.igUpdate,
           createStreamReq.lastTs, createStreamReq.maxDelay, createStreamReq.numOfTags, createStreamReq.sourceDB,

@@ -1574,8 +1575,8 @@ static int32_t mndRetrieveStreamTask(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock
       } else if (taskStatus == TASK_STATUS__DROPPING) {
         memcpy(varDataVal(status), "dropping", 8);
         varDataSetLen(status, 8);
-      } else if (taskStatus == TASK_STATUS__FAIL) {
-        memcpy(varDataVal(status), "fail", 4);
+      } else if (taskStatus == TASK_STATUS__UNINIT) {
+        memcpy(varDataVal(status), "uninit", 6);
         varDataSetLen(status, 4);
       } else if (taskStatus == TASK_STATUS__STOP) {
         memcpy(varDataVal(status), "stop", 4);

@@ -2016,14 +2017,11 @@ static int32_t createStreamUpdateTrans(SMnode *pMnode, SStreamObj *pStream, SVgr
 
 static bool isNodeEpsetChanged(const SEpSet *pPrevEpset, const SEpSet *pCurrent) {
   const SEp *pEp = GET_ACTIVE_EP(pPrevEpset);
+  const SEp* p = GET_ACTIVE_EP(pCurrent);
 
-  for (int32_t i = 0; i < pCurrent->numOfEps; ++i) {
-    const SEp *p = &(pCurrent->eps[i]);
-    if (pEp->port == p->port && strncmp(pEp->fqdn, p->fqdn, TSDB_FQDN_LEN) == 0) {
-      return false;
-    }
-  }
+  if (pEp->port == p->port && strncmp(pEp->fqdn, p->fqdn, TSDB_FQDN_LEN) == 0) {
+    return false;
+  }
 
   return true;
 }

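The refactored isNodeEpsetChanged() above now compares only the active endpoint of the previous epset with the active endpoint of the current one, instead of scanning every endpoint. A self-contained toy model of that comparison (simplified structs; not the real SEpSet/GET_ACTIVE_EP definitions from the TDengine headers):

#include <stdbool.h>
#include <stdio.h>
#include <string.h>

typedef struct { char fqdn[128]; unsigned short port; } Ep;
typedef struct { int inUse; int numOfEps; Ep eps[3]; } EpSet;

#define ACTIVE_EP(s) (&(s)->eps[(s)->inUse])

// True when the currently active endpoint differs from the previously active one.
static bool epsetChanged(const EpSet* prev, const EpSet* cur) {
  const Ep* a = ACTIVE_EP(prev);
  const Ep* b = ACTIVE_EP(cur);
  return !(a->port == b->port && strncmp(a->fqdn, b->fqdn, sizeof(a->fqdn)) == 0);
}

int main(void) {
  EpSet prev = {.inUse = 0, .numOfEps = 1, .eps = {{"dnode1", 6030}}};
  EpSet cur  = {.inUse = 0, .numOfEps = 1, .eps = {{"dnode2", 6030}}};
  printf("changed: %d\n", epsetChanged(&prev, &cur));  // prints 1: active fqdn differs
  return 0;
}
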
@@ -2120,6 +2118,7 @@ static int32_t mndProcessVgroupChange(SMnode *pMnode, SVgroupChangeInfo *pChange
       mDebug("stream:0x%" PRIx64 " involved node changed, create update trans", pStream->uid);
       int32_t code = createStreamUpdateTrans(pMnode, pStream, pChangeInfo);
       if (code != TSDB_CODE_SUCCESS) {
+        sdbCancelFetch(pSdb, pIter);
         return code;
       }
     }

@@ -2223,18 +2222,22 @@ static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg) {
   SVgroupChangeInfo changeInfo = mndFindChangedNodeInfo(pMnode, execNodeList.pNodeEntryList, pNodeSnapshot);
   if (taosArrayGetSize(changeInfo.pUpdateNodeList) > 0) {
     code = mndProcessVgroupChange(pMnode, &changeInfo);
+
+    // keep the new vnode snapshot
+    if (code == TSDB_CODE_SUCCESS || code == TSDB_CODE_ACTION_IN_PROGRESS) {
+      mDebug("create trans successfully, update cached node list");
+      taosArrayDestroy(execNodeList.pNodeEntryList);
+      execNodeList.pNodeEntryList = pNodeSnapshot;
+      execNodeList.ts = ts;
+    }
+  } else {
+    mDebug("no update found in nodeList");
+    taosArrayDestroy(pNodeSnapshot);
   }
 
   taosArrayDestroy(changeInfo.pUpdateNodeList);
   taosHashCleanup(changeInfo.pDBMap);
 
-  // keep the new vnode snapshot
-  if (code == TSDB_CODE_SUCCESS || code == TSDB_CODE_ACTION_IN_PROGRESS) {
-    taosArrayDestroy(execNodeList.pNodeEntryList);
-    execNodeList.pNodeEntryList = pNodeSnapshot;
-    execNodeList.ts = ts;
-  }
-
   mDebug("end to do stream task node change checking");
   atomic_store_32(&mndNodeCheckSentinel, 0);
   return 0;

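The reshuffled block above gives the node snapshot a single owner: it replaces the cached list only when an update transaction was actually created, and is destroyed on the spot when nothing changed. A small sketch of that adopt-or-free pattern in plain C (toy types, not the TDengine SArray API):

#include <stdbool.h>
#include <stdlib.h>

typedef struct { int* items; int n; } List;

static void listDestroy(List* l) { if (l) { free(l->items); free(l); } }

// Either transfer ownership of 'snapshot' into the cache or free it; never both.
static void adoptOrFree(List** cached, List* snapshot, bool updated) {
  if (updated) {
    listDestroy(*cached);   // drop the stale cached list
    *cached = snapshot;     // cache takes ownership of the new snapshot
  } else {
    listDestroy(snapshot);  // nothing changed: release the snapshot right away
  }
}
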
@@ -2284,7 +2287,6 @@ static void keepStreamTasksInBuf(SStreamObj *pStream, SStreamVnodeRevertIndex *p
 // todo: this process should be executed by the write queue worker of the mnode
 int32_t mndProcessStreamHb(SRpcMsg *pReq) {
   SMnode *pMnode = pReq->info.node;
 
   SStreamHbMsg req = {0};
   int32_t code = TSDB_CODE_SUCCESS;
 

@@ -2309,8 +2311,7 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) {
 
   for (int32_t i = 0; i < req.numOfTasks; ++i) {
     STaskStatusEntry *p = taosArrayGet(req.pTaskStatus, i);
-
-    int64_t k[2] = {p->streamId, p->taskId};
+    int64_t k[2] = {p->streamId, p->taskId};
     int32_t *index = taosHashGet(execNodeList.pTaskMap, &k, sizeof(k));
     if (index == NULL) {
       continue;

@@ -165,6 +165,7 @@ int32_t tqOffsetRestoreFromFile(STqOffsetStore* pStore, const char* fname);
 int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver);
 int32_t tqScanWal(STQ* pTq);
 int32_t tqCheckAndRunStreamTask(STQ* pTq);
+int32_t tqStartStreamTasks(STQ* pTq);
 int32_t tqStopStreamTasks(STQ* pTq);
 
 // tq util

@@ -1416,7 +1416,7 @@ int32_t tqProcessTaskResumeImpl(STQ* pTq, SStreamTask* pTask, int64_t sversion,
   }
 
   int8_t status = pTask->status.taskStatus;
-  if (status == TASK_STATUS__NORMAL || status == TASK_STATUS__SCAN_HISTORY) {
+  if (status == TASK_STATUS__NORMAL || status == TASK_STATUS__SCAN_HISTORY || status == TASK_STATUS__CK) {
     // no lock needs to secure the access of the version
     if (igUntreated && level == TASK_LEVEL__SOURCE && !pTask->info.fillHistory) {
       // discard all the data when the stream task is suspended.

@@ -1700,20 +1700,47 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) {
 
   tqDebug("s-task:%s receive task nodeEp update msg from mnode", pTask->id.idStr);
   streamTaskUpdateEpsetInfo(pTask, req.pNodeList);
-  streamSetStatusNormal(pTask);
 
+  SStreamTask** ppHTask = NULL;
+  if (pTask->historyTaskId.taskId != 0) {
+    keys[0] = pTask->historyTaskId.streamId;
+    keys[1] = pTask->historyTaskId.taskId;
+
+    ppHTask = (SStreamTask**)taosHashGet(pMeta->pTasks, keys, sizeof(keys));
+    if (ppHTask == NULL || *ppHTask == NULL) {
+      tqError("vgId:%d failed to acquire fill-history task:0x%x when handling update, it may have been dropped already",
+              pMeta->vgId, req.taskId);
+    } else {
+      tqDebug("s-task:%s fill-history task update nodeEp along with stream task", (*ppHTask)->id.idStr);
+      streamTaskUpdateEpsetInfo(*ppHTask, req.pNodeList);
+    }
+  }
+
+  {
+    streamSetStatusNormal(pTask);
+    streamMetaSaveTask(pMeta, pTask);
+    if (ppHTask != NULL) {
+      streamMetaSaveTask(pMeta, *ppHTask);
+    }
+
+    if (streamMetaCommit(pMeta) < 0) {
+      // persist to disk
+    }
+  }
 
   streamTaskStop(pTask);
+  if (ppHTask != NULL) {
+    streamTaskStop(*ppHTask);
+  }
 
   tqDebug("s-task:%s task nodeEp update completed", pTask->id.idStr);
 
   pMeta->closedTask += 1;
+  if (ppHTask != NULL) {
+    pMeta->closedTask += 1;
+  }
 
+  // possibly only handle the stream task.
   int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta);
   bool allStopped = (pMeta->closedTask == numOfTasks);
   if (allStopped) {

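The handler now saves, commits, and stops the companion fill-history task alongside the stream task, and counts both toward pMeta->closedTask so that the later "all tasks closed" check still adds up. A toy illustration of that accounting (plain C, not TDengine code; the task counts are made up):

#include <stdbool.h>
#include <stdio.h>

typedef struct { bool hasHistoryTask; } Task;

int main(void) {
  Task tasks[] = {{true}, {false}, {true}};  // 3 stream tasks, 2 of them with a fill-history companion
  int  numOfTasks = 5;                       // the meta registers stream tasks and history tasks separately
  int  closed = 0;
  for (int i = 0; i < 3; ++i) {
    closed += 1;                             // the stream task itself
    if (tasks[i].hasHistoryTask) closed += 1;  // its fill-history companion
  }
  printf("all stopped: %s\n", closed == numOfTasks ? "yes" : "no");  // prints "yes"
  return 0;
}
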
@@ -1752,6 +1779,7 @@ _end:
     taosWUnLockLatch(&pMeta->lock);
     if (vnodeIsRoleLeader(pTq->pVnode) && !tsDisableStream) {
       vInfo("vgId:%d, restart all stream tasks", vgId);
+      tqStartStreamTasks(pTq);
       tqCheckAndRunStreamTaskAsync(pTq);
     }
   }

@@ -224,6 +224,35 @@ int32_t tqStopStreamTasks(STQ* pTq) {
   return 0;
 }
 
+int32_t tqStartStreamTasks(STQ* pTq) {
+  SStreamMeta* pMeta = pTq->pStreamMeta;
+  int32_t      vgId = TD_VID(pTq->pVnode);
+  int32_t      numOfTasks = taosArrayGetSize(pMeta->pTaskList);
+
+  tqDebug("vgId:%d start to stop all %d stream task(s)", vgId, numOfTasks);
+
+  if (numOfTasks == 0) {
+    return TSDB_CODE_SUCCESS;
+  }
+
+  taosWLockLatch(&pMeta->lock);
+
+  for (int32_t i = 0; i < numOfTasks; ++i) {
+    SStreamTaskId* pTaskId = taosArrayGet(pMeta->pTaskList, i);
+
+    int64_t       key[2] = {pTaskId->streamId, pTaskId->taskId};
+    SStreamTask** pTask = taosHashGet(pMeta->pTasks, key, sizeof(key));
+
+    int8_t status = (*pTask)->status.taskStatus;
+    if (status == TASK_STATUS__STOP) {
+      streamSetStatusNormal(*pTask);
+    }
+  }
+
+  taosWUnLockLatch(&pMeta->lock);
+  return 0;
+}
+
 int32_t setWalReaderStartOffset(SStreamTask* pTask, int32_t vgId) {
   // seek the stored version and extract data from WAL
   int64_t firstVer = walReaderGetValidFirstVer(pTask->exec.pWalReader);

@@ -560,6 +560,7 @@ static void vnodeRestoreFinish(const SSyncFSM *pFsm, const SyncIndex commitIdx)
       vInfo("vgId:%d, not launch stream tasks, since stream tasks are disabled", vgId);
     } else {
       vInfo("vgId:%d start to launch stream tasks", pVnode->config.vgId);
+      tqStartStreamTasks(pVnode->pTq);
       tqCheckAndRunStreamTaskAsync(pVnode->pTq);
     }
   } else {

@@ -391,8 +391,8 @@ static void doRetryDispatchData(void* param, void* tmrId) {
   SStreamTask* pTask = param;
 
   if (streamTaskShouldStop(&pTask->status)) {
-    atomic_sub_fetch_8(&pTask->status.timerActive, 1);
-    qDebug("s-task:%s should stop, abort from timer", pTask->id.idStr);
+    int8_t ref = atomic_sub_fetch_8(&pTask->status.timerActive, 1);
+    qDebug("s-task:%s should stop, abort from timer, ref:%d", pTask->id.idStr, ref);
     return;
   }
 

@@ -409,17 +409,22 @@ static void doRetryDispatchData(void* param, void* tmrId) {
       streamRetryDispatchStreamBlock(pTask, DISPATCH_RETRY_INTERVAL_MS);
     }
   } else {
-    atomic_sub_fetch_8(&pTask->status.timerActive, 1);
-    qDebug("s-task:%s should stop, abort from timer", pTask->id.idStr);
+    int32_t ref = atomic_sub_fetch_8(&pTask->status.timerActive, 1);
+    qDebug("s-task:%s should stop, abort from timer, ref:%d", pTask->id.idStr, ref);
   }
 } else {
-  atomic_sub_fetch_8(&pTask->status.timerActive, 1);
+  int8_t ref = atomic_sub_fetch_8(&pTask->status.timerActive, 1);
+  qDebug("s-task:%s send success, jump out of timer, ref:%d", pTask->id.idStr, ref);
 }
 }
 
 void streamRetryDispatchStreamBlock(SStreamTask* pTask, int64_t waitDuration) {
-  qError("s-task:%s dispatch data in %" PRId64 "ms", pTask->id.idStr, waitDuration);
-  taosTmrReset(doRetryDispatchData, waitDuration, pTask, streamEnv.timer, &pTask->launchTaskTimer);
+  qWarn("s-task:%s dispatch data in %" PRId64 "ms, in timer", pTask->id.idStr, waitDuration);
+  if (pTask->launchTaskTimer != NULL) {
+    taosTmrReset(doRetryDispatchData, waitDuration, pTask, streamEnv.timer, &pTask->launchTaskTimer);
+  } else {
+    pTask->launchTaskTimer = taosTmrStart(doRetryDispatchData, waitDuration, pTask, streamEnv.timer);
+  }
 }
 
 int32_t streamSearchAndAddBlock(SStreamTask* pTask, SStreamDispatchReq* pReqs, SSDataBlock* pDataBlock, int32_t vgSz,

@@ -540,8 +545,10 @@ int32_t streamDispatchStreamBlock(SStreamTask* pTask) {
     }
 
     if (++retryCount > MAX_CONTINUE_RETRY_COUNT) {  // add to timer to retry
-      qDebug("s-task:%s failed to dispatch msg to downstream for %d times, code:%s, add timer to retry in %dms",
-             pTask->id.idStr, retryCount, tstrerror(terrno), DISPATCH_RETRY_INTERVAL_MS);
+      int8_t ref = atomic_add_fetch_8(&pTask->status.timerActive, 1);
+
+      qDebug("s-task:%s failed to dispatch msg to downstream for %d times, code:%s, add timer to retry in %dms, ref:%d",
+             pTask->id.idStr, retryCount, tstrerror(terrno), DISPATCH_RETRY_INTERVAL_MS, ref);
       streamRetryDispatchStreamBlock(pTask, DISPATCH_RETRY_INTERVAL_MS);
       break;
     }

@@ -995,8 +1002,9 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i
   if (pRsp->inputStatus == TASK_INPUT_STATUS__BLOCKED) {
     pTask->inputInfo.status = TASK_INPUT_STATUS__BLOCKED;  // block the input of current task, to push pressure to upstream
     pTask->msgInfo.blockingTs = taosGetTimestampMs();  // record the blocking start time
-    qError("s-task:%s inputQ of downstream task:0x%x is full, time:%" PRId64 " wait for %dms and retry dispatch data",
-           id, pRsp->downstreamTaskId, pTask->msgInfo.blockingTs, DISPATCH_RETRY_INTERVAL_MS);
+    int8_t ref = atomic_add_fetch_8(&pTask->status.timerActive, 1);
+    qError("s-task:%s inputQ of downstream task:0x%x is full, time:%" PRId64 " wait for %dms and retry dispatch data, ref:%d",
+           id, pRsp->downstreamTaskId, pTask->msgInfo.blockingTs, DISPATCH_RETRY_INTERVAL_MS, ref);
     streamRetryDispatchStreamBlock(pTask, DISPATCH_RETRY_INTERVAL_MS);
   } else {  // pipeline send data in output queue
     // this message has been sent successfully, let's try next one.

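Across the dispatch-retry paths above, the bare atomic add/sub on status.timerActive is replaced by capturing the returned reference count and logging it, which makes the arm/release pairing visible in the logs. A self-contained sketch of that pairing using C11 atomics instead of TDengine's atomic_* wrappers (hypothetical function names):

#include <stdatomic.h>
#include <stdio.h>

typedef struct { atomic_schar timerActive; } TaskStatus;

// Called when a retry timer is armed for the task: take one reference.
static void armRetryTimer(TaskStatus* s) {
  signed char ref = atomic_fetch_add(&s->timerActive, 1) + 1;
  printf("arm timer, ref:%d\n", ref);
}

// Called when the timer callback finishes (success, stop, or giving up): release it.
static void timerDone(TaskStatus* s) {
  signed char ref = atomic_fetch_sub(&s->timerActive, 1) - 1;
  printf("timer done, ref:%d\n", ref);
}

int main(void) {
  TaskStatus s = {0};
  armRetryTimer(&s);  // ref:1
  timerDone(&s);      // ref:0 -> no timer holds the task anymore
  return 0;
}
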
@@ -666,6 +666,8 @@ int32_t streamMetaLoadAllTasks(SStreamMeta* pMeta) {
     int64_t keys[2] = {pTask->id.streamId, pTask->id.taskId};
     void* p = taosHashGet(pMeta->pTasks, keys, sizeof(keys));
     if (p == NULL) {
+      // pTask->chkInfo.checkpointVer may be 0, when a follower is become a leader
+      // In this case, we try not to start fill-history task anymore.
       if (pMeta->expandFunc(pMeta->ahandle, pTask, pTask->chkInfo.checkpointVer) < 0) {
         doClear(pKey, pVal, pCur, pRecycleList);
         tFreeStreamTask(pTask);

@@ -235,7 +235,13 @@ static void doProcessDownstreamReadyRsp(SStreamTask* pTask, int32_t numOfReqs) {
     qDebug("s-task:%s enter into scan-history data stage, status:%s", id, str);
     streamTaskLaunchScanHistory(pTask);
   } else {
-    qDebug("s-task:%s downstream tasks are ready, now ready for data from wal, status:%s", id, str);
+    if (pTask->info.fillHistory == 1) {
+      qDebug("s-task:%s fill-history is set normal when start it, try to remove it,set it task to be dropping", id);
+      pTask->status.taskStatus = TASK_STATUS__DROPPING;
+      ASSERT(pTask->historyTaskId.taskId == 0);
+    } else {
+      qDebug("s-task:%s downstream tasks are ready, now ready for data from wal, status:%s", id, str);
+    }
   }
 
   // when current stream task is ready, check the related fill history task.

@@ -579,19 +585,17 @@ static void tryLaunchHistoryTask(void* param, void* tmrId) {
 // todo fix the bug: 2. race condition
 // an fill history task needs to be started.
 int32_t streamLaunchFillHistoryTask(SStreamTask* pTask) {
-  int32_t tId = pTask->historyTaskId.taskId;
-  if (tId == 0) {
+  SStreamMeta* pMeta = pTask->pMeta;
+  int32_t      hTaskId = pTask->historyTaskId.taskId;
+  if (hTaskId == 0) {
     return TSDB_CODE_SUCCESS;
   }
 
   ASSERT(pTask->status.downstreamReady == 1);
   qDebug("s-task:%s start to launch related fill-history task:0x%" PRIx64 "-0x%x", pTask->id.idStr,
-         pTask->historyTaskId.streamId, tId);
+         pTask->historyTaskId.streamId, hTaskId);
 
-  SStreamMeta* pMeta = pTask->pMeta;
-  int32_t hTaskId = pTask->historyTaskId.taskId;
-
-  int64_t keys[2] = {pTask->historyTaskId.streamId, pTask->historyTaskId.taskId};
+  int64_t keys[2] = {pTask->historyTaskId.streamId, hTaskId};
 
   // Set the execute conditions, including the query time window and the version range
   SStreamTask** pHTask = taosHashGet(pMeta->pTasks, keys, sizeof(keys));

@@ -610,11 +614,12 @@ int32_t streamLaunchFillHistoryTask(SStreamTask* pTask) {
       // todo failed to create timer
       taosMemoryFree(pInfo);
     } else {
-      atomic_add_fetch_8(&pTask->status.timerActive, 1);  // timer is active
+      int32_t ref = atomic_add_fetch_8(&pTask->status.timerActive, 1);  // timer is active
+      ASSERT(ref == 1);
       qDebug("s-task:%s set timer active flag", pTask->id.idStr);
     }
   } else {  // timer exists
-    ASSERT(pTask->status.timerActive > 0);
+    ASSERT(pTask->status.timerActive == 1);
     qDebug("s-task:%s set timer active flag, task timer not null", pTask->id.idStr);
     taosTmrReset(tryLaunchHistoryTask, 100, pInfo, streamEnv.timer, &pTask->launchTaskTimer);
   }

@@ -918,6 +923,13 @@ void streamTaskHalt(SStreamTask* pTask) {
     return;
   }
 
+  // wait for checkpoint completed
+  while (pTask->status.taskStatus == TASK_STATUS__CK) {
+    qDebug("s-task:%s status:%s during generating checkpoint, wait for 1sec and retry set status:halt", pTask->id.idStr,
+           streamGetTaskStatusStr(TASK_STATUS__CK));
+    taosMsleep(1000);
+  }
+
   // upgrade to halt status
   if (status == TASK_STATUS__PAUSE) {
     qDebug("s-task:%s upgrade status to %s from %s", pTask->id.idStr, streamGetTaskStatusStr(TASK_STATUS__HALT),