Merge pull request #22024 from taosdata/fix/3_liaohj
fix(stream): fix concurrent bug in save task meta.
This commit is contained in:
commit
c388802bda
|
@ -346,6 +346,7 @@ struct SStreamTask {
|
||||||
int32_t refCnt;
|
int32_t refCnt;
|
||||||
int64_t checkpointingId;
|
int64_t checkpointingId;
|
||||||
int32_t checkpointAlignCnt;
|
int32_t checkpointAlignCnt;
|
||||||
|
int32_t transferStateAlignCnt;
|
||||||
struct SStreamMeta* pMeta;
|
struct SStreamMeta* pMeta;
|
||||||
SSHashObj* pNameMap;
|
SSHashObj* pNameMap;
|
||||||
};
|
};
|
||||||
|
@ -630,6 +631,8 @@ int32_t streamProcessCheckpointRsp(SStreamMeta* pMeta, SStreamTask* pTask, SStre
|
||||||
|
|
||||||
int32_t streamTaskReleaseState(SStreamTask* pTask);
|
int32_t streamTaskReleaseState(SStreamTask* pTask);
|
||||||
int32_t streamTaskReloadState(SStreamTask* pTask);
|
int32_t streamTaskReloadState(SStreamTask* pTask);
|
||||||
|
int32_t streamAlignTransferState(SStreamTask* pTask);
|
||||||
|
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
|
|
|
@ -889,11 +889,11 @@ _OVER:
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t mndDropSmasByStb(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SStbObj *pStb) {
|
int32_t mndDropSmasByStb(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SStbObj *pStb) {
|
||||||
SSdb *pSdb = pMnode->pSdb;
|
SSdb *pSdb = pMnode->pSdb;
|
||||||
SSmaObj *pSma = NULL;
|
SSmaObj *pSma = NULL;
|
||||||
void *pIter = NULL;
|
void *pIter = NULL;
|
||||||
SVgObj *pVgroup = NULL;
|
SVgObj *pVgroup = NULL;
|
||||||
int32_t code = -1;
|
int32_t code = -1;
|
||||||
|
|
||||||
while (1) {
|
while (1) {
|
||||||
pIter = sdbFetch(pSdb, SDB_SMA, pIter, (void **)&pSma);
|
pIter = sdbFetch(pSdb, SDB_SMA, pIter, (void **)&pSma);
|
||||||
|
@ -911,12 +911,18 @@ int32_t mndDropSmasByStb(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SStbObj *p
|
||||||
if (pStream != NULL && pStream->smaId == pSma->uid) {
|
if (pStream != NULL && pStream->smaId == pSma->uid) {
|
||||||
if (mndDropStreamTasks(pMnode, pTrans, pStream) < 0) {
|
if (mndDropStreamTasks(pMnode, pTrans, pStream) < 0) {
|
||||||
mError("stream:%s, failed to drop task since %s", pStream->name, terrstr());
|
mError("stream:%s, failed to drop task since %s", pStream->name, terrstr());
|
||||||
|
mndReleaseStream(pMnode, pStream);
|
||||||
goto _OVER;
|
goto _OVER;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (mndPersistDropStreamLog(pMnode, pTrans, pStream) < 0) {
|
if (mndPersistDropStreamLog(pMnode, pTrans, pStream) < 0) {
|
||||||
|
mndReleaseStream(pMnode, pStream);
|
||||||
goto _OVER;
|
goto _OVER;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
mndReleaseStream(pMnode, pStream);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (mndSetDropSmaVgroupCommitLogs(pMnode, pTrans, pVgroup) != 0) goto _OVER;
|
if (mndSetDropSmaVgroupCommitLogs(pMnode, pTrans, pVgroup) != 0) goto _OVER;
|
||||||
if (mndSetDropSmaVgroupRedoActions(pMnode, pTrans, pDb, pVgroup) != 0) goto _OVER;
|
if (mndSetDropSmaVgroupRedoActions(pMnode, pTrans, pDb, pVgroup) != 0) goto _OVER;
|
||||||
if (mndSetDropSmaCommitLogs(pMnode, pTrans, pSma) != 0) goto _OVER;
|
if (mndSetDropSmaCommitLogs(pMnode, pTrans, pSma) != 0) goto _OVER;
|
||||||
|
|
|
@ -248,7 +248,7 @@ int32_t tqProcessTaskDispatchRsp(STQ* pTq, SRpcMsg* pMsg);
|
||||||
int32_t tqProcessTaskRetrieveReq(STQ* pTq, SRpcMsg* pMsg);
|
int32_t tqProcessTaskRetrieveReq(STQ* pTq, SRpcMsg* pMsg);
|
||||||
int32_t tqProcessTaskRetrieveRsp(STQ* pTq, SRpcMsg* pMsg);
|
int32_t tqProcessTaskRetrieveRsp(STQ* pTq, SRpcMsg* pMsg);
|
||||||
int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg);
|
int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg);
|
||||||
int32_t tqProcessTaskTransferStateReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen);
|
int32_t tqProcessTaskTransferStateReq(STQ* pTq, SRpcMsg* pMsg);
|
||||||
int32_t tqProcessStreamTaskScanHistoryFinishReq(STQ* pTq, SRpcMsg* pMsg);
|
int32_t tqProcessStreamTaskScanHistoryFinishReq(STQ* pTq, SRpcMsg* pMsg);
|
||||||
int32_t tqProcessTaskRecoverFinishRsp(STQ* pTq, SRpcMsg* pMsg);
|
int32_t tqProcessTaskRecoverFinishRsp(STQ* pTq, SRpcMsg* pMsg);
|
||||||
int32_t tqCheckLogInWal(STQ* pTq, int64_t version);
|
int32_t tqCheckLogInWal(STQ* pTq, int64_t version);
|
||||||
|
|
|
@ -934,7 +934,6 @@ int32_t tqProcessStreamTaskCheckReq(STQ* pTq, SRpcMsg* pMsg) {
|
||||||
};
|
};
|
||||||
|
|
||||||
SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, taskId);
|
SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, taskId);
|
||||||
|
|
||||||
if (pTask != NULL) {
|
if (pTask != NULL) {
|
||||||
rsp.status = streamTaskCheckStatus(pTask);
|
rsp.status = streamTaskCheckStatus(pTask);
|
||||||
streamMetaReleaseTask(pTq->pStreamMeta, pTask);
|
streamMetaReleaseTask(pTq->pStreamMeta, pTask);
|
||||||
|
@ -1106,7 +1105,15 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
|
||||||
// 1. stop the related stream task, get the current scan wal version of stream task, ver.
|
// 1. stop the related stream task, get the current scan wal version of stream task, ver.
|
||||||
pStreamTask = streamMetaAcquireTask(pMeta, pTask->streamTaskId.taskId);
|
pStreamTask = streamMetaAcquireTask(pMeta, pTask->streamTaskId.taskId);
|
||||||
if (pStreamTask == NULL) {
|
if (pStreamTask == NULL) {
|
||||||
// todo handle error
|
qError("failed to find s-task:0x%x, it may have been destroyed, drop fill history task:%s",
|
||||||
|
pTask->streamTaskId.taskId, pTask->id.idStr);
|
||||||
|
|
||||||
|
pTask->status.taskStatus = TASK_STATUS__DROPPING;
|
||||||
|
tqDebug("s-task:%s scan-history-task set status to be dropping", pId);
|
||||||
|
|
||||||
|
streamMetaSaveTask(pMeta, pTask);
|
||||||
|
streamMetaReleaseTask(pMeta, pTask);
|
||||||
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
ASSERT(pStreamTask->info.taskLevel == TASK_LEVEL__SOURCE);
|
ASSERT(pStreamTask->info.taskLevel == TASK_LEVEL__SOURCE);
|
||||||
|
@ -1213,11 +1220,14 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// notify the downstream tasks to transfer executor state after handle all history blocks.
|
// notify the downstream tasks to transfer executor state after handle all history blocks.
|
||||||
int32_t tqProcessTaskTransferStateReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
|
int32_t tqProcessTaskTransferStateReq(STQ* pTq, SRpcMsg* pMsg) {
|
||||||
SStreamTransferReq req;
|
char* pReq = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
|
||||||
|
int32_t len = pMsg->contLen - sizeof(SMsgHead);
|
||||||
|
|
||||||
|
SStreamTransferReq req = {0};
|
||||||
|
|
||||||
SDecoder decoder;
|
SDecoder decoder;
|
||||||
tDecoderInit(&decoder, (uint8_t*)msg, msgLen);
|
tDecoderInit(&decoder, (uint8_t*)pReq, len);
|
||||||
int32_t code = tDecodeStreamScanHistoryFinishReq(&decoder, &req);
|
int32_t code = tDecodeStreamScanHistoryFinishReq(&decoder, &req);
|
||||||
tDecoderClear(&decoder);
|
tDecoderClear(&decoder);
|
||||||
|
|
||||||
|
@ -1227,25 +1237,33 @@ int32_t tqProcessTaskTransferStateReq(STQ* pTq, int64_t sversion, char* msg, int
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t remain = streamAlignTransferState(pTask);
|
||||||
|
if (remain > 0) {
|
||||||
|
tqDebug("s-task:%s receive transfer state msg, remain:%d", pTask->id.idStr, remain);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
// transfer the ownership of executor state
|
// transfer the ownership of executor state
|
||||||
streamTaskReleaseState(pTask);
|
tqDebug("s-task:%s all upstream tasks end transfer msg", pTask->id.idStr);
|
||||||
tqDebug("s-task:%s receive state transfer req", pTask->id.idStr);
|
|
||||||
|
|
||||||
// related stream task load the state from the state storage backend
|
// related stream task load the state from the state storage backend
|
||||||
SStreamTask* pStreamTask = streamMetaAcquireTask(pTq->pStreamMeta, pTask->streamTaskId.taskId);
|
SStreamTask* pStreamTask = streamMetaAcquireTask(pTq->pStreamMeta, pTask->streamTaskId.taskId);
|
||||||
if (pStreamTask == NULL) {
|
if (pStreamTask == NULL) {
|
||||||
|
streamMetaReleaseTask(pTq->pStreamMeta, pTask);
|
||||||
tqError("failed to find related stream task:0x%x, it may have been dropped already", req.taskId);
|
tqError("failed to find related stream task:0x%x, it may have been dropped already", req.taskId);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// when all upstream tasks have notified the this task to start transfer state, then we start the transfer procedure.
|
||||||
|
streamTaskReleaseState(pTask);
|
||||||
streamTaskReloadState(pStreamTask);
|
streamTaskReloadState(pStreamTask);
|
||||||
|
streamMetaReleaseTask(pTq->pStreamMeta, pStreamTask);
|
||||||
|
|
||||||
ASSERT(pTask->streamTaskId.taskId != 0);
|
ASSERT(pTask->streamTaskId.taskId != 0);
|
||||||
pTask->status.transferState = true;
|
pTask->status.transferState = true;
|
||||||
|
|
||||||
streamSchedExec(pTask);
|
streamSchedExec(pTask);
|
||||||
streamMetaReleaseTask(pTq->pStreamMeta, pTask);
|
streamMetaReleaseTask(pTq->pStreamMeta, pTask);
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -659,11 +659,8 @@ int32_t vnodeProcessStreamMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo)
|
||||||
return tqProcessTaskRetrieveRsp(pVnode->pTq, pMsg);
|
return tqProcessTaskRetrieveRsp(pVnode->pTq, pMsg);
|
||||||
case TDMT_VND_STREAM_SCAN_HISTORY:
|
case TDMT_VND_STREAM_SCAN_HISTORY:
|
||||||
return tqProcessTaskScanHistory(pVnode->pTq, pMsg);
|
return tqProcessTaskScanHistory(pVnode->pTq, pMsg);
|
||||||
case TDMT_STREAM_TRANSFER_STATE: {
|
case TDMT_STREAM_TRANSFER_STATE:
|
||||||
char* pReq = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
|
return tqProcessTaskTransferStateReq(pVnode->pTq, pMsg);
|
||||||
int32_t len = pMsg->contLen - sizeof(SMsgHead);
|
|
||||||
return tqProcessTaskTransferStateReq(pVnode->pTq, 0, pReq, len);
|
|
||||||
}
|
|
||||||
case TDMT_STREAM_SCAN_HISTORY_FINISH:
|
case TDMT_STREAM_SCAN_HISTORY_FINISH:
|
||||||
return tqProcessStreamTaskScanHistoryFinishReq(pVnode->pTq, pMsg);
|
return tqProcessStreamTaskScanHistoryFinishReq(pVnode->pTq, pMsg);
|
||||||
case TDMT_STREAM_SCAN_HISTORY_FINISH_RSP:
|
case TDMT_STREAM_SCAN_HISTORY_FINISH_RSP:
|
||||||
|
|
|
@ -288,9 +288,8 @@ qTaskInfo_t qCreateQueueExecTaskInfo(void* msg, SReadHandle* pReaderHandle, int3
|
||||||
return pTaskInfo;
|
return pTaskInfo;
|
||||||
}
|
}
|
||||||
|
|
||||||
struct SSubplan* pPlan = NULL;
|
SSubplan* pPlan = NULL;
|
||||||
|
int32_t code = qStringToSubplan(msg, &pPlan);
|
||||||
int32_t code = qStringToSubplan(msg, &pPlan);
|
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
terrno = code;
|
terrno = code;
|
||||||
return NULL;
|
return NULL;
|
||||||
|
@ -335,6 +334,7 @@ qTaskInfo_t qCreateStreamExecTaskInfo(void* msg, SReadHandle* readers, int32_t v
|
||||||
qTaskInfo_t pTaskInfo = NULL;
|
qTaskInfo_t pTaskInfo = NULL;
|
||||||
code = qCreateExecTask(readers, vgId, 0, pPlan, &pTaskInfo, NULL, NULL, OPTR_EXEC_MODEL_STREAM);
|
code = qCreateExecTask(readers, vgId, 0, pPlan, &pTaskInfo, NULL, NULL, OPTR_EXEC_MODEL_STREAM);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
nodesDestroyNode((SNode*)pPlan);
|
||||||
qDestroyTask(pTaskInfo);
|
qDestroyTask(pTaskInfo);
|
||||||
terrno = code;
|
terrno = code;
|
||||||
return NULL;
|
return NULL;
|
||||||
|
|
|
@ -61,7 +61,7 @@ char* createStreamTaskIdStr(int64_t streamId, int32_t taskId) {
|
||||||
return taosStrdup(buf);
|
return taosStrdup(buf);
|
||||||
}
|
}
|
||||||
|
|
||||||
void streamSchedByTimer(void* param, void* tmrId) {
|
static void streamSchedByTimer(void* param, void* tmrId) {
|
||||||
SStreamTask* pTask = (void*)param;
|
SStreamTask* pTask = (void*)param;
|
||||||
|
|
||||||
int8_t status = atomic_load_8(&pTask->triggerStatus);
|
int8_t status = atomic_load_8(&pTask->triggerStatus);
|
||||||
|
|
|
@ -352,11 +352,12 @@ static void waitForTaskIdle(SStreamTask* pTask, SStreamTask* pStreamTask) {
|
||||||
static int32_t streamTransferStateToStreamTask(SStreamTask* pTask) {
|
static int32_t streamTransferStateToStreamTask(SStreamTask* pTask) {
|
||||||
SStreamTask* pStreamTask = streamMetaAcquireTask(pTask->pMeta, pTask->streamTaskId.taskId);
|
SStreamTask* pStreamTask = streamMetaAcquireTask(pTask->pMeta, pTask->streamTaskId.taskId);
|
||||||
if (pStreamTask == NULL) {
|
if (pStreamTask == NULL) {
|
||||||
qError("s-task:%s failed to find related stream task:0x%x, it may have been destoryed or closed",
|
qError("s-task:%s failed to find related stream task:0x%x, it may have been destroyed or closed",
|
||||||
pTask->id.idStr, pTask->streamTaskId.taskId);
|
pTask->id.idStr, pTask->streamTaskId.taskId);
|
||||||
return TSDB_CODE_STREAM_TASK_NOT_EXIST;
|
return TSDB_CODE_STREAM_TASK_NOT_EXIST;
|
||||||
} else {
|
} else {
|
||||||
qDebug("s-task:%s scan history task end, update stream task:%s info, transfer exec state", pTask->id.idStr, pStreamTask->id.idStr);
|
qDebug("s-task:%s fill-history task end, update related stream task:%s info, transfer exec state", pTask->id.idStr,
|
||||||
|
pStreamTask->id.idStr);
|
||||||
}
|
}
|
||||||
|
|
||||||
ASSERT(pStreamTask != NULL && pStreamTask->historyTaskId.taskId == pTask->id.taskId);
|
ASSERT(pStreamTask != NULL && pStreamTask->historyTaskId.taskId == pTask->id.taskId);
|
||||||
|
@ -369,6 +370,7 @@ static int32_t streamTransferStateToStreamTask(SStreamTask* pTask) {
|
||||||
} else {
|
} else {
|
||||||
ASSERT(pStreamTask->status.taskStatus == TASK_STATUS__NORMAL);
|
ASSERT(pStreamTask->status.taskStatus == TASK_STATUS__NORMAL);
|
||||||
pStreamTask->status.taskStatus = TASK_STATUS__HALT;
|
pStreamTask->status.taskStatus = TASK_STATUS__HALT;
|
||||||
|
qDebug("s-task:%s status: halt by related fill history task:%s", pStreamTask->id.idStr, pTask->id.idStr);
|
||||||
}
|
}
|
||||||
|
|
||||||
// wait for the stream task to be idle
|
// wait for the stream task to be idle
|
||||||
|
@ -477,6 +479,7 @@ int32_t streamExecForAll(SStreamTask* pTask) {
|
||||||
ASSERT(batchSize == 0);
|
ASSERT(batchSize == 0);
|
||||||
if (pTask->info.fillHistory && pTask->status.transferState) {
|
if (pTask->info.fillHistory && pTask->status.transferState) {
|
||||||
int32_t code = streamTransferStateToStreamTask(pTask);
|
int32_t code = streamTransferStateToStreamTask(pTask);
|
||||||
|
pTask->status.transferState = false; // reset this value, to avoid transfer state again
|
||||||
if (code != TSDB_CODE_SUCCESS) { // todo handle this
|
if (code != TSDB_CODE_SUCCESS) { // todo handle this
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
@ -611,3 +614,13 @@ int32_t streamTaskReloadState(SStreamTask* pTask) {
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t streamAlignTransferState(SStreamTask* pTask) {
|
||||||
|
int32_t numOfUpstream = taosArrayGetSize(pTask->pUpstreamEpInfoList);
|
||||||
|
int32_t old = atomic_val_compare_exchange_32(&pTask->transferStateAlignCnt, 0, numOfUpstream);
|
||||||
|
if (old == 0) {
|
||||||
|
qDebug("s-task:%s set the transfer state aligncnt %d", pTask->id.idStr, numOfUpstream);
|
||||||
|
}
|
||||||
|
|
||||||
|
return atomic_sub_fetch_32(&pTask->transferStateAlignCnt, 1);
|
||||||
|
}
|
||||||
|
|
|
@ -264,8 +264,9 @@ SStreamTask* streamMetaAcquireTask(SStreamMeta* pMeta, int32_t taskId) {
|
||||||
SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasks, &taskId, sizeof(int32_t));
|
SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasks, &taskId, sizeof(int32_t));
|
||||||
if (ppTask != NULL) {
|
if (ppTask != NULL) {
|
||||||
if (!streamTaskShouldStop(&(*ppTask)->status)) {
|
if (!streamTaskShouldStop(&(*ppTask)->status)) {
|
||||||
atomic_add_fetch_32(&(*ppTask)->refCnt, 1);
|
int32_t ref = atomic_add_fetch_32(&(*ppTask)->refCnt, 1);
|
||||||
taosRUnLockLatch(&pMeta->lock);
|
taosRUnLockLatch(&pMeta->lock);
|
||||||
|
qDebug("s-task:%s acquire task, ref:%d", (*ppTask)->id.idStr, ref);
|
||||||
return *ppTask;
|
return *ppTask;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -275,12 +276,24 @@ SStreamTask* streamMetaAcquireTask(SStreamMeta* pMeta, int32_t taskId) {
|
||||||
}
|
}
|
||||||
|
|
||||||
void streamMetaReleaseTask(SStreamMeta* pMeta, SStreamTask* pTask) {
|
void streamMetaReleaseTask(SStreamMeta* pMeta, SStreamTask* pTask) {
|
||||||
int32_t left = atomic_sub_fetch_32(&pTask->refCnt, 1);
|
int32_t ref = atomic_sub_fetch_32(&pTask->refCnt, 1);
|
||||||
if (left < 0) {
|
if (ref > 0) {
|
||||||
qError("task ref is invalid, ref:%d, %s", left, pTask->id.idStr);
|
qDebug("s-task:%s release task, ref:%d", pTask->id.idStr, ref);
|
||||||
} else if (left == 0) {
|
} else if (ref == 0) {
|
||||||
ASSERT(streamTaskShouldStop(&pTask->status));
|
ASSERT(streamTaskShouldStop(&pTask->status));
|
||||||
tFreeStreamTask(pTask);
|
tFreeStreamTask(pTask);
|
||||||
|
} else if (ref < 0) {
|
||||||
|
qError("task ref is invalid, ref:%d, %s", ref, pTask->id.idStr);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
static void doRemoveIdFromList(SStreamMeta* pMeta, int32_t num, int32_t taskId) {
|
||||||
|
for (int32_t i = 0; i < num; ++i) {
|
||||||
|
int32_t* pTaskId = taosArrayGet(pMeta->pTaskList, i);
|
||||||
|
if (*pTaskId == taskId) {
|
||||||
|
taosArrayRemove(pMeta->pTaskList, i);
|
||||||
|
break;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -333,17 +346,17 @@ void streamMetaRemoveTask(SStreamMeta* pMeta, int32_t taskId) {
|
||||||
|
|
||||||
int32_t num = taosArrayGetSize(pMeta->pTaskList);
|
int32_t num = taosArrayGetSize(pMeta->pTaskList);
|
||||||
qDebug("s-task:%s set the drop task flag, remain running s-task:%d", pTask->id.idStr, num - 1);
|
qDebug("s-task:%s set the drop task flag, remain running s-task:%d", pTask->id.idStr, num - 1);
|
||||||
for (int32_t i = 0; i < num; ++i) {
|
doRemoveIdFromList(pMeta, num, pTask->id.taskId);
|
||||||
int32_t* pTaskId = taosArrayGet(pMeta->pTaskList, i);
|
|
||||||
if (*pTaskId == taskId) {
|
// remove the ref by timer
|
||||||
taosArrayRemove(pMeta->pTaskList, i);
|
if (pTask->triggerParam != 0) {
|
||||||
break;
|
taosTmrStop(pTask->schedTimer);
|
||||||
}
|
streamMetaReleaseTask(pMeta, pTask);
|
||||||
}
|
}
|
||||||
|
|
||||||
streamMetaReleaseTask(pMeta, pTask);
|
streamMetaReleaseTask(pMeta, pTask);
|
||||||
} else {
|
} else {
|
||||||
qDebug("vgId:%d failed to find the task:0x%x, it may be dropped already", pMeta->vgId, taskId);
|
qDebug("vgId:%d failed to find the task:0x%x, it may have been dropped already", pMeta->vgId, taskId);
|
||||||
}
|
}
|
||||||
|
|
||||||
taosWUnLockLatch(&pMeta->lock);
|
taosWUnLockLatch(&pMeta->lock);
|
||||||
|
|
|
@ -43,6 +43,7 @@ const char* streamGetTaskStatusStr(int32_t status) {
|
||||||
case TASK_STATUS__SCAN_HISTORY: return "scan-history";
|
case TASK_STATUS__SCAN_HISTORY: return "scan-history";
|
||||||
case TASK_STATUS__HALT: return "halt";
|
case TASK_STATUS__HALT: return "halt";
|
||||||
case TASK_STATUS__PAUSE: return "paused";
|
case TASK_STATUS__PAUSE: return "paused";
|
||||||
|
case TASK_STATUS__DROPPING: return "dropping";
|
||||||
default:return "";
|
default:return "";
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -205,7 +206,7 @@ int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRs
|
||||||
qDebug("s-task:%s all %d downstream tasks are ready, now enter into scan-history-data stage, status:%s", id,
|
qDebug("s-task:%s all %d downstream tasks are ready, now enter into scan-history-data stage, status:%s", id,
|
||||||
numOfReqs, streamGetTaskStatusStr(pTask->status.taskStatus));
|
numOfReqs, streamGetTaskStatusStr(pTask->status.taskStatus));
|
||||||
streamTaskLaunchScanHistory(pTask);
|
streamTaskLaunchScanHistory(pTask);
|
||||||
} else { // todo add assert, agg tasks?
|
} else {
|
||||||
ASSERT(pTask->status.taskStatus == TASK_STATUS__NORMAL);
|
ASSERT(pTask->status.taskStatus == TASK_STATUS__NORMAL);
|
||||||
qDebug("s-task:%s fixed downstream task is ready, now ready for data from wal, status:%s", id,
|
qDebug("s-task:%s fixed downstream task is ready, now ready for data from wal, status:%s", id,
|
||||||
streamGetTaskStatusStr(pTask->status.taskStatus));
|
streamGetTaskStatusStr(pTask->status.taskStatus));
|
||||||
|
@ -258,9 +259,15 @@ int32_t streamRestoreParam(SStreamTask* pTask) {
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t streamSetStatusNormal(SStreamTask* pTask) {
|
int32_t streamSetStatusNormal(SStreamTask* pTask) {
|
||||||
qDebug("s-task:%s set task status to be normal, prev:%s", pTask->id.idStr, streamGetTaskStatusStr(pTask->status.taskStatus));
|
int32_t status = atomic_load_8(&pTask->status.taskStatus);
|
||||||
atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__NORMAL);
|
if (status == TASK_STATUS__DROPPING) {
|
||||||
return 0;
|
qError("s-task:%s cannot be set normal, since in dropping state", pTask->id.idStr);
|
||||||
|
return -1;
|
||||||
|
} else {
|
||||||
|
qDebug("s-task:%s set task status to be normal, prev:%s", pTask->id.idStr, streamGetTaskStatusStr(status));
|
||||||
|
atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__NORMAL);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// source
|
// source
|
||||||
|
@ -344,7 +351,8 @@ static int32_t doDispatchTransferMsg(SStreamTask* pTask, const SStreamTransferRe
|
||||||
msg.info.noResp = 1;
|
msg.info.noResp = 1;
|
||||||
|
|
||||||
tmsgSendReq(pEpSet, &msg);
|
tmsgSendReq(pEpSet, &msg);
|
||||||
qDebug("s-task:%s dispatch transfer state msg to taskId:0x%x (vgId:%d)", pTask->id.idStr, pReq->taskId, vgId);
|
qDebug("s-task:%s level:%d, status:%s dispatch transfer state msg to taskId:0x%x (vgId:%d)", pTask->id.idStr,
|
||||||
|
pTask->info.taskLevel, streamGetTaskStatusStr(pTask->status.taskStatus), pReq->taskId, vgId);
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
@ -354,9 +362,6 @@ int32_t streamDispatchTransferStateMsg(SStreamTask* pTask) {
|
||||||
|
|
||||||
// serialize
|
// serialize
|
||||||
if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH) {
|
if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH) {
|
||||||
qDebug("s-task:%s send transfer state msg to downstream (fix-dispatch) to taskId:0x%x, status:%s", pTask->id.idStr,
|
|
||||||
pTask->fixedEpDispatcher.taskId, streamGetTaskStatusStr(pTask->status.taskStatus));
|
|
||||||
|
|
||||||
req.taskId = pTask->fixedEpDispatcher.taskId;
|
req.taskId = pTask->fixedEpDispatcher.taskId;
|
||||||
doDispatchTransferMsg(pTask, &req, pTask->fixedEpDispatcher.nodeId, &pTask->fixedEpDispatcher.epSet);
|
doDispatchTransferMsg(pTask, &req, pTask->fixedEpDispatcher.nodeId, &pTask->fixedEpDispatcher.epSet);
|
||||||
} else if (pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) {
|
} else if (pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) {
|
||||||
|
@ -451,6 +456,7 @@ static void tryLaunchHistoryTask(void* param, void* tmrId) {
|
||||||
const char* pStatus = streamGetTaskStatusStr((*ppTask)->status.taskStatus);
|
const char* pStatus = streamGetTaskStatusStr((*ppTask)->status.taskStatus);
|
||||||
qDebug("s-task:%s status:%s quit timer task", (*ppTask)->id.idStr, pStatus);
|
qDebug("s-task:%s status:%s quit timer task", (*ppTask)->id.idStr, pStatus);
|
||||||
|
|
||||||
|
taosMemoryFree(pInfo);
|
||||||
(*ppTask)->status.timerActive = 0;
|
(*ppTask)->status.timerActive = 0;
|
||||||
taosWUnLockLatch(&pMeta->lock);
|
taosWUnLockLatch(&pMeta->lock);
|
||||||
return;
|
return;
|
||||||
|
@ -511,6 +517,7 @@ int32_t streamCheckHistoryTaskDownstream(SStreamTask* pTask) {
|
||||||
pTask->launchTaskTimer = taosTmrStart(tryLaunchHistoryTask, 100, pInfo, streamEnv.timer);
|
pTask->launchTaskTimer = taosTmrStart(tryLaunchHistoryTask, 100, pInfo, streamEnv.timer);
|
||||||
if (pTask->launchTaskTimer == NULL) {
|
if (pTask->launchTaskTimer == NULL) {
|
||||||
// todo failed to create timer
|
// todo failed to create timer
|
||||||
|
taosMemoryFree(pInfo);
|
||||||
} else {
|
} else {
|
||||||
pTask->status.timerActive = 1; // timer is active
|
pTask->status.timerActive = 1; // timer is active
|
||||||
qDebug("s-task:%s set timer active flag", pTask->id.idStr);
|
qDebug("s-task:%s set timer active flag", pTask->id.idStr);
|
||||||
|
@ -553,8 +560,10 @@ int32_t streamTaskScanHistoryDataComplete(SStreamTask* pTask) {
|
||||||
streamSetStatusNormal(pTask);
|
streamSetStatusNormal(pTask);
|
||||||
atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE);
|
atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE);
|
||||||
|
|
||||||
// todo check rsp, commit data
|
taosWLockLatch(&pMeta->lock);
|
||||||
streamMetaSaveTask(pMeta, pTask);
|
streamMetaSaveTask(pMeta, pTask);
|
||||||
|
taosWUnLockLatch(&pMeta->lock);
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -205,13 +205,16 @@ int32_t tDecodeStreamTask(SDecoder* pDecoder, SStreamTask* pTask) {
|
||||||
|
|
||||||
void tFreeStreamTask(SStreamTask* pTask) {
|
void tFreeStreamTask(SStreamTask* pTask) {
|
||||||
qDebug("free s-task:%s", pTask->id.idStr);
|
qDebug("free s-task:%s", pTask->id.idStr);
|
||||||
|
|
||||||
int32_t status = atomic_load_8((int8_t*)&(pTask->status.taskStatus));
|
int32_t status = atomic_load_8((int8_t*)&(pTask->status.taskStatus));
|
||||||
if (pTask->inputQueue) {
|
if (pTask->inputQueue) {
|
||||||
streamQueueClose(pTask->inputQueue);
|
streamQueueClose(pTask->inputQueue);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pTask->outputQueue) {
|
if (pTask->outputQueue) {
|
||||||
streamQueueClose(pTask->outputQueue);
|
streamQueueClose(pTask->outputQueue);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pTask->exec.qmsg) {
|
if (pTask->exec.qmsg) {
|
||||||
taosMemoryFree(pTask->exec.qmsg);
|
taosMemoryFree(pTask->exec.qmsg);
|
||||||
}
|
}
|
||||||
|
@ -230,9 +233,7 @@ void tFreeStreamTask(SStreamTask* pTask) {
|
||||||
tDeleteSchemaWrapper(pTask->tbSink.pSchemaWrapper);
|
tDeleteSchemaWrapper(pTask->tbSink.pSchemaWrapper);
|
||||||
taosMemoryFree(pTask->tbSink.pTSchema);
|
taosMemoryFree(pTask->tbSink.pTSchema);
|
||||||
tSimpleHashCleanup(pTask->tbSink.pTblInfo);
|
tSimpleHashCleanup(pTask->tbSink.pTblInfo);
|
||||||
}
|
} else if (pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) {
|
||||||
|
|
||||||
if (pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) {
|
|
||||||
taosArrayDestroy(pTask->shuffleDispatcher.dbInfo.pVgroupInfos);
|
taosArrayDestroy(pTask->shuffleDispatcher.dbInfo.pVgroupInfos);
|
||||||
taosArrayDestroy(pTask->checkReqIds);
|
taosArrayDestroy(pTask->checkReqIds);
|
||||||
pTask->checkReqIds = NULL;
|
pTask->checkReqIds = NULL;
|
||||||
|
|
Loading…
Reference in New Issue