Merge branch '3.0' of https://github.com/taosdata/TDengine into feat/TD-21898
commit 31ad73f6d1
@@ -762,7 +762,6 @@ int32_t streamQueueGetNumOfItems(const SStreamQueue* pQueue);
 int32_t streamRestoreParam(SStreamTask* pTask);
 void streamTaskPause(SStreamTask* pTask, SStreamMeta* pMeta);
 void streamTaskResume(SStreamTask* pTask);
-void streamTaskDisablePause(SStreamTask* pTask);
 void streamTaskEnablePause(SStreamTask* pTask);
 int32_t streamTaskSetUpstreamInfo(SStreamTask* pTask, const SStreamTask* pUpstreamTask);
 void streamTaskUpdateUpstreamInfo(SStreamTask* pTask, int32_t nodeId, const SEpSet* pEpSet);
@@ -807,6 +806,10 @@ void streamMetaStartHb(SStreamMeta* pMeta);
 void streamMetaInitForSnode(SStreamMeta* pMeta);
 bool streamMetaTaskInTimer(SStreamMeta* pMeta);
 int32_t streamMetaUpdateTaskReadyInfo(SStreamTask* pTask);
+void streamMetaRLock(SStreamMeta* pMeta);
+void streamMetaRUnLock(SStreamMeta* pMeta);
+void streamMetaWLock(SStreamMeta* pMeta);
+void streamMetaWUnLock(SStreamMeta* pMeta);
 
 // checkpoint
 int32_t streamProcessCheckpointSourceReq(SStreamTask* pTask, SStreamCheckpointSourceReq* pReq);
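The four declarations added above are used throughout the rest of this commit as one-for-one replacements for direct taosRLockLatch/taosWLockLatch calls on pMeta->lock. Their definitions are not part of this diff; a minimal sketch of what they are assumed to do, which makes the call-site changes below easier to read:

    /* Sketch only: assumes the wrappers simply guard the existing latch in SStreamMeta. */
    void streamMetaRLock(SStreamMeta* pMeta)   { taosRLockLatch(&pMeta->lock); }
    void streamMetaRUnLock(SStreamMeta* pMeta) { taosRUnLockLatch(&pMeta->lock); }
    void streamMetaWLock(SStreamMeta* pMeta)   { taosWLockLatch(&pMeta->lock); }
    void streamMetaWUnLock(SStreamMeta* pMeta) { taosWUnLockLatch(&pMeta->lock); }

Routing every acquisition through one API gives a single place to add tracing later, or to swap the locking primitive, without touching dozens of call sites.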
@@ -819,6 +819,8 @@ int32_t* taosGetErrno();
 // stream
 #define TSDB_CODE_STREAM_TASK_NOT_EXIST     TAOS_DEF_ERROR_CODE(0, 0x4100)
 #define TSDB_CODE_STREAM_EXEC_CANCELLED     TAOS_DEF_ERROR_CODE(0, 0x4102)
+#define TSDB_CODE_STREAM_INVALID_STATETRANS TAOS_DEF_ERROR_CODE(0, 0x4103)
+#define TSDB_CODE_STREAM_TASK_IVLD_STATUS   TAOS_DEF_ERROR_CODE(0, 0x4104)
 
 // TDLite
 #define TSDB_CODE_TDLITE_IVLD_OPEN_FLAGS    TAOS_DEF_ERROR_CODE(0, 0x5100)
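Both new codes are consumed later in this commit: TSDB_CODE_STREAM_TASK_IVLD_STATUS is returned by streamDoTransferStateToStreamTask when the related stream task is already stopped or dropping (see the @@ -331,7 hunk further down). A hedged caller-side sketch; the handling shown here is illustrative and not part of the diff:

    /* Sketch: distinguishes "task exists but its state forbids the operation"
     * (TSDB_CODE_STREAM_TASK_IVLD_STATUS) from "task is gone"
     * (TSDB_CODE_STREAM_TASK_NOT_EXIST). */
    int32_t code = streamDoTransferStateToStreamTask(pTask);
    if (code == TSDB_CODE_STREAM_TASK_IVLD_STATUS) {
      // give up the state transfer; the stream task is stopping or being dropped
    }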
@@ -165,17 +165,17 @@ int32_t sndProcessTaskDeployReq(SSnode *pSnode, char *msg, int32_t msgLen) {
   ASSERT(pTask->info.taskLevel == TASK_LEVEL__AGG);
 
   // 2.save task
-  taosWLockLatch(&pSnode->pMeta->lock);
+  streamMetaWLock(pSnode->pMeta);
 
   bool added = false;
   code = streamMetaRegisterTask(pSnode->pMeta, -1, pTask, &added);
   if (code < 0) {
-    taosWUnLockLatch(&pSnode->pMeta->lock);
+    streamMetaWUnLock(pSnode->pMeta);
     return -1;
   }
 
   int32_t numOfTasks = streamMetaGetNumOfTasks(pSnode->pMeta);
-  taosWUnLockLatch(&pSnode->pMeta->lock);
+  streamMetaWUnLock(pSnode->pMeta);
 
   char* p = NULL;
   streamTaskGetStatus(pTask, &p);
@@ -195,14 +195,14 @@ int32_t sndProcessTaskDropReq(SSnode *pSnode, char *msg, int32_t msgLen) {
   streamMetaUnregisterTask(pSnode->pMeta, pReq->streamId, pReq->taskId);
 
   // commit the update
-  taosWLockLatch(&pSnode->pMeta->lock);
+  streamMetaWLock(pSnode->pMeta);
   int32_t numOfTasks = streamMetaGetNumOfTasks(pSnode->pMeta);
   qDebug("vgId:%d task:0x%x dropped, remain tasks:%d", pSnode->pMeta->vgId, pReq->taskId, numOfTasks);
 
   if (streamMetaCommit(pSnode->pMeta) < 0) {
     // persist to disk
   }
-  taosWUnLockLatch(&pSnode->pMeta->lock);
+  streamMetaWUnLock(pSnode->pMeta);
   return 0;
 }
 
@@ -1023,10 +1023,10 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t sversion, char* msg, int32_t ms
   int64_t streamId = pTask->id.streamId;
   bool added = false;
 
-  taosWLockLatch(&pStreamMeta->lock);
+  streamMetaWLock(pStreamMeta);
   code = streamMetaRegisterTask(pStreamMeta, sversion, pTask, &added);
   int32_t numOfTasks = streamMetaGetNumOfTasks(pStreamMeta);
-  taosWUnLockLatch(&pStreamMeta->lock);
+  streamMetaWUnLock(pStreamMeta);
 
   if (code < 0) {
     tqError("failed to add s-task:0x%x into vgId:%d meta, total:%d, code:%s", vgId, taskId, numOfTasks, tstrerror(code));
@@ -1064,6 +1064,47 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t sversion, char* msg, int32_t ms
   return code;
 }
 
+static void doStartStep2(SStreamTask* pTask, SStreamTask* pStreamTask, STQ* pTq) {
+  const char* id = pTask->id.idStr;
+  int64_t nextProcessedVer = pStreamTask->hTaskInfo.haltVer;
+
+  // if it's an source task, extract the last version in wal.
+  SVersionRange *pRange = &pTask->dataRange.range;
+
+  bool done = streamHistoryTaskSetVerRangeStep2(pTask, nextProcessedVer);
+  pTask->execInfo.step2Start = taosGetTimestampMs();
+
+  if (done) {
+    qDebug("s-task:%s scan-history from WAL stage(step 2) ended, elapsed time:%.2fs", id, 0.0);
+    streamTaskPutTranstateIntoInputQ(pTask);
+    streamExecTask(pTask);  // exec directly
+  } else {
+    STimeWindow* pWindow = &pTask->dataRange.window;
+    tqDebug("s-task:%s level:%d verRange:%" PRId64 " - %" PRId64 " window:%" PRId64 "-%" PRId64
+            ", do secondary scan-history from WAL after halt the related stream task:%s",
+            id, pTask->info.taskLevel, pRange->minVer, pRange->maxVer, pWindow->skey, pWindow->ekey,
+            pStreamTask->id.idStr);
+    ASSERT(pTask->status.schedStatus == TASK_SCHED_STATUS__WAITING);
+
+    streamSetParamForStreamScannerStep2(pTask, pRange, pWindow);
+
+    int64_t dstVer = pTask->dataRange.range.minVer;
+    pTask->chkInfo.nextProcessVer = dstVer;
+
+    walReaderSetSkipToVersion(pTask->exec.pWalReader, dstVer);
+    tqDebug("s-task:%s wal reader start scan WAL verRange:%" PRId64 "-%" PRId64 ", set sched-status:%d", id, dstVer,
+            pTask->dataRange.range.maxVer, TASK_SCHED_STATUS__INACTIVE);
+
+    /*int8_t status = */streamTaskSetSchedStatusInactive(pTask);
+
+    // now the fill-history task starts to scan data from wal files.
+    int32_t code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_SCANHIST_DONE);
+    if (code == TSDB_CODE_SUCCESS) {
+      tqScanWalAsync(pTq, false);
+    }
+  }
+}
+
 // this function should be executed by only one thread
 int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
   SStreamScanHistoryReq* pReq = (SStreamScanHistoryReq*)pMsg->pCont;
@@ -1126,10 +1167,6 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
     return 0;
   }
 
-  if (pTask->info.fillHistory == 1) {
-    ASSERT(pTask->status.pauseAllowed == true);
-  }
-
   streamScanHistoryData(pTask);
 
   double el = (taosGetTimestampMs() - pTask->execInfo.step1Start) / 1000.0;
@@ -1146,18 +1183,15 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
   tqDebug("s-task:%s scan-history(step 1) ended, elapsed time:%.2fs", id, el);
 
   if (pTask->info.fillHistory) {
-    SVersionRange* pRange = NULL;
     SStreamTask* pStreamTask = NULL;
 
     // 1. get the related stream task
     pStreamTask = streamMetaAcquireTask(pMeta, pTask->streamTaskId.streamId, pTask->streamTaskId.taskId);
     if (pStreamTask == NULL) {
-      // todo delete this task, if the related stream task is dropped
-      qError("failed to find s-task:0x%"PRIx64", it may have been destroyed, drop fill-history task:%s",
+      tqError("failed to find s-task:0x%"PRIx64", it may have been destroyed, drop related fill-history task:%s",
              pTask->streamTaskId.taskId, pTask->id.idStr);
 
       tqDebug("s-task:%s fill-history task set status to be dropping", id);
 
       streamBuildAndSendDropTaskMsg(pTask->pMsgCb, pMeta->vgId, &pTask->id);
 
       atomic_store_32(&pTask->status.inScanHistorySentinel, 0);
@@ -1166,112 +1200,14 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
     }
 
     ASSERT(pStreamTask->info.taskLevel == TASK_LEVEL__SOURCE);
-#if 0
-    // 2. it cannot be paused, when the stream task in TASK_STATUS__SCAN_HISTORY status. Let's wait for the
-    // stream task get ready for scan history data
-    while (streamTaskGetStatus(pStreamTask, NULL) == TASK_STATUS__SCAN_HISTORY) {
-      tqDebug(
-          "s-task:%s level:%d related stream task:%s(status:%s) not ready for halt, wait for it and recheck in 100ms",
-          id, pTask->info.taskLevel, pStreamTask->id.idStr, streamGetTaskStatusStr(pStreamTask->status.taskStatus));
-      taosMsleep(100);
-    }
-
-    // now we can stop the stream task execution
-    int64_t nextProcessedVer = 0;
-
-    while (1) {
-      taosThreadMutexLock(&pStreamTask->lock);
-      int8_t status = pStreamTask->status.taskStatus;
-      if (status == TASK_STATUS__DROPPING || status == TASK_STATUS__STOP) {
-        // return; do nothing
-      }
-
-      if (status == TASK_STATUS__HALT) {
-        // tqDebug("s-task:%s level:%d sched-status:%d is halt by fill-history task:%s", pStreamTask->id.idStr,
-        //         pStreamTask->info.taskLevel, pStreamTask->status.schedStatus, id);
-        // latestVer = walReaderGetCurrentVer(pStreamTask->exec.pWalReader);
-        //
-        // taosThreadMutexUnlock(&pStreamTask->lock);
-        // break;
-      }
-
-      if (pStreamTask->status.taskStatus == TASK_STATUS__CK) {
-        qDebug("s-task:%s status:%s during generating checkpoint, wait for 1sec and retry set status:halt",
-               pStreamTask->id.idStr, streamGetTaskStatusStr(TASK_STATUS__CK));
-        taosThreadMutexUnlock(&pStreamTask->lock);
-        taosMsleep(1000);
-        continue;
-      }
-
-      // upgrade to halt status
-      if (status == TASK_STATUS__PAUSE) {
-        qDebug("s-task:%s upgrade status to %s from %s", pStreamTask->id.idStr, streamGetTaskStatusStr(TASK_STATUS__HALT),
-               streamGetTaskStatusStr(TASK_STATUS__PAUSE));
-      } else {
-        qDebug("s-task:%s halt task, prev status:%s", pStreamTask->id.idStr, streamGetTaskStatusStr(status));
-      }
-
-      pStreamTask->status.keepTaskStatus = status;
-      pStreamTask->status.taskStatus = TASK_STATUS__HALT;
-
-      // wal scan not start yet, reset it to be the start position
-      nextProcessedVer = walReaderGetCurrentVer(pStreamTask->exec.pWalReader);
-      if (nextProcessedVer == -1) {
-        nextProcessedVer = pStreamTask->dataRange.range.maxVer + 1;
-      }
-
-      tqDebug("s-task:%s level:%d nextProcessedVer:%" PRId64 ", sched-status:%d is halt by fill-history task:%s",
-              pStreamTask->id.idStr, pStreamTask->info.taskLevel, nextProcessedVer, pStreamTask->status.schedStatus,
-              id);
-
-      taosThreadMutexUnlock(&pStreamTask->lock);
-      break;
-    }
-#endif
-
-    streamTaskHandleEvent(pStreamTask->status.pSM, TASK_EVENT_HALT);
-    int64_t nextProcessedVer = pStreamTask->hTaskInfo.haltVer;
-
-    // if it's an source task, extract the last version in wal.
-    pRange = &pTask->dataRange.range;
-    bool done = streamHistoryTaskSetVerRangeStep2(pTask, nextProcessedVer);
-    pTask->execInfo.step2Start = taosGetTimestampMs();
-
-    if (done) {
-      qDebug("s-task:%s scan-history from WAL stage(step 2) ended, elapsed time:%.2fs", id, 0.0);
-      streamTaskPutTranstateIntoInputQ(pTask);
-      // streamTaskRestoreStatus(pTask);
-
-      // if (pTask->status.taskStatus == TASK_STATUS__PAUSE) {
-      //   pTask->status.keepTaskStatus = TASK_STATUS__READY;
-      //   qDebug("s-task:%s prev status is %s, update the kept status to be:%s when after step 2", id,
-      //          streamGetTaskStatusStr(TASK_STATUS__PAUSE), streamGetTaskStatusStr(pTask->status.keepTaskStatus));
-      // }
-
-      streamExecTask(pTask);  // exec directly
+    code = streamTaskHandleEvent(pStreamTask->status.pSM, TASK_EVENT_HALT);
+    if (code == TSDB_CODE_SUCCESS) {
+      doStartStep2(pTask, pStreamTask, pTq);
     } else {
-      STimeWindow* pWindow = &pTask->dataRange.window;
-      tqDebug("s-task:%s level:%d verRange:%" PRId64 " - %" PRId64 " window:%" PRId64 "-%" PRId64
-              ", do secondary scan-history from WAL after halt the related stream task:%s",
-              id, pTask->info.taskLevel, pRange->minVer, pRange->maxVer, pWindow->skey, pWindow->ekey,
-              pStreamTask->id.idStr);
-      ASSERT(pTask->status.schedStatus == TASK_SCHED_STATUS__WAITING);
-
-      streamSetParamForStreamScannerStep2(pTask, pRange, pWindow);
-
-      int64_t dstVer = pTask->dataRange.range.minVer;
-      pTask->chkInfo.nextProcessVer = dstVer;
-
-      walReaderSetSkipToVersion(pTask->exec.pWalReader, dstVer);
-      tqDebug("s-task:%s wal reader start scan WAL verRange:%" PRId64 "-%" PRId64 ", set sched-status:%d", id, dstVer,
-              pTask->dataRange.range.maxVer, TASK_SCHED_STATUS__INACTIVE);
-
-      /*int8_t status = */streamTaskSetSchedStatusInactive(pTask);
-
-      // now the fill-history task starts to scan data from wal files.
-      streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_SCANHIST_DONE);
-      tqScanWalAsync(pTq, false);
+      tqError("s-task:%s failed to halt s-task:%s, not launch step2", id, pStreamTask->id.idStr);
     }
 
     streamMetaReleaseTask(pMeta, pStreamTask);
 
   } else {
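Net effect of the hunk above: the long-disabled #if 0 polling loop and the inlined step-2 setup are deleted, halting is requested through a single state-machine event, and the WAL-scan setup now lives in the doStartStep2 helper added earlier in this commit. The surviving control flow, condensed from the new lines (not a complete function):

    code = streamTaskHandleEvent(pStreamTask->status.pSM, TASK_EVENT_HALT);
    if (code == TSDB_CODE_SUCCESS) {
      doStartStep2(pTask, pStreamTask, pTq);  // start step 2 only after a clean halt
    } else {
      tqError("s-task:%s failed to halt s-task:%s, not launch step2", id, pStreamTask->id.idStr);
    }
    streamMetaReleaseTask(pMeta, pStreamTask);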
@@ -1467,14 +1403,14 @@ int32_t tqProcessTaskDropReq(STQ* pTq, char* msg, int32_t msgLen) {
   streamMetaUnregisterTask(pMeta, pReq->streamId, pReq->taskId);
 
   // commit the update
-  taosWLockLatch(&pMeta->lock);
+  streamMetaWLock(pMeta);
   int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta);
   tqDebug("vgId:%d task:0x%x dropped, remain tasks:%d", vgId, pReq->taskId, numOfTasks);
 
   if (streamMetaCommit(pMeta) < 0) {
     // persist to disk
   }
-  taosWUnLockLatch(&pMeta->lock);
+  streamMetaWUnLock(pMeta);
 
   return 0;
 }
@@ -1785,7 +1721,7 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp)
   taosThreadMutexUnlock(&pTask->lock);
 
   int32_t total = 0;
-  taosWLockLatch(&pMeta->lock);
+  streamMetaWLock(pMeta);
 
   // set the initial value for generating check point
   // set the mgmt epset info according to the checkout source msg from mnode, todo update mgmt epset if needed
@@ -1794,7 +1730,7 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp)
   }
 
   total = pMeta->numOfStreamTasks;
-  taosWUnLockLatch(&pMeta->lock);
+  streamMetaWUnLock(pMeta);
 
   qInfo("s-task:%s (vgId:%d) level:%d receive checkpoint-source msg chkpt:%" PRId64 ", total checkpoint reqs:%d",
         pTask->id.idStr, vgId, pTask->info.taskLevel, req.checkpointId, total);
@@ -1865,7 +1801,7 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) {
   tDecoderClear(&decoder);
 
   // update the nodeEpset when it exists
-  taosWLockLatch(&pMeta->lock);
+  streamMetaWLock(pMeta);
 
   // the task epset may be updated again and again, when replaying the WAL, the task may be in stop status.
   STaskId id = {.streamId = req.streamId, .taskId = req.taskId};
@@ -1874,7 +1810,8 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) {
     tqError("vgId:%d failed to acquire task:0x%x when handling update, it may have been dropped already", pMeta->vgId,
             req.taskId);
     rsp.code = TSDB_CODE_SUCCESS;
-    taosWUnLockLatch(&pMeta->lock);
+    streamMetaWUnLock(pMeta);
+
     taosArrayDestroy(req.pNodeList);
     return rsp.code;
   }
@@ -1883,7 +1820,7 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) {
 
   if (pMeta->updateInfo.transId != req.transId) {
     pMeta->updateInfo.transId = req.transId;
-    tqDebug("s-task:%s receive new trans to update nodeEp msg from mnode, transId:%d", pTask->id.idStr, req.transId);
+    tqInfo("s-task:%s receive new trans to update nodeEp msg from mnode, transId:%d", pTask->id.idStr, req.transId);
     // info needs to be kept till the new trans to update the nodeEp arrived.
     taosHashClear(pMeta->updateInfo.pTasks);
   } else {
@@ -1896,19 +1833,19 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) {
     tqDebug("s-task:%s (vgId:%d) already update in trans:%d, discard the nodeEp update msg", pTask->id.idStr, vgId,
             req.transId);
     rsp.code = TSDB_CODE_SUCCESS;
-    taosWUnLockLatch(&pMeta->lock);
+    streamMetaWUnLock(pMeta);
     taosArrayDestroy(req.pNodeList);
     return rsp.code;
   }
 
-  taosWUnLockLatch(&pMeta->lock);
+  streamMetaWUnLock(pMeta);
 
   // the following two functions should not be executed within the scope of meta lock to avoid deadlock
   streamTaskUpdateEpsetInfo(pTask, req.pNodeList);
   streamTaskResetStatus(pTask);
 
   // continue after lock the meta again
-  taosWLockLatch(&pMeta->lock);
+  streamMetaWLock(pMeta);
 
   SStreamTask** ppHTask = NULL;
   if (HAS_RELATED_FILLHISTORY_TASK(pTask)) {
@@ -1958,47 +1895,49 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) {
   if (updateTasks < numOfTasks) {
     tqDebug("vgId:%d closed tasks:%d, unclosed:%d, all tasks will be started when nodeEp update completed", vgId,
             updateTasks, (numOfTasks - updateTasks));
-    taosWUnLockLatch(&pMeta->lock);
+    streamMetaWUnLock(pMeta);
   } else {
     if (!pTq->pVnode->restored) {
       tqDebug("vgId:%d vnode restore not completed, not restart the tasks, clear the start after nodeUpdate flag", vgId);
       pMeta->startInfo.startAllTasksFlag = 0;
-      taosWUnLockLatch(&pMeta->lock);
+      streamMetaWUnLock(pMeta);
     } else {
-      tqDebug("vgId:%d tasks are all updated and stopped, restart them", vgId);
+      tqInfo("vgId:%d tasks are all updated and stopped, restart them", vgId);
       terrno = 0;
-      taosWUnLockLatch(&pMeta->lock);
+
+      streamMetaWUnLock(pMeta);
+
       while (streamMetaTaskInTimer(pMeta)) {
-        qDebug("vgId:%d some tasks in timer, wait for 100ms and recheck", pMeta->vgId);
+        tqDebug("vgId:%d some tasks in timer, wait for 100ms and recheck", pMeta->vgId);
         taosMsleep(100);
       }
 
-      taosWLockLatch(&pMeta->lock);
+      streamMetaWLock(pMeta);
 
       int32_t code = streamMetaReopen(pMeta);
       if (code != 0) {
        tqError("vgId:%d failed to reopen stream meta", vgId);
-        taosWUnLockLatch(&pMeta->lock);
+        streamMetaWUnLock(pMeta);
        taosArrayDestroy(req.pNodeList);
        return -1;
      }
 
      if (streamMetaLoadAllTasks(pTq->pStreamMeta) < 0) {
        tqError("vgId:%d failed to load stream tasks", vgId);
-        taosWUnLockLatch(&pMeta->lock);
+        streamMetaWUnLock(pMeta);
        taosArrayDestroy(req.pNodeList);
        return -1;
      }
 
      if (vnodeIsRoleLeader(pTq->pVnode) && !tsDisableStream) {
-        vInfo("vgId:%d restart all stream tasks after all tasks being updated", vgId);
+        tqInfo("vgId:%d restart all stream tasks after all tasks being updated", vgId);
        tqResetStreamTaskStatus(pTq);
        tqLaunchStreamTaskAsync(pTq);
      } else {
-        vInfo("vgId:%d, follower node not start stream tasks", vgId);
+        tqInfo("vgId:%d, follower node not start stream tasks", vgId);
      }
 
-      taosWUnLockLatch(&pMeta->lock);
+      streamMetaWUnLock(pMeta);
    }
  }
 
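One detail in the restart branch above is easy to miss: the meta write lock is dropped before waiting for tasks still held by timers, and re-taken only afterwards. Spinning with the write lock held could block timer callbacks that need to read the meta. The shape of the pattern, extracted from the hunk:

    streamMetaWUnLock(pMeta);              // release before waiting
    while (streamMetaTaskInTimer(pMeta)) {
      tqDebug("vgId:%d some tasks in timer, wait for 100ms and recheck", pMeta->vgId);
      taosMsleep(100);                     // poll until in-timer tasks drain
    }
    streamMetaWLock(pMeta);                // re-acquire for streamMetaReopen()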
@@ -35,9 +35,9 @@ int32_t tqPushMsg(STQ* pTq, tmsg_t msgType) {
     tqProcessSubmitReqForSubscribe(pTq);
   }
 
-  taosRLockLatch(&pTq->pStreamMeta->lock);
+  streamMetaRLock(pTq->pStreamMeta);
   int32_t numOfTasks = streamMetaGetNumOfTasks(pTq->pStreamMeta);
-  taosRUnLockLatch(&pTq->pStreamMeta->lock);
+  streamMetaRUnLock(pTq->pStreamMeta);
 
   // tqTrace("vgId:%d handle submit, restore:%d, numOfTasks:%d", TD_VID(pTq->pVnode), pTq->pVnode->restored, numOfTasks);
 
@@ -1111,7 +1111,7 @@ int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd) {
   taosWUnLockLatch(&pTq->lock);
 
   // update the table list handle for each stream scanner/wal reader
-  taosWLockLatch(&pTq->pStreamMeta->lock);
+  streamMetaWLock(pTq->pStreamMeta);
   while (1) {
     pIter = taosHashIterate(pTq->pStreamMeta->pTasksMap, pIter);
     if (pIter == NULL) {
@@ -1128,6 +1128,6 @@ int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd) {
     }
   }
 
-  taosWUnLockLatch(&pTq->pStreamMeta->lock);
+  streamMetaWUnLock(pTq->pStreamMeta);
   return 0;
 }
@@ -38,18 +38,16 @@ int32_t tqScanWal(STQ* pTq) {
     doScanWalForAllTasks(pTq->pStreamMeta, &shouldIdle);
 
     if (shouldIdle) {
-      taosWLockLatch(&pMeta->lock);
+      streamMetaWLock(pMeta);
 
       int32_t times = (--pMeta->walScanCounter);
       ASSERT(pMeta->walScanCounter >= 0);
+      streamMetaWUnLock(pMeta);
 
-      if (pMeta->walScanCounter <= 0) {
-        taosWUnLockLatch(&pMeta->lock);
+      if (times <= 0) {
         break;
+      } else {
+        tqDebug("vgId:%d scan wal for stream tasks for %d times in %dms", vgId, times, SCAN_WAL_IDLE_DURATION);
       }
-
-      taosWUnLockLatch(&pMeta->lock);
-      tqDebug("vgId:%d scan wal for stream tasks for %d times in %dms", vgId, times, SCAN_WAL_IDLE_DURATION);
     }
 
     taosMsleep(SCAN_WAL_IDLE_DURATION);
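This hunk changes more than the lock wrapper: the unlock now happens right after the decrement, and the branch tests times — the value sampled while the lock was still held — instead of re-reading pMeta->walScanCounter after the unlock, where another thread could already have changed it. The essence:

    streamMetaWLock(pMeta);
    int32_t times = (--pMeta->walScanCounter);  // sample under the lock
    streamMetaWUnLock(pMeta);

    if (times <= 0) {   // decide on the sampled value, not on a racy re-read
      break;
    }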
@@ -61,6 +59,7 @@ int32_t tqScanWal(STQ* pTq) {
 }
 
 int32_t tqStartStreamTask(STQ* pTq) {
+  int32_t code = TSDB_CODE_SUCCESS;
   int32_t vgId = TD_VID(pTq->pVnode);
   SStreamMeta* pMeta = pTq->pStreamMeta;
 
@@ -71,11 +70,11 @@ int32_t tqStartStreamTask(STQ* pTq) {
   }
 
   SArray* pTaskList = NULL;
-  taosWLockLatch(&pMeta->lock);
+  streamMetaWLock(pMeta);
   pTaskList = taosArrayDup(pMeta->pTaskList, NULL);
   taosHashClear(pMeta->startInfo.pReadyTaskSet);
   pMeta->startInfo.startTs = taosGetTimestampMs();
-  taosWUnLockLatch(&pMeta->lock);
+  streamMetaWUnLock(pMeta);
 
   // broadcast the check downstream tasks msg
   for (int32_t i = 0; i < numOfTasks; ++i) {
@@ -104,12 +103,16 @@ }
     }
 
     EStreamTaskEvent event = (HAS_RELATED_FILLHISTORY_TASK(pTask)) ? TASK_EVENT_INIT_STREAM_SCANHIST : TASK_EVENT_INIT;
-    streamTaskHandleEvent(pTask->status.pSM, event);
+    int32_t ret = streamTaskHandleEvent(pTask->status.pSM, event);
+    if (ret != TSDB_CODE_SUCCESS) {
+      code = ret;
+    }
+
     streamMetaReleaseTask(pMeta, pTask);
   }
 
   taosArrayDestroy(pTaskList);
-  return 0;
+  return code;
 }
 
 int32_t tqLaunchStreamTaskAsync(STQ* pTq) {
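tqStartStreamTask now surfaces failures from streamTaskHandleEvent instead of swallowing them: a failing task records its code, the loop still starts the remaining tasks, and the function returns the recorded code where it previously returned a hard-coded 0. The pattern in isolation:

    int32_t code = TSDB_CODE_SUCCESS;
    for (int32_t i = 0; i < numOfTasks; ++i) {
      // ... look up pTask for the i-th entry of pTaskList ...
      int32_t ret = streamTaskHandleEvent(pTask->status.pSM, event);
      if (ret != TSDB_CODE_SUCCESS) {
        code = ret;  // remember the failure, keep starting the other tasks
      }
      streamMetaReleaseTask(pMeta, pTask);
    }
    return code;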
@@ -148,12 +151,12 @@ int32_t tqScanWalAsync(STQ* pTq, bool ckPause) {
     return TSDB_CODE_SUCCESS;
   }
 
-  taosWLockLatch(&pMeta->lock);
+  streamMetaWLock(pMeta);
 
   int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList);
   if (numOfTasks == 0) {
     tqDebug("vgId:%d no stream tasks existed to run", vgId);
-    taosWUnLockLatch(&pMeta->lock);
+    streamMetaWUnLock(pMeta);
     return 0;
   }
 
@@ -164,7 +167,7 @@ int32_t tqScanWalAsync(STQ* pTq, bool ckPause) {
 
   if (pMeta->walScanCounter > 1) {
     tqDebug("vgId:%d wal read task has been launched, remain scan times:%d", vgId, pMeta->walScanCounter);
-    taosWUnLockLatch(&pMeta->lock);
+    streamMetaWUnLock(pMeta);
     return 0;
   }
 
@@ -174,7 +177,7 @@ int32_t tqScanWalAsync(STQ* pTq, bool ckPause) {
 
     // reset the counter value, since we do not launch the scan wal operation.
     pMeta->walScanCounter = 0;
-    taosWUnLockLatch(&pMeta->lock);
+    streamMetaWUnLock(pMeta);
     return 0;
   }
 
@@ -182,7 +185,7 @@ int32_t tqScanWalAsync(STQ* pTq, bool ckPause) {
   if (pRunReq == NULL) {
     terrno = TSDB_CODE_OUT_OF_MEMORY;
     tqError("vgId:%d failed to create msg to start wal scanning to launch stream tasks, code:%s", vgId, terrstr());
-    taosWUnLockLatch(&pMeta->lock);
+    streamMetaWUnLock(pMeta);
     return -1;
   }
 
@@ -193,7 +196,7 @@ int32_t tqScanWalAsync(STQ* pTq, bool ckPause) {
 
   SRpcMsg msg = {.msgType = TDMT_STREAM_TASK_RUN, .pCont = pRunReq, .contLen = sizeof(SStreamTaskRunReq)};
   tmsgPutToQueue(&pTq->pVnode->msgCb, STREAM_QUEUE, &msg);
-  taosWUnLockLatch(&pMeta->lock);
+  streamMetaWUnLock(pMeta);
 
   return 0;
 }
@@ -209,9 +212,9 @@ int32_t tqStopStreamTasks(STQ* pTq) {
   }
 
   SArray* pTaskList = NULL;
-  taosWLockLatch(&pMeta->lock);
+  streamMetaWLock(pMeta);
   pTaskList = taosArrayDup(pMeta->pTaskList, NULL);
-  taosWUnLockLatch(&pMeta->lock);
+  streamMetaWUnLock(pMeta);
 
   for (int32_t i = 0; i < numOfTasks; ++i) {
     SStreamTaskId* pTaskId = taosArrayGet(pTaskList, i);
@@ -412,9 +415,9 @@ int32_t doScanWalForAllTasks(SStreamMeta* pStreamMeta, bool* pScanIdle) {
 
   // clone the task list, to avoid the task update during scan wal files
   SArray* pTaskList = NULL;
-  taosWLockLatch(&pStreamMeta->lock);
+  streamMetaWLock(pStreamMeta);
   pTaskList = taosArrayDup(pStreamMeta->pTaskList, NULL);
-  taosWUnLockLatch(&pStreamMeta->lock);
+  streamMetaWUnLock(pStreamMeta);
 
   tqDebug("vgId:%d start to check wal to extract new submit block for %d tasks", vgId, numOfTasks);
 
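doScanWalForAllTasks above, like tqStopStreamTasks and tqStartStreamTask earlier, keeps the pre-existing snapshot idiom, now expressed through the wrappers: duplicate the task list under the meta lock, then iterate the private copy with the lock released so slow per-task work cannot stall concurrent meta updates. Minimal sketch:

    streamMetaWLock(pStreamMeta);
    SArray* pTaskList = taosArrayDup(pStreamMeta->pTaskList, NULL);  // snapshot under lock
    streamMetaWUnLock(pStreamMeta);

    for (int32_t i = 0; i < taosArrayGetSize(pTaskList); ++i) {
      // process taosArrayGet(pTaskList, i) without holding the meta lock
    }
    taosArrayDestroy(pTaskList);  // the caller owns the copy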
@@ -446,6 +449,7 @@ int32_t doScanWalForAllTasks(SStreamMeta* pStreamMeta, bool* pScanIdle) {
     int64_t maxVer = (pTask->info.fillHistory == 1) ? pTask->dataRange.range.maxVer : INT64_MAX;
 
     taosThreadMutexLock(&pTask->lock);
+    tqDebug("s-task:%s lock", pTask->id.idStr);
 
     char* p = NULL;
     ETaskStatus status = streamTaskGetStatus(pTask, &p);
@@ -554,10 +554,12 @@ static void vnodeRestoreFinish(const SSyncFSM *pFsm, const SyncIndex commitIdx)
   walApplyVer(pVnode->pWal, commitIdx);
   pVnode->restored = true;
 
-  taosWLockLatch(&pVnode->pTq->pStreamMeta->lock);
-  if (pVnode->pTq->pStreamMeta->startInfo.startAllTasksFlag) {
+  SStreamMeta* pMeta = pVnode->pTq->pStreamMeta;
+  streamMetaWLock(pMeta);
+
+  if (pMeta->startInfo.startAllTasksFlag) {
     vInfo("vgId:%d, sync restore finished, stream tasks will be launched by other thread", vgId);
-    taosWUnLockLatch(&pVnode->pTq->pStreamMeta->lock);
+    streamMetaWUnLock(pMeta);
     return;
   }
 
@@ -574,7 +576,7 @@ static void vnodeRestoreFinish(const SSyncFSM *pFsm, const SyncIndex commitIdx)
     vInfo("vgId:%d, sync restore finished, not launch stream tasks since not leader", vgId);
   }
 
-  taosWUnLockLatch(&pVnode->pTq->pStreamMeta->lock);
+  streamMetaWUnLock(pMeta);
 }
 
 static void vnodeBecomeFollower(const SSyncFSM *pFsm) {
@@ -184,13 +184,13 @@ int32_t streamProcessCheckpointBlock(SStreamTask* pTask, SStreamDataBlock* pBloc
 
   { // todo: remove this when the pipeline checkpoint generating is used.
     SStreamMeta* pMeta = pTask->pMeta;
-    taosWLockLatch(&pMeta->lock);
+    streamMetaWLock(pMeta);
 
     if (pMeta->chkptNotReadyTasks == 0) {
       pMeta->chkptNotReadyTasks = pMeta->numOfStreamTasks;
     }
 
-    taosWUnLockLatch(&pMeta->lock);
+    streamMetaWUnLock(pMeta);
   }
 
   //todo fix race condition: set the status and append checkpoint block
@@ -281,8 +281,9 @@ void streamTaskClearCheckInfo(SStreamTask* pTask) {
 
 int32_t streamSaveAllTaskStatus(SStreamMeta* pMeta, int64_t checkpointId) {
   int32_t vgId = pMeta->vgId;
+  int32_t code = 0;
 
-  taosWLockLatch(&pMeta->lock);
+  streamMetaWLock(pMeta);
 
   for (int32_t i = 0; i < taosArrayGetSize(pMeta->pTaskList); ++i) {
     STaskId* pId = taosArrayGet(pMeta->pTaskList, i);
@@ -304,10 +305,10 @@ int32_t streamSaveAllTaskStatus(SStreamMeta* pMeta, int64_t checkpointId) {
     char* str = NULL;
     streamTaskGetStatus(p, &str);
 
-    int32_t code = streamTaskHandleEvent(p->status.pSM, TASK_EVENT_CHECKPOINT_DONE);
+    code = streamTaskHandleEvent(p->status.pSM, TASK_EVENT_CHECKPOINT_DONE);
     if (code != TSDB_CODE_SUCCESS) {
       stDebug("s-task:%s vgId:%d save task status failed, since handle event failed", p->id.idStr, vgId);
-      taosWUnLockLatch(&pMeta->lock);
+      streamMetaWUnLock(pMeta);
       return -1;
     } else { // save the task
       streamMetaSaveTask(pMeta, p);
@@ -320,17 +321,16 @@
             str);
   }
 
-  if (streamMetaCommit(pMeta) < 0) {
-    taosWUnLockLatch(&pMeta->lock);
+  code = streamMetaCommit(pMeta);
+  if (code < 0) {
     stError("vgId:%d failed to commit stream meta after do checkpoint, checkpointId:%" PRId64 ", since %s", pMeta->vgId,
             checkpointId, terrstr());
-    return -1;
   } else {
-    taosWUnLockLatch(&pMeta->lock);
     stInfo("vgId:%d commit stream meta after do checkpoint, checkpointId:%" PRId64 " DONE", pMeta->vgId, checkpointId);
   }
 
-  return TSDB_CODE_SUCCESS;
+  streamMetaWUnLock(pMeta);
+  return code;
 }
 
 int32_t streamTaskBuildCheckpoint(SStreamTask* pTask) {
@@ -41,9 +41,9 @@ static int32_t tInitStreamDispatchReq(SStreamDispatchReq* pReq, const SStreamTas
                                       int32_t numOfBlocks, int64_t dstTaskId, int32_t type);
 
 void initRpcMsg(SRpcMsg* pMsg, int32_t msgType, void* pCont, int32_t contLen) {
   pMsg->msgType = msgType;
   pMsg->pCont = pCont;
   pMsg->contLen = contLen;
 }
 
 int32_t tEncodeStreamDispatchReq(SEncoder* pEncoder, const SStreamDispatchReq* pReq) {
@@ -429,6 +429,7 @@ static void doRetryDispatchData(void* param, void* tmrId) {
   ASSERT(pTask->outputq.status == TASK_OUTPUT_STATUS__WAIT);
 
   int32_t code = 0;
+
   {
     SArray* pList = taosArrayDup(pTask->msgInfo.pRetryList, NULL);
     taosArrayClear(pTask->msgInfo.pRetryList);
@@ -440,7 +441,7 @@ static void doRetryDispatchData(void* param, void* tmrId) {
     int32_t numOfVgroups = taosArrayGetSize(vgInfo);
 
     int32_t numOfFailed = taosArrayGetSize(pList);
-    stDebug("s-task:%s (child taskId:%d) re-try shuffle-dispatch blocks to %d vgroup(s), msgId:%d",
+    stDebug("s-task:%s (child taskId:%d) retry shuffle-dispatch blocks to %d vgroup(s), msgId:%d",
             id, pTask->info.selfChildId, numOfFailed, msgId);
 
     for (int32_t i = 0; i < numOfFailed; i++) {
@@ -471,6 +472,8 @@ static void doRetryDispatchData(void* param, void* tmrId) {
 
       code = doSendDispatchMsg(pTask, pReq, vgId, pEpSet);
     }
+
+    taosArrayDestroy(pList);
   }
 
   if (code != TSDB_CODE_SUCCESS) {
@@ -1004,6 +1007,8 @@ int32_t streamAddEndScanHistoryMsg(SStreamTask* pTask, SRpcHandleInfo* pRpcInfo,
   info.msg.info = *pRpcInfo;
 
   taosThreadMutexLock(&pTask->lock);
+  stDebug("s-task:%s lock", pTask->id.idStr);
+
   if (pTask->pRspMsgList == NULL) {
     pTask->pRspMsgList = taosArrayInit(4, sizeof(SStreamContinueExecInfo));
   }
@@ -285,33 +285,31 @@ static void waitForTaskIdle(SStreamTask* pTask, SStreamTask* pStreamTask) {
 
 int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask) {
   SStreamMeta* pMeta = pTask->pMeta;
+  const char* id = pTask->id.idStr;
 
   SStreamTask* pStreamTask = streamMetaAcquireTask(pMeta, pTask->streamTaskId.streamId, pTask->streamTaskId.taskId);
   if (pStreamTask == NULL) {
     stError(
         "s-task:%s failed to find related stream task:0x%x, it may have been destroyed or closed, destroy the related "
         "fill-history task",
-        pTask->id.idStr, (int32_t) pTask->streamTaskId.taskId);
+        id, (int32_t) pTask->streamTaskId.taskId);
 
     // 1. free it and remove fill-history task from disk meta-store
     streamBuildAndSendDropTaskMsg(pTask->pMsgCb, pMeta->vgId, &pTask->id);
 
     // 2. save to disk
-    taosWLockLatch(&pMeta->lock);
+    streamMetaWLock(pMeta);
     if (streamMetaCommit(pMeta) < 0) {
       // persist to disk
     }
-    taosWUnLockLatch(&pMeta->lock);
+    streamMetaWUnLock(pMeta);
     return TSDB_CODE_STREAM_TASK_NOT_EXIST;
   } else {
-    stDebug("s-task:%s fill-history task end, update related stream task:%s info, transfer exec state", pTask->id.idStr,
+    stDebug("s-task:%s fill-history task end, update related stream task:%s info, transfer exec state", id,
             pStreamTask->id.idStr);
   }
 
   ETaskStatus status = streamTaskGetStatus(pStreamTask, NULL);
-  ASSERT(((status == TASK_STATUS__DROPPING) || (pStreamTask->hTaskInfo.id.taskId == pTask->id.taskId)) &&
-         pTask->status.appendTranstateBlock == true);
-
   STimeWindow* pTimeWindow = &pStreamTask->dataRange.window;
 
   // It must be halted for a source stream task, since when the related scan-history-data task start scan the history
@@ -321,7 +319,7 @@ int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask) {
   } else {
     ASSERT(status == TASK_STATUS__READY|| status == TASK_STATUS__DROPPING || status == TASK_STATUS__STOP);
     streamTaskHandleEvent(pStreamTask->status.pSM, TASK_EVENT_HALT);
-    stDebug("s-task:%s halt by related fill-history task:%s", pStreamTask->id.idStr, pTask->id.idStr);
+    stDebug("s-task:%s halt by related fill-history task:%s", pStreamTask->id.idStr, id);
   }
 
   // wait for the stream task to handle all in the inputQ, and to be idle
@@ -331,7 +329,13 @@ int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask) {
   // In case of source tasks and agg tasks, we should HALT them, and wait for them to be idle. And then, it's safe to
   // start the task state transfer procedure.
   char* p = NULL;
-  streamTaskGetStatus(pStreamTask, &p);
+  status = streamTaskGetStatus(pStreamTask, &p);
+  if (status == TASK_STATUS__STOP || status == TASK_STATUS__DROPPING) {
+    stError("s-task:%s failed to transfer state from fill-history task:%s, status:%s", id, pStreamTask->id.idStr, p);
+    streamMetaReleaseTask(pMeta, pStreamTask);
+    return TSDB_CODE_STREAM_TASK_IVLD_STATUS;
+  }
+
   if (pStreamTask->info.taskLevel == TASK_LEVEL__SOURCE) {
     // update the scan data range for source task.
     stDebug("s-task:%s level:%d stream task window %" PRId64 " - %" PRId64 " update to %" PRId64 " - %" PRId64
@@ -350,30 +354,18 @@ int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask) {
   streamTaskReleaseState(pTask);
   streamTaskReloadState(pStreamTask);
 
-  // 3. resume the state of stream task, after this function, the stream task will run immidately. But it can not be
-  // pause, since the pause allowed attribute is not set yet.
-  streamTaskResume(pStreamTask);  // todo refactor: use streamTaskResume.
+  // 3. resume the state of stream task, after this function, the stream task will run immediately.
+  streamTaskResume(pStreamTask);
 
-  stDebug("s-task:%s fill-history task set status to be dropping, save the state into disk", pTask->id.idStr);
+  stDebug("s-task:%s fill-history task set status to be dropping, save the state into disk", id);
 
   // 4. free it and remove fill-history task from disk meta-store
   streamBuildAndSendDropTaskMsg(pTask->pMsgCb, pMeta->vgId, &pTask->id);
 
-  // 5. clear the link between fill-history task and stream task info
-  // CLEAR_RELATED_FILLHISTORY_TASK(pStreamTask);
-
-  // 6. save to disk
-  taosWLockLatch(&pMeta->lock);
-
+  // 5. save to disk
   pStreamTask->status.taskStatus = streamTaskGetStatus(pStreamTask, NULL);
-  // streamMetaSaveTask(pMeta, pStreamTask);
-  // if (streamMetaCommit(pMeta) < 0) {
-  //   persist to disk
-  // }
-  taosWUnLockLatch(&pMeta->lock);
 
-  // 7. pause allowed.
-  streamTaskEnablePause(pStreamTask);
+  // 6. pause allowed.
   if ((pStreamTask->info.taskLevel == TASK_LEVEL__SOURCE) && taosQueueEmpty(pStreamTask->inputq.queue->pQueue)) {
     SStreamRefDataBlock* pItem = taosAllocateQitem(sizeof(SStreamRefDataBlock), DEF_QITEM, 0);
 
@@ -260,13 +260,9 @@ int32_t streamMetaReopen(SStreamMeta* pMeta) {
     }
   }
 
-  pMeta->streamBackend = streamBackendInit(pMeta->path, pMeta->chkpId);
-  while (pMeta->streamBackend == NULL) {
+  while ((pMeta->streamBackend = streamBackendInit(pMeta->path, pMeta->chkpId)) == NULL) {
+    stInfo("vgId:%d failed to init stream backend, retry in 100ms", pMeta->vgId);
     taosMsleep(100);
-    pMeta->streamBackend = streamBackendInit(pMeta->path, pMeta->chkpId);
-    if (pMeta->streamBackend == NULL) {
-      stInfo("vgId:%d failed to init stream backend, retry in 100ms", pMeta->vgId);
-    }
   }
 
   pMeta->streamBackendRid = taosAddRef(streamBackendId, pMeta->streamBackend);
@@ -447,20 +443,20 @@ int32_t streamMetaGetNumOfTasks(SStreamMeta* pMeta) {
 }
 
 SStreamTask* streamMetaAcquireTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId) {
-  taosRLockLatch(&pMeta->lock);
+  streamMetaRLock(pMeta);
 
   STaskId id = {.streamId = streamId, .taskId = taskId};
   SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, &id, sizeof(id));
   if (ppTask != NULL) {
     if (!streamTaskShouldStop(*ppTask)) {
       int32_t ref = atomic_add_fetch_32(&(*ppTask)->refCnt, 1);
-      taosRUnLockLatch(&pMeta->lock);
+      streamMetaRUnLock(pMeta);
       stTrace("s-task:%s acquire task, ref:%d", (*ppTask)->id.idStr, ref);
       return *ppTask;
     }
   }
 
-  taosRUnLockLatch(&pMeta->lock);
+  streamMetaRUnLock(pMeta);
   return NULL;
 }
 
@@ -491,7 +487,7 @@ int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int64_t streamId, int32_t t
   SStreamTask* pTask = NULL;
 
   // pre-delete operation
-  taosWLockLatch(&pMeta->lock);
+  streamMetaWLock(pMeta);
 
   STaskId id = {.streamId = streamId, .taskId = taskId};
   SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, &id, sizeof(id));
|
||||||
streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_DROPPING);
|
streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_DROPPING);
|
||||||
} else {
|
} else {
|
||||||
stDebug("vgId:%d failed to find the task:0x%x, it may be dropped already", pMeta->vgId, taskId);
|
stDebug("vgId:%d failed to find the task:0x%x, it may be dropped already", pMeta->vgId, taskId);
|
||||||
taosWUnLockLatch(&pMeta->lock);
|
streamMetaWUnLock(pMeta);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
taosWUnLockLatch(&pMeta->lock);
|
streamMetaWUnLock(pMeta);
|
||||||
|
|
||||||
stDebug("s-task:0x%x set task status:dropping and start to unregister it", taskId);
|
stDebug("s-task:0x%x set task status:dropping and start to unregister it", taskId);
|
||||||
|
|
||||||
while (1) {
|
while (1) {
|
||||||
taosRLockLatch(&pMeta->lock);
|
streamMetaRLock(pMeta);
|
||||||
|
|
||||||
ppTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, &id, sizeof(id));
|
ppTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, &id, sizeof(id));
|
||||||
if (ppTask) {
|
if (ppTask) {
|
||||||
if ((*ppTask)->status.timerActive == 0) {
|
if ((*ppTask)->status.timerActive == 0) {
|
||||||
taosRUnLockLatch(&pMeta->lock);
|
streamMetaRUnLock(pMeta);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
taosMsleep(10);
|
taosMsleep(10);
|
||||||
stDebug("s-task:%s wait for quit from timer", (*ppTask)->id.idStr);
|
stDebug("s-task:%s wait for quit from timer", (*ppTask)->id.idStr);
|
||||||
taosRUnLockLatch(&pMeta->lock);
|
streamMetaRUnLock(pMeta);
|
||||||
} else {
|
} else {
|
||||||
taosRUnLockLatch(&pMeta->lock);
|
streamMetaRUnLock(pMeta);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// let's do delete of stream task
|
// let's do delete of stream task
|
||||||
taosWLockLatch(&pMeta->lock);
|
streamMetaWLock(pMeta);
|
||||||
|
|
||||||
ppTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, &id, sizeof(id));
|
ppTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, &id, sizeof(id));
|
||||||
if (ppTask) {
|
if (ppTask) {
|
||||||
pTask = *ppTask;
|
pTask = *ppTask;
|
||||||
|
@ -565,19 +562,16 @@ int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int64_t streamId, int32_t t
|
||||||
stDebug("vgId:%d failed to find the task:0x%x, it may have been dropped already", pMeta->vgId, taskId);
|
stDebug("vgId:%d failed to find the task:0x%x, it may have been dropped already", pMeta->vgId, taskId);
|
||||||
}
|
}
|
||||||
|
|
||||||
taosWUnLockLatch(&pMeta->lock);
|
streamMetaWUnLock(pMeta);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t streamMetaBegin(SStreamMeta* pMeta) {
|
int32_t streamMetaBegin(SStreamMeta* pMeta) {
|
||||||
taosWLockLatch(&pMeta->lock);
|
streamMetaWLock(pMeta);
|
||||||
if (tdbBegin(pMeta->db, &pMeta->txn, tdbDefaultMalloc, tdbDefaultFree, NULL,
|
int32_t code = tdbBegin(pMeta->db, &pMeta->txn, tdbDefaultMalloc, tdbDefaultFree, NULL,
|
||||||
TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) < 0) {
|
TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED);
|
||||||
taosWUnLockLatch(&pMeta->lock);
|
streamMetaWUnLock(pMeta);
|
||||||
return -1;
|
return code;
|
||||||
}
|
|
||||||
taosWUnLockLatch(&pMeta->lock);
|
|
||||||
return 0;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// todo add error log
|
// todo add error log
|
||||||
|
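Worth a note on the streamMetaBegin rewrite above: capturing tdbBegin's return code lets the lock be released on a single path and the real code propagate upward instead of a flat -1. A minimal self-contained sketch of that single-exit shape, with a pthread rwlock and a hypothetical begin_txn standing in for the TDB call:

#include <pthread.h>

/* Illustrative stand-ins only; not the real SStreamMeta/TDB types. */
typedef struct {
  pthread_rwlock_t lock;
} Meta;

static int begin_txn(Meta* m) { (void)m; return 0; } /* hypothetical */

/* Single exit: take the lock, capture the code, release, return it. */
int meta_begin(Meta* m) {
  pthread_rwlock_wrlock(&m->lock);
  int code = begin_txn(m);
  pthread_rwlock_unlock(&m->lock);
  return code;
}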
@@ -846,6 +840,12 @@ static bool waitForEnoughDuration(SMetaHbInfo* pInfo) {
   return false;
 }

+static void clearHbMsg(SStreamHbMsg* pMsg, SArray* pIdList) {
+  taosArrayDestroy(pMsg->pTaskStatus);
+  taosArrayDestroy(pMsg->pUpdateNodes);
+  taosArrayDestroy(pIdList);
+}
+
 void metaHbToMnode(void* param, void* tmrId) {
   int64_t rid = *(int64_t*)param;
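clearHbMsg concentrates the heartbeat's teardown in one helper; the later hunks of this commit then turn each error path into a goto _end that falls through to it. A self-contained sketch of the helper-plus-label pattern, with malloc/free standing in for taosArrayInit/taosArrayDestroy:

#include <stdlib.h>

/* Toy stand-in for SStreamHbMsg: two owned buffers plus an id list. */
typedef struct {
  void* pTaskStatus;
  void* pUpdateNodes;
} HbMsg;

static void clear_hb_msg(HbMsg* pMsg, void* pIdList) {
  free(pMsg->pTaskStatus); /* taosArrayDestroy in the real code */
  free(pMsg->pUpdateNodes);
  free(pIdList);
}

int send_hb(int simulateEncodeError) {
  HbMsg msg = {malloc(16), malloc(16)};
  void* idList = malloc(16);
  int code = 0;

  if (simulateEncodeError) {
    code = -1;
    goto _end; /* every failure funnels to one cleanup point */
  }
  /* ... encode and send ... */

_end:
  clear_hb_msg(&msg, idList);
  return code;
}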
@@ -884,19 +884,33 @@ void metaHbToMnode(void* param, void* tmrId) {
   stDebug("vgId:%d build stream task hb, leader:%d", pMeta->vgId, (pMeta->role == NODE_ROLE_LEADER));

   SStreamHbMsg hbMsg = {0};
-  taosRLockLatch(&pMeta->lock);
+  SEpSet epset = {0};
+  bool hasMnodeEpset = false;
+  int32_t stage = 0;
+
+  streamMetaRLock(pMeta);

   int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta);

-  SEpSet epset = {0};
-  bool hasMnodeEpset = false;
-
   hbMsg.vgId = pMeta->vgId;
+  stage = pMeta->stage;
+
+  SArray* pIdList = taosArrayDup(pMeta->pTaskList, NULL);
+
+  streamMetaRUnLock(pMeta);

   hbMsg.pTaskStatus = taosArrayInit(numOfTasks, sizeof(STaskStatusEntry));
   hbMsg.pUpdateNodes = taosArrayInit(numOfTasks, sizeof(int32_t));

   for (int32_t i = 0; i < numOfTasks; ++i) {
-    STaskId* pId = taosArrayGet(pMeta->pTaskList, i);
+    STaskId* pId = taosArrayGet(pIdList, i);
+
+    streamMetaRLock(pMeta);
     SStreamTask** pTask = taosHashGet(pMeta->pTasksMap, pId, sizeof(*pId));
+    streamMetaRUnLock(pMeta);
+
+    if (pTask == NULL) {
+      continue;
+    }

     // not report the status of fill-history task
     if ((*pTask)->info.fillHistory == 1) {
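The restructuring above is the substantive fix in this hunk: the heartbeat builder now copies the task-id list (taosArrayDup) under a short read lock, releases it, and re-takes the lock only for each hash lookup, tolerating tasks that vanish in between. A sketch of that copy-then-iterate discipline over assumed toy types:

#include <pthread.h>
#include <string.h>

#define MAX_TASKS 64

typedef struct {
  pthread_rwlock_t lock;
  int ids[MAX_TASKS];
  int numOfTasks;
} Meta;

/* Snapshot under a short read lock, then iterate without pinning it;
   each lookup re-takes the lock and must tolerate a dropped task. */
void build_hb(Meta* m) {
  int snapshot[MAX_TASKS];
  pthread_rwlock_rdlock(&m->lock);
  int n = m->numOfTasks;
  memcpy(snapshot, m->ids, (size_t)n * sizeof(int));
  pthread_rwlock_unlock(&m->lock);

  for (int i = 0; i < n; ++i) {
    pthread_rwlock_rdlock(&m->lock);
    int stillThere = (snapshot[i] < m->numOfTasks); /* stand-in lookup */
    pthread_rwlock_unlock(&m->lock);
    if (!stillThere) continue; /* mirrors the new `if (pTask == NULL)` */
    /* ... fill one status entry here ... */
  }
}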
@@ -906,12 +920,12 @@ void metaHbToMnode(void* param, void* tmrId) {
     STaskStatusEntry entry = {
         .id = *pId,
         .status = streamTaskGetStatus(*pTask, NULL),
-        .nodeId = pMeta->vgId,
-        .stage = pMeta->stage,
+        .nodeId = hbMsg.vgId,
+        .stage = stage,
         .inputQUsed = SIZE_IN_MiB(streamQueueGetItemSize((*pTask)->inputq.queue)),
     };

-    entry.inputRate = entry.inputQUsed*100.0/STREAM_TASK_QUEUE_CAPACITY_IN_SIZE;
+    entry.inputRate = entry.inputQUsed * 100.0 / STREAM_TASK_QUEUE_CAPACITY_IN_SIZE;
     if ((*pTask)->info.taskLevel == TASK_LEVEL__SINK) {
       entry.sinkQuota = (*pTask)->outputInfo.pTokenBucket->quotaRate;
       entry.sinkDataSize = SIZE_IN_MiB((*pTask)->execInfo.sink.dataSize);

@@ -930,9 +944,9 @@ void metaHbToMnode(void* param, void* tmrId) {
     taosThreadMutexLock(&(*pTask)->lock);
     int32_t num = taosArrayGetSize((*pTask)->outputInfo.pDownstreamUpdateList);
     for (int j = 0; j < num; ++j) {
-      int32_t *pNodeId = taosArrayGet((*pTask)->outputInfo.pDownstreamUpdateList, j);
+      int32_t* pNodeId = taosArrayGet((*pTask)->outputInfo.pDownstreamUpdateList, j);

       bool exist = false;
       int32_t numOfExisted = taosArrayGetSize(hbMsg.pUpdateNodes);
       for (int k = 0; k < numOfExisted; ++k) {
         if (*pNodeId == *(int32_t*)taosArrayGet(hbMsg.pUpdateNodes, k)) {

@@ -957,7 +971,6 @@ void metaHbToMnode(void* param, void* tmrId) {
   }

   hbMsg.numOfTasks = taosArrayGetSize(hbMsg.pTaskStatus);
-  taosRUnLockLatch(&pMeta->lock);

   if (hasMnodeEpset) {
     int32_t code = 0;

@@ -966,17 +979,13 @@ void metaHbToMnode(void* param, void* tmrId) {
     tEncodeSize(tEncodeStreamHbMsg, &hbMsg, tlen, code);
     if (code < 0) {
       stError("vgId:%d encode stream hb msg failed, code:%s", pMeta->vgId, tstrerror(code));
-      taosArrayDestroy(hbMsg.pTaskStatus);
-      taosReleaseRef(streamMetaId, rid);
-      return;
+      goto _end;
     }

     void* buf = rpcMallocCont(tlen);
     if (buf == NULL) {
       stError("vgId:%d encode stream hb msg failed, code:%s", pMeta->vgId, tstrerror(TSDB_CODE_OUT_OF_MEMORY));
-      taosArrayDestroy(hbMsg.pTaskStatus);
-      taosReleaseRef(streamMetaId, rid);
-      return;
+      goto _end;
     }

     SEncoder encoder;

@@ -984,15 +993,12 @@ void metaHbToMnode(void* param, void* tmrId) {
     if ((code = tEncodeStreamHbMsg(&encoder, &hbMsg)) < 0) {
       rpcFreeCont(buf);
       stError("vgId:%d encode stream hb msg failed, code:%s", pMeta->vgId, tstrerror(code));
-      taosArrayDestroy(hbMsg.pTaskStatus);
-      taosReleaseRef(streamMetaId, rid);
-      return;
+      goto _end;
     }
     tEncoderClear(&encoder);

-    SRpcMsg msg = {0};
+    SRpcMsg msg = {.info.noResp = 1,};
     initRpcMsg(&msg, TDMT_MND_STREAM_HEARTBEAT, buf, tlen);
-    msg.info.noResp = 1;

     pMeta->pHbInfo->hbCount += 1;

@@ -1003,16 +1009,15 @@ void metaHbToMnode(void* param, void* tmrId) {
     stDebug("vgId:%d no tasks and no mnd epset, not send stream hb to mnode", pMeta->vgId);
   }

-  taosArrayDestroy(hbMsg.pTaskStatus);
-  taosArrayDestroy(hbMsg.pUpdateNodes);
+_end:
+  clearHbMsg(&hbMsg, pIdList);

   taosTmrReset(metaHbToMnode, META_HB_CHECK_INTERVAL, param, streamEnv.timer, &pMeta->pHbInfo->hbTmr);
   taosReleaseRef(streamMetaId, rid);
 }

 bool streamMetaTaskInTimer(SStreamMeta* pMeta) {
   bool inTimer = false;
-  taosWLockLatch(&pMeta->lock);
+  streamMetaWLock(pMeta);

   void* pIter = NULL;
   while (1) {

@@ -1027,7 +1032,7 @@ bool streamMetaTaskInTimer(SStreamMeta* pMeta) {
     }
   }

-  taosWUnLockLatch(&pMeta->lock);
+  streamMetaWUnLock(pMeta);
   return inTimer;
 }

@@ -1037,7 +1042,7 @@ void streamMetaNotifyClose(SStreamMeta* pMeta) {
   stDebug("vgId:%d notify all stream tasks that the vnode is closing. isLeader:%d startHb%" PRId64 ", totalHb:%d", vgId,
           (pMeta->role == NODE_ROLE_LEADER), pMeta->pHbInfo->hbStart, pMeta->pHbInfo->hbCount);

-  taosWLockLatch(&pMeta->lock);
+  streamMetaWLock(pMeta);

   void* pIter = NULL;
   while (1) {

@@ -1051,7 +1056,7 @@ void streamMetaNotifyClose(SStreamMeta* pMeta) {
     streamTaskStop(pTask);
   }

-  taosWUnLockLatch(&pMeta->lock);
+  streamMetaWUnLock(pMeta);

   // wait for the stream meta hb function stopping
   if (pMeta->role == NODE_ROLE_LEADER) {

@@ -1090,4 +1095,23 @@ void streamMetaResetStartInfo(STaskStartInfo* pStartInfo) {
   taosHashClear(pStartInfo->pReadyTaskSet);
   pStartInfo->startAllTasksFlag = 0;
   pStartInfo->readyTs = 0;
 }
+
+void streamMetaRLock(SStreamMeta* pMeta) {
+  stTrace("vgId:%d meta-rlock", pMeta->vgId);
+  taosRLockLatch(&pMeta->lock);
+}
+
+void streamMetaRUnLock(SStreamMeta* pMeta) {
+  stTrace("vgId:%d meta-runlock", pMeta->vgId);
+  taosRUnLockLatch(&pMeta->lock);
+}
+
+void streamMetaWLock(SStreamMeta* pMeta) {
+  stTrace("vgId:%d meta-wlock", pMeta->vgId);
+  taosWLockLatch(&pMeta->lock);
+}
+
+void streamMetaWUnLock(SStreamMeta* pMeta) {
+  stTrace("vgId:%d meta-wunlock", pMeta->vgId);
+  taosWUnLockLatch(&pMeta->lock);
+}
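The four wrappers added above give every latch operation one choke point, so each acquire/release emits a matching stTrace line; that makes lock-ordering and missing-unlock bugs greppable from logs. A sketch of the same idea with pthread rwlocks and fprintf standing in for stTrace:

#include <pthread.h>
#include <stdio.h>

typedef struct {
  int vgId;
  pthread_rwlock_t lock;
} Meta;

/* Every caller funnels through here, so one grep over "meta-wlock" /
   "meta-wunlock" reconstructs the full lock history of a vgroup. */
void meta_wlock(Meta* m) {
  fprintf(stderr, "vgId:%d meta-wlock\n", m->vgId);
  pthread_rwlock_wrlock(&m->lock);
}

void meta_wunlock(Meta* m) {
  fprintf(stderr, "vgId:%d meta-wunlock\n", m->vgId);
  pthread_rwlock_unlock(&m->lock);
}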
@@ -580,13 +580,13 @@ int32_t streamProcessScanHistoryFinishRsp(SStreamTask* pTask) {
   SStreamMeta* pMeta = pTask->pMeta;

   // execute in the scan history complete call back msg, ready to process data from inputQ
-  streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_SCANHIST_DONE);
+  int32_t code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_SCANHIST_DONE);
   streamTaskSetSchedStatusInactive(pTask);

-  taosWLockLatch(&pMeta->lock);
+  streamMetaWLock(pMeta);
   streamMetaSaveTask(pMeta, pTask);
   streamMetaCommit(pMeta);
-  taosWUnLockLatch(&pMeta->lock);
+  streamMetaWUnLock(pMeta);

   // history data scan in the stream time window finished, now let's enable the pause
   streamTaskEnablePause(pTask);
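Two things change here: the SCANHIST_DONE event's return code is now captured (even if not yet acted on), and the save/commit pair runs inside the new write-lock wrappers as one critical section. A sketch of the save-and-commit-under-one-lock shape, with toy types:

#include <pthread.h>

typedef struct {
  pthread_rwlock_t lock;
  int savedVer;
  int committedVer;
} Meta;

/* Save + commit as one critical section: a concurrent register or
   unregister cannot observe a saved-but-uncommitted task snapshot. */
void save_and_commit(Meta* m, int taskVer) {
  pthread_rwlock_wrlock(&m->lock);
  m->savedVer = taskVer;          /* streamMetaSaveTask stand-in */
  m->committedVer = m->savedVer;  /* streamMetaCommit stand-in */
  pthread_rwlock_unlock(&m->lock);
}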
@@ -624,7 +624,8 @@ static void tryLaunchHistoryTask(void* param, void* tmrId) {
   SLaunchHTaskInfo* pInfo = param;
   SStreamMeta* pMeta = pInfo->pMeta;

-  taosWLockLatch(&pMeta->lock);
+  streamMetaWLock(pMeta);
+
   SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, &pInfo->id, sizeof(pInfo->id));
   if (ppTask) {
     ASSERT((*ppTask)->status.timerActive >= 1);

@@ -637,11 +638,11 @@ static void tryLaunchHistoryTask(void* param, void* tmrId) {
              (*ppTask)->id.idStr, p, (*ppTask)->hTaskInfo.retryTimes, ref);

       taosMemoryFree(pInfo);
-      taosWUnLockLatch(&pMeta->lock);
+      streamMetaWUnLock(pMeta);
       return;
     }
   }
-  taosWUnLockLatch(&pMeta->lock);
+  streamMetaWUnLock(pMeta);

   SStreamTask* pTask = streamMetaAcquireTask(pMeta, pInfo->id.streamId, pInfo->id.taskId);
   if (pTask != NULL) {

@@ -934,66 +935,6 @@ void streamTaskSetRangeStreamCalc(SStreamTask* pTask) {
 }

 void streamTaskPause(SStreamTask* pTask, SStreamMeta* pMeta) {
-#if 0
-  int8_t status = pTask->status.taskStatus;
-  if (status == TASK_STATUS__DROPPING) {
-    stDebug("vgId:%d s-task:%s task already dropped, do nothing", pMeta->vgId, pTask->id.idStr);
-    return;
-  }
-
-  const char* str = streamGetTaskStatusStr(status);
-  if (status == TASK_STATUS__STOP || status == TASK_STATUS__PAUSE) {
-    stDebug("vgId:%d s-task:%s task already stopped/paused, status:%s, do nothing", pMeta->vgId, pTask->id.idStr, str);
-    return;
-  }
-
-  if(pTask->info.taskLevel == TASK_LEVEL__SINK) {
-    int32_t num = atomic_add_fetch_32(&pMeta->numOfPausedTasks, 1);
-    stInfo("vgId:%d s-task:%s pause stream sink task. pause task num:%d", pMeta->vgId, pTask->id.idStr, num);
-    return;
-  }
-
-  while (!pTask->status.pauseAllowed || (pTask->status.taskStatus == TASK_STATUS__HALT)) {
-    status = pTask->status.taskStatus;
-    if (status == TASK_STATUS__DROPPING) {
-      stDebug("vgId:%d s-task:%s task already dropped, do nothing", pMeta->vgId, pTask->id.idStr);
-      return;
-    }
-
-    if (status == TASK_STATUS__STOP || status == TASK_STATUS__PAUSE) {
-      stDebug("vgId:%d s-task:%s task already stopped/paused, status:%s, do nothing", pMeta->vgId, pTask->id.idStr, str);
-      return;
-    }
-    //
-    //    if (pTask->status.downstreamReady == 0) {
-    //      ASSERT(pTask->execInfo.start == 0);
-    //      stDebug("s-task:%s in check downstream procedure, abort and paused", pTask->id.idStr);
-    //      break;
-    //    }
-
-    const char* pStatus = streamGetTaskStatusStr(status);
-    stDebug("s-task:%s wait for the task can be paused, status:%s, vgId:%d", pTask->id.idStr, pStatus, pMeta->vgId);
-    taosMsleep(100);
-  }
-
-  // todo: use the task lock, stead of meta lock
-  taosWLockLatch(&pMeta->lock);
-
-  status = pTask->status.taskStatus;
-  if (status == TASK_STATUS__DROPPING || status == TASK_STATUS__STOP) {
-    taosWUnLockLatch(&pMeta->lock);
-    stDebug("vgId:%d s-task:%s task already dropped/stopped/paused, do nothing", pMeta->vgId, pTask->id.idStr);
-    return;
-  }
-
-  atomic_store_8(&pTask->status.keepTaskStatus, pTask->status.taskStatus);
-  atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__PAUSE);
-  int32_t num = atomic_add_fetch_32(&pMeta->numOfPausedTasks, 1);
-  stInfo("vgId:%d s-task:%s pause stream task. pause task num:%d", pMeta->vgId, pTask->id.idStr, num);
-  taosWUnLockLatch(&pMeta->lock);
-
-#endif
-
   streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_PAUSE);

   int32_t num = atomic_add_fetch_32(&pMeta->numOfPausedTasks, 1);
@@ -1029,19 +970,6 @@ void streamTaskResume(SStreamTask* pTask) {
   }
 }

-// todo fix race condition
-void streamTaskDisablePause(SStreamTask* pTask) {
-  // pre-condition check
-  //  const char* id = pTask->id.idStr;
-  //  while (pTask->status.taskStatus == TASK_STATUS__PAUSE) {
-  //    stDebug("s-task:%s already in pause, wait for pause being cancelled, and set pause disabled, recheck in 100ms", id);
-  //    taosMsleep(100);
-  //  }
-  //
-  //  stDebug("s-task:%s disable task pause", id);
-  //  pTask->status.pauseAllowed = 0;
-}
-
 void streamTaskEnablePause(SStreamTask* pTask) {
   stDebug("s-task:%s enable task pause", pTask->id.idStr);
   pTask->status.pauseAllowed = 1;

@@ -1050,7 +978,7 @@ void streamTaskEnablePause(SStreamTask* pTask) {
 int32_t streamMetaUpdateTaskReadyInfo(SStreamTask* pTask) {
   SStreamMeta* pMeta = pTask->pMeta;

-  taosWLockLatch(&pMeta->lock);
+  streamMetaWLock(pMeta);

   STaskId id = streamTaskExtractKey(pTask);
   taosHashPut(pMeta->startInfo.pReadyTaskSet, &id, sizeof(id), NULL, 0);

@@ -1071,6 +999,6 @@ int32_t streamMetaUpdateTaskReadyInfo(SStreamTask* pTask) {
           pStartInfo->elapsedTime / 1000.0);
   }

-  taosWUnLockLatch(&pMeta->lock);
+  streamMetaWUnLock(pMeta);
   return TSDB_CODE_SUCCESS;
 }

@@ -121,7 +121,7 @@ SStreamState* streamStateOpen(char* path, void* pTask, bool specPath, int32_t sz
 #ifdef USE_ROCKSDB
   SStreamMeta* pMeta = pStreamTask->pMeta;
   pState->streamBackendRid = pMeta->streamBackendRid;
-  // taosWLockLatch(&pMeta->lock);
+  // streamMetaWLock(pMeta);
   taosThreadMutexLock(&pMeta->backendMutex);
   void* uniqueId =
       taosHashGet(pMeta->pTaskBackendUnique, pState->pTdbState->idstr, strlen(pState->pTdbState->idstr) + 1);
@@ -20,6 +20,8 @@
 #include "ttimer.h"
 #include "wal.h"

+#define GET_EVT_NAME(_ev) (StreamTaskEventList[(_ev)].name)
+
 SStreamTaskState StreamTaskStatusList[9] = {
     {.state = TASK_STATUS__READY, .name = "ready"},
     {.state = TASK_STATUS__DROPPING, .name = "dropped"},
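GET_EVT_NAME replaces the StreamTaskEventList[...].name subscripts repeated at every log site in this file; much of the diff below is that mechanical substitution. A tiny self-contained model of the macro over a descriptor table:

#include <stdio.h>

typedef struct {
  int event;
  const char* name;
} EvtDesc;

static const EvtDesc kEventList[] = {{0, "init"}, {1, "scanhist-done"}};

/* Same shape as GET_EVT_NAME: hide the table subscript behind a macro
   so call sites read as EVT_NAME(ev) rather than kEventList[ev].name. */
#define EVT_NAME(_ev) (kEventList[(_ev)].name)

int main(void) {
  printf("event 1 is %s\n", EVT_NAME(1));
  return 0;
}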
@@ -66,7 +68,7 @@ static int32_t attachEvent(SStreamTask* pTask, SAttachedEventInfo* pEvtInfo) {
   streamTaskGetStatus(pTask, &p);

   stDebug("s-task:%s status:%s attach event:%s required status:%s, since not allowed to handle it", pTask->id.idStr, p,
-          StreamTaskEventList[pEvtInfo->event].name, StreamTaskStatusList[pEvtInfo->status].name);
+          GET_EVT_NAME(pEvtInfo->event), StreamTaskStatusList[pEvtInfo->status].name);
   taosArrayPush(pTask->status.pSM->pWaitingEventList, pEvtInfo);
   return 0;
 }

@@ -80,13 +82,6 @@ int32_t streamTaskInitStatus(SStreamTask* pTask) {
   return 0;
 }

-int32_t streamTaskSetReadyForWal(SStreamTask* pTask) {
-  if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
-    stDebug("s-task:%s ready for extract data from wal", pTask->id.idStr);
-  }
-  return TSDB_CODE_SUCCESS;
-}
-
 static int32_t streamTaskDoCheckpoint(SStreamTask* pTask) {
   stDebug("s-task:%s start to do checkpoint", pTask->id.idStr);
   return 0;

@@ -105,6 +100,39 @@ int32_t streamTaskKeepCurrentVerInWal(SStreamTask* pTask) {
   return TSDB_CODE_SUCCESS;
 }

+// todo check rsp code for handle Event:TASK_EVENT_SCANHIST_DONE
+static bool isInvalidStateTransfer(ETaskStatus state, const EStreamTaskEvent event) {
+  if (event == TASK_EVENT_INIT_STREAM_SCANHIST || event == TASK_EVENT_INIT || event == TASK_EVENT_INIT_SCANHIST) {
+    return (state != TASK_STATUS__UNINIT);
+  }
+
+  if (event == TASK_EVENT_SCANHIST_DONE) {
+    return (state != TASK_STATUS__SCAN_HISTORY && state != TASK_STATUS__STREAM_SCAN_HISTORY);
+  }
+
+  if (event == TASK_EVENT_GEN_CHECKPOINT) {
+    return (state != TASK_STATUS__READY);
+  }
+
+  if (event == TASK_EVENT_CHECKPOINT_DONE) {
+    return (state != TASK_STATUS__CK);
+  }
+
+  // todo refactor later
+  if (event == TASK_EVENT_RESUME) {
+    return true;
+  }
+
+  if (event == TASK_EVENT_HALT) {
+    if (state == TASK_STATUS__DROPPING || state == TASK_STATUS__UNINIT || state == TASK_STATUS__STOP ||
+        state == TASK_STATUS__SCAN_HISTORY) {
+      return true;
+    }
+  }
+
+  return false;
+}
+
 // todo optimize the perf of find the trans objs by using hash table
 static STaskStateTrans* streamTaskFindTransform(ETaskStatus state, const EStreamTaskEvent event) {
   int32_t numOfTrans = taosArrayGetSize(streamTaskSMTrans);
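isInvalidStateTransfer centralizes the legality check that streamTaskFindTransform previously special-cased inline: each event class names the states it may legally fire from, and everything else is rejected before the transition table is scanned. A compact self-contained variant over toy enums:

#include <stdbool.h>

/* Toy subset of the task states and events, for illustration only. */
typedef enum { ST_UNINIT, ST_READY, ST_SCAN_HISTORY, ST_CK } State;
typedef enum { EV_INIT, EV_SCANHIST_DONE, EV_GEN_CHECKPOINT, EV_CHECKPOINT_DONE } Event;

/* Mirrors isInvalidStateTransfer: an event is invalid unless the task
   sits in the one state (or small set of states) it may fire from. */
static bool is_invalid(State s, Event e) {
  switch (e) {
    case EV_INIT:            return s != ST_UNINIT;
    case EV_SCANHIST_DONE:   return s != ST_SCAN_HISTORY;
    case EV_GEN_CHECKPOINT:  return s != ST_READY;
    case EV_CHECKPOINT_DONE: return s != ST_CK;
    default:                 return false;
  }
}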
@@ -115,10 +143,8 @@ static STaskStateTrans* streamTaskFindTransform(ETaskStatus state, const EStream
     }
   }

-  if (event == TASK_EVENT_CHECKPOINT_DONE && state == TASK_STATUS__STOP) {
+  if (isInvalidStateTransfer(state, event)) {
     return NULL;
-  } else if (event == TASK_EVENT_GEN_CHECKPOINT && state == TASK_STATUS__UNINIT) {
-    // the task is set to uninit due to nodeEpset update, during processing checkpoint-trigger block.
   } else {
     ASSERT(0);
   }

@@ -128,9 +154,10 @@ static STaskStateTrans* streamTaskFindTransform(ETaskStatus state, const EStream

 void streamTaskRestoreStatus(SStreamTask* pTask) {
   SStreamTaskSM* pSM = pTask->status.pSM;
-  taosThreadMutexLock(&pTask->lock);
-  ASSERT(pSM->pActiveTrans == NULL);

+  taosThreadMutexLock(&pTask->lock);
+
+  ASSERT(pSM->pActiveTrans == NULL);
   ASSERT(pSM->current.state == TASK_STATUS__PAUSE || pSM->current.state == TASK_STATUS__HALT);

   SStreamTaskState state = pSM->current;

@@ -199,14 +226,14 @@ static int32_t doHandleEvent(SStreamTaskSM* pSM, EStreamTaskEvent event, STaskSt
       taosThreadMutexUnlock(&pTask->lock);

       if ((s == pTrans->next.state) && (pSM->prev.evt == pTrans->event)) {
-        stDebug("s-task:%s attached event:%s handled", id, StreamTaskEventList[pTrans->event].name);
+        stDebug("s-task:%s attached event:%s handled", id, GET_EVT_NAME(pTrans->event));
         return TSDB_CODE_SUCCESS;
       } else if (s != TASK_STATUS__DROPPING && s != TASK_STATUS__STOP) {  // this event has been handled already
-        stDebug("s-task:%s not handle event:%s yet, wait for 100ms and recheck", id, StreamTaskEventList[event].name);
+        stDebug("s-task:%s not handle event:%s yet, wait for 100ms and recheck", id, GET_EVT_NAME(event));
         taosMsleep(100);
       } else {
         stDebug("s-task:%s is dropped or stopped already, not wait.", id);
-        return TSDB_CODE_INVALID_PARA;
+        return TSDB_CODE_STREAM_INVALID_STATETRANS;
       }
     }
@@ -227,37 +254,41 @@ static int32_t doHandleEvent(SStreamTaskSM* pSM, EStreamTaskEvent event, STaskSt
 }

 int32_t streamTaskHandleEvent(SStreamTaskSM* pSM, EStreamTaskEvent event) {
+  int32_t code = TSDB_CODE_SUCCESS;
   SStreamTask* pTask = pSM->pTask;
   STaskStateTrans* pTrans = NULL;

   while (1) {
     taosThreadMutexLock(&pTask->lock);

     if (pSM->pActiveTrans != NULL && pSM->pActiveTrans->autoInvokeEndFn) {
+      EStreamTaskEvent evt = pSM->pActiveTrans->event;
       taosThreadMutexUnlock(&pTask->lock);
-      taosMsleep(100);
       stDebug("s-task:%s status:%s handling event:%s by some other thread, wait for 100ms and check if completed",
-              pTask->id.idStr, pSM->current.name, StreamTaskEventList[pSM->pActiveTrans->event].name);
+              pTask->id.idStr, pSM->current.name, GET_EVT_NAME(evt));
+      taosMsleep(100);
     } else {
       pTrans = streamTaskFindTransform(pSM->current.state, event);
       if (pTrans == NULL) {
-        stDebug("s-task:%s failed to handle event:%s", pTask->id.idStr, StreamTaskEventList[event].name);
+        stDebug("s-task:%s failed to handle event:%s", pTask->id.idStr, GET_EVT_NAME(event));
         taosThreadMutexUnlock(&pTask->lock);
-        return TSDB_CODE_INVALID_PARA;  // todo: set new error code// failed to handle the event.
+        return TSDB_CODE_STREAM_INVALID_STATETRANS;
       }

       if (pSM->pActiveTrans != NULL) {
         // currently in some state transfer procedure, not auto invoke transfer, abort it
         stDebug("s-task:%s event:%s handle procedure quit, status %s -> %s failed, handle event %s now",
-                pTask->id.idStr, StreamTaskEventList[pSM->pActiveTrans->event].name, pSM->current.name,
-                pSM->pActiveTrans->next.name, StreamTaskEventList[event].name);
+                pTask->id.idStr, GET_EVT_NAME(pSM->pActiveTrans->event), pSM->current.name,
+                pSM->pActiveTrans->next.name, GET_EVT_NAME(event));
       }

-      doHandleEvent(pSM, event, pTrans);
+      code = doHandleEvent(pSM, event, pTrans);
       break;
     }
   }

-  return TSDB_CODE_SUCCESS;
+  return code;
 }

 static void keepPrevInfo(SStreamTaskSM* pSM) {
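With this hunk streamTaskHandleEvent finally reports what doHandleEvent returned instead of a blanket success, and a rejected transition surfaces as the new TSDB_CODE_STREAM_INVALID_STATETRANS. A self-contained sketch of the caller-side difference, with stand-in codes:

#include <stdio.h>

enum { OK = 0, ERR_INVALID_STATETRANS = -1 }; /* stand-in codes */

/* Pretend handler: rejects the event, as a stopped task now would. */
static int handle_event(int event) { (void)event; return ERR_INVALID_STATETRANS; }

/* Before this commit the caller always saw OK; now it can branch. */
int on_scanhist_done(void) {
  int code = handle_event(1);
  if (code != OK) {
    fprintf(stderr, "event rejected in current state, code:%d\n", code);
  }
  return code;
}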
@@ -272,24 +303,27 @@ int32_t streamTaskOnHandleEventSuccess(SStreamTaskSM* pSM, EStreamTaskEvent even

   // do update the task status
   taosThreadMutexLock(&pTask->lock);
-  STaskStateTrans* pTrans = pSM->pActiveTrans;

+  STaskStateTrans* pTrans = pSM->pActiveTrans;
   if (pTrans == NULL) {
     ETaskStatus s = pSM->current.state;
-    ASSERT(s == TASK_STATUS__DROPPING || s == TASK_STATUS__PAUSE || s == TASK_STATUS__STOP);
+    ASSERT(s == TASK_STATUS__DROPPING || s == TASK_STATUS__PAUSE || s == TASK_STATUS__STOP ||
+           s == TASK_STATUS__UNINIT || s == TASK_STATUS__READY);

     // the pSM->prev.evt may be 0, so print string is not appropriate.
     stDebug("s-task:%s event:%s handled failed, current status:%s, trigger event:%s", pTask->id.idStr,
-            StreamTaskEventList[event].name, pSM->current.name, StreamTaskEventList[pSM->prev.evt].name);
+            GET_EVT_NAME(event), pSM->current.name, GET_EVT_NAME(pSM->prev.evt));

     taosThreadMutexUnlock(&pTask->lock);
-    return TSDB_CODE_INVALID_PARA;
+    stDebug("s-task:%s unlockx", pTask->id.idStr);
+    return TSDB_CODE_STREAM_INVALID_STATETRANS;
   }

   if (pTrans->event != event) {
     stWarn("s-task:%s handle event:%s failed, current status:%s, active trans evt:%s", pTask->id.idStr,
-           StreamTaskEventList[event].name, pSM->current.name, StreamTaskEventList[pTrans->event].name);
+           GET_EVT_NAME(event), pSM->current.name, GET_EVT_NAME(pTrans->event));
     taosThreadMutexUnlock(&pTask->lock);
-    return TSDB_CODE_INVALID_PARA;
+    return TSDB_CODE_STREAM_INVALID_STATETRANS;
   }

   keepPrevInfo(pSM);

@@ -303,14 +337,17 @@ int32_t streamTaskOnHandleEventSuccess(SStreamTaskSM* pSM, EStreamTaskEvent even
   if (taosArrayGetSize(pSM->pWaitingEventList) > 0) {
     int64_t el = (taosGetTimestampMs() - pSM->startTs);
     stDebug("s-task:%s handle event:%s completed, elapsed time:%" PRId64 "ms state:%s -> %s", pTask->id.idStr,
-            StreamTaskEventList[pTrans->event].name, el, pSM->prev.state.name, pSM->current.name);
+            GET_EVT_NAME(pTrans->event), el, pSM->prev.state.name, pSM->current.name);

-    SAttachedEventInfo* pEvtInfo = taosArrayPop(pSM->pWaitingEventList);
+    SAttachedEventInfo* pEvtInfo = taosArrayGet(pSM->pWaitingEventList, 0);

     // OK, let's handle the attached event, since the task has reached the required status now
     if (pSM->current.state == pEvtInfo->status) {
-      stDebug("s-task:%s handle the attached event:%s, state:%s", pTask->id.idStr,
-              StreamTaskEventList[pEvtInfo->event].name, pSM->current.name);
+      stDebug("s-task:%s handle the event:%s in waiting list, state:%s", pTask->id.idStr,
+              GET_EVT_NAME(pEvtInfo->event), pSM->current.name);
+
+      // remove it
+      taosArrayPop(pSM->pWaitingEventList);

       STaskStateTrans* pNextTrans = streamTaskFindTransform(pSM->current.state, pEvtInfo->event);
       ASSERT(pSM->pActiveTrans == NULL && pNextTrans != NULL);
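The waiting-list handling switched from pop-first to peek-first: taosArrayGet(..., 0) inspects the queued event, and (in the next hunk) it is popped only once the task has actually reached the required state; otherwise it stays queued. A toy sketch of peek-then-remove:

#include <stdbool.h>

#define QCAP 8

typedef struct {
  int evts[QCAP];
  int n;
} WaitList;

/* Peek the head; remove it only when the event is consumable now.
   An unfulfilled event survives for a later pass ("put it back"). */
static bool try_consume(WaitList* w, int requiredState, int currentState) {
  if (w->n == 0) return false;
  int evt = w->evts[0];                      /* peek */
  if (currentState != requiredState) return false;
  for (int i = 1; i < w->n; ++i) w->evts[i - 1] = w->evts[i];
  w->n--;                                    /* pop once handled */
  (void)evt;                                 /* ... handle evt ... */
  return true;
}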
@@ -325,13 +362,18 @@ int32_t streamTaskOnHandleEventSuccess(SStreamTaskSM* pSM, EStreamTaskEvent even
       } else {
         return code;
       }
+    } else {
+      taosThreadMutexUnlock(&pTask->lock);
+      stDebug("s-task:%s state:%s event:%s in waiting list, req state:%s not fulfilled, put it back", pTask->id.idStr,
+              pSM->current.name, GET_EVT_NAME(pEvtInfo->event),
+              StreamTaskStatusList[pEvtInfo->status].name);
     }
   } else {
     taosThreadMutexUnlock(&pTask->lock);

     int64_t el = (taosGetTimestampMs() - pSM->startTs);
     stDebug("s-task:%s handle event:%s completed, elapsed time:%" PRId64 "ms state:%s -> %s", pTask->id.idStr,
-            StreamTaskEventList[pTrans->event].name, el, pSM->prev.state.name, pSM->current.name);
+            GET_EVT_NAME(pTrans->event), el, pSM->prev.state.name, pSM->current.name);
   }

   return TSDB_CODE_SUCCESS;

@@ -440,6 +482,10 @@ void doInitStateTransferTable(void) {
                                streamTaskKeepCurrentVerInWal, NULL, true);
   taosArrayPush(streamTaskSMTrans, &trans);

+  trans = createStateTransform(TASK_STATUS__HALT, TASK_STATUS__HALT, TASK_EVENT_HALT, NULL,
+                               streamTaskKeepCurrentVerInWal, NULL, true);
+  taosArrayPush(streamTaskSMTrans, &trans);
+
   SAttachedEventInfo info = {.status = TASK_STATUS__READY, .event = TASK_EVENT_HALT};
   trans = createStateTransform(TASK_STATUS__STREAM_SCAN_HISTORY, TASK_STATUS__HALT, TASK_EVENT_HALT, NULL,
                                streamTaskKeepCurrentVerInWal, &info, true);
@@ -680,6 +680,9 @@ TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_REPLAY_NOT_SUPPORT, "Replay is disabled
 // stream
 TAOS_DEFINE_ERROR(TSDB_CODE_STREAM_TASK_NOT_EXIST,        "Stream task not exist")
 TAOS_DEFINE_ERROR(TSDB_CODE_STREAM_EXEC_CANCELLED,        "Stream task exec cancelled")
+TAOS_DEFINE_ERROR(TSDB_CODE_STREAM_INVALID_STATETRANS,    "Invalid task state to handle event")
+TAOS_DEFINE_ERROR(TSDB_CODE_STREAM_TASK_IVLD_STATUS,      "Invalid task status to proceed")

 // TDLite
 TAOS_DEFINE_ERROR(TSDB_CODE_TDLITE_IVLD_OPEN_FLAGS,       "Invalid TDLite open flags")
 TAOS_DEFINE_ERROR(TSDB_CODE_TDLITE_IVLD_OPEN_DIR,         "Invalid TDLite open directory")
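The two new rows map the stream module's invalid-transition codes to human-readable messages. A hedged model of how such a TAOS_DEFINE_ERROR-style table can resolve a code to its message at runtime (toy linear lookup, not the actual tstrerror implementation):

#include <stdint.h>
#include <stddef.h>

typedef struct {
  int32_t code;
  const char* msg;
} ErrRow;

/* Toy table in the spirit of the TAOS_DEFINE_ERROR rows above. */
static const ErrRow kErrs[] = {
    {1, "Invalid task state to handle event"},
    {2, "Invalid task status to proceed"},
};

/* Linear scan, the simplest possible code -> message resolution. */
const char* err_str(int32_t code) {
  for (size_t i = 0; i < sizeof(kErrs) / sizeof(kErrs[0]); ++i) {
    if (kErrs[i].code == code) return kErrs[i].msg;
  }
  return "unknown error";
}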