set task status of ins_stream_tasks
This commit is contained in:
parent
2fc5eeb810
commit
863da2ae05
|
@ -1303,9 +1303,29 @@ static int32_t mndRetrieveStreamTask(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock
|
|||
|
||||
// status
|
||||
char status[20 + VARSTR_HEADER_SIZE] = {0};
|
||||
char status2[20] = {0};
|
||||
strcpy(status, "normal");
|
||||
STR_WITH_MAXSIZE_TO_VARSTR(status, status2, sizeof(status));
|
||||
int8_t taskStatus = atomic_load_8(&pTask->status.taskStatus);
|
||||
if (taskStatus == TASK_STATUS__NORMAL) {
|
||||
memcpy(varDataVal(status), "normal", 6);
|
||||
varDataSetLen(status, 6);
|
||||
} else if (taskStatus == TASK_STATUS__DROPPING) {
|
||||
memcpy(varDataVal(status), "dropping", 8);
|
||||
varDataSetLen(status, 8);
|
||||
} else if (taskStatus == TASK_STATUS__FAIL) {
|
||||
memcpy(varDataVal(status), "fail", 4);
|
||||
varDataSetLen(status, 4);
|
||||
} else if (taskStatus == TASK_STATUS__STOP) {
|
||||
memcpy(varDataVal(status), "stop", 4);
|
||||
varDataSetLen(status, 4);
|
||||
} else if (taskStatus == TASK_STATUS__SCAN_HISTORY) {
|
||||
memcpy(varDataVal(status), "history", 6);
|
||||
varDataSetLen(status, 6);
|
||||
} else if (taskStatus == TASK_STATUS__HALT) {
|
||||
memcpy(varDataVal(status), "halt", 4);
|
||||
varDataSetLen(status, 4);
|
||||
} else if (taskStatus == TASK_STATUS__PAUSE) {
|
||||
memcpy(varDataVal(status), "pause", 5);
|
||||
varDataSetLen(status, 5);
|
||||
}
|
||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||
colDataSetVal(pColInfo, numOfRows, (const char *)&status, false);
|
||||
|
||||
|
@ -1358,6 +1378,11 @@ int32_t mndPauseAllStreamTaskImpl(STrans *pTrans, SArray* tasks) {
|
|||
if (pTask->info.taskLevel != TASK_LEVEL__SINK && mndPauseStreamTask(pTrans, pTask) < 0) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (atomic_load_8(&pTask->status.taskStatus) != TASK_STATUS__PAUSE) {
|
||||
atomic_store_8(&pTask->status.keepTaskStatus, pTask->status.taskStatus);
|
||||
atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__PAUSE);
|
||||
}
|
||||
}
|
||||
}
|
||||
return 0;
|
||||
|
@ -1412,6 +1437,10 @@ static int32_t mndProcessPauseStreamReq(SRpcMsg *pReq) {
|
|||
}
|
||||
}
|
||||
|
||||
if (pStream->status == STREAM_STATUS__PAUSE) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
if (mndCheckDbPrivilegeByName(pMnode, pReq->info.conn.user, MND_OPER_WRITE_DB, pStream->targetDb) != 0) {
|
||||
sdbRelease(pMnode->pSdb, pStream);
|
||||
return -1;
|
||||
|
@ -1492,6 +1521,10 @@ int32_t mndResumeAllStreamTasks(STrans *pTrans, SStreamObj *pStream, int8_t igUn
|
|||
if (pTask->info.taskLevel != TASK_LEVEL__SINK && mndResumeStreamTask(pTrans, pTask, igUntreated) < 0) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (atomic_load_8(&pTask->status.taskStatus) == TASK_STATUS__PAUSE) {
|
||||
atomic_store_8(&pTask->status.taskStatus, pTask->status.keepTaskStatus);
|
||||
}
|
||||
}
|
||||
}
|
||||
// pStream->pHTasksList is null
|
||||
|
@ -1521,6 +1554,10 @@ static int32_t mndProcessResumeStreamReq(SRpcMsg *pReq) {
|
|||
}
|
||||
}
|
||||
|
||||
if (pStream->status != STREAM_STATUS__PAUSE) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
if (mndCheckDbPrivilegeByName(pMnode, pReq->info.conn.user, MND_OPER_WRITE_DB, pStream->targetDb) != 0) {
|
||||
sdbRelease(pMnode->pSdb, pStream);
|
||||
return -1;
|
||||
|
|
Loading…
Reference in New Issue