refactor: do some internal refactor.

This commit is contained in:
Haojun Liao 2023-11-05 01:23:23 +08:00
parent de5c4d8d5d
commit e13f003b90
6 changed files with 30 additions and 20 deletions

View File

@ -1064,7 +1064,7 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t sversion, char* msg, int32_t ms
return code; return code;
} }
static void doStartStep2(SStreamTask* pTask, SStreamTask* pStreamTask, STQ* pTq) { static void doStartFillhistoryStep2(SStreamTask* pTask, SStreamTask* pStreamTask, STQ* pTq) {
const char* id = pTask->id.idStr; const char* id = pTask->id.idStr;
int64_t nextProcessedVer = pStreamTask->hTaskInfo.haltVer; int64_t nextProcessedVer = pStreamTask->hTaskInfo.haltVer;
@ -1105,7 +1105,7 @@ static void doStartStep2(SStreamTask* pTask, SStreamTask* pStreamTask, STQ* pTq)
} }
} }
// this function should be executed by only one thread // this function should be executed by only one thread, so we set an sentinel to protect this function
int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
SStreamScanHistoryReq* pReq = (SStreamScanHistoryReq*)pMsg->pCont; SStreamScanHistoryReq* pReq = (SStreamScanHistoryReq*)pMsg->pCont;
SStreamMeta* pMeta = pTq->pStreamMeta; SStreamMeta* pMeta = pTq->pStreamMeta;
@ -1134,6 +1134,7 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
} }
} }
// let's decide which step should be executed now
if (pTask->execInfo.step1Start == 0) { if (pTask->execInfo.step1Start == 0) {
ASSERT(pTask->status.pauseAllowed == false); ASSERT(pTask->status.pauseAllowed == false);
int64_t ts = taosGetTimestampMs(); int64_t ts = taosGetTimestampMs();
@ -1167,14 +1168,28 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
return 0; return 0;
} }
streamScanHistoryData(pTask); EScanHistoryRet ret = streamScanHistoryData(pTask);
// todo update the step1 exec elapsed time
double el = (taosGetTimestampMs() - pTask->execInfo.step1Start) / 1000.0; double el = (taosGetTimestampMs() - pTask->execInfo.step1Start) / 1000.0;
if (streamTaskGetStatus(pTask, NULL) == TASK_STATUS__PAUSE) {
int8_t status = streamTaskSetSchedStatusInactive(pTask);
tqDebug("s-task:%s is paused in the step1, elapsed time:%.2fs, sched-status:%d", pTask->id.idStr, el, status);
if (ret == TASK_SCANHISTORY_QUIT || ret == TASK_SCANHISTORY_REXEC) {
int8_t status = streamTaskSetSchedStatusInactive(pTask);
atomic_store_32(&pTask->status.inScanHistorySentinel, 0); atomic_store_32(&pTask->status.inScanHistorySentinel, 0);
if (ret == TASK_SCANHISTORY_REXEC) {
streamStartScanHistoryAsync(pTask, 0);
} else {
char* p = NULL;
ETaskStatus s = streamTaskGetStatus(pTask, &p);
if (s == TASK_STATUS__PAUSE) {
tqDebug("s-task:%s is paused in the step1, elapsed time:%.2fs, sched-status:%d", pTask->id.idStr, el, status);
} else if (s == TASK_STATUS__STOP || s == TASK_STATUS__DROPPING) {
tqDebug("s-task:%s status:%p not continue scan-history data", pTask->id.idStr, p);
}
}
streamMetaReleaseTask(pMeta, pTask); streamMetaReleaseTask(pMeta, pTask);
return 0; return 0;
} }
@ -1203,23 +1218,20 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
code = streamTaskHandleEvent(pStreamTask->status.pSM, TASK_EVENT_HALT); code = streamTaskHandleEvent(pStreamTask->status.pSM, TASK_EVENT_HALT);
if (code == TSDB_CODE_SUCCESS) { if (code == TSDB_CODE_SUCCESS) {
doStartStep2(pTask, pStreamTask, pTq); doStartFillhistoryStep2(pTask, pStreamTask, pTq);
} else { } else {
tqError("s-task:%s failed to halt s-task:%s, not launch step2", id, pStreamTask->id.idStr); tqError("s-task:%s failed to halt s-task:%s, not launch step2", id, pStreamTask->id.idStr);
} }
streamMetaReleaseTask(pMeta, pStreamTask); streamMetaReleaseTask(pMeta, pStreamTask);
} else { } else {
STimeWindow* pWindow = &pTask->dataRange.window; STimeWindow* pWindow = &pTask->dataRange.window;
ASSERT(HAS_RELATED_FILLHISTORY_TASK(pTask)); ASSERT(HAS_RELATED_FILLHISTORY_TASK(pTask));
// Not update the fill-history time window until the state transfer is completed if the related fill-history task // Not update the fill-history time window until the state transfer is completed.
// exists. tqDebug("s-task:%s scan-history in stream time window completed, start to handle data from WAL, startVer:%" PRId64
tqDebug( ", window:%" PRId64 " - %" PRId64,
"s-task:%s scan-history in stream time window completed, now start to handle data from WAL, startVer:%" PRId64 id, pTask->chkInfo.nextProcessVer, pWindow->skey, pWindow->ekey);
", window:%" PRId64 " - %" PRId64,
id, pTask->chkInfo.nextProcessVer, pWindow->skey, pWindow->ekey);
code = streamTaskScanHistoryDataComplete(pTask); code = streamTaskScanHistoryDataComplete(pTask);
} }

View File

@ -1046,7 +1046,7 @@ int32_t qRestoreStreamOperatorOption(qTaskInfo_t tinfo) {
} }
} }
bool qStreamRecoverScanFinished(qTaskInfo_t tinfo) { bool qStreamScanhistoryFinished(qTaskInfo_t tinfo) {
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
return pTaskInfo->streamInfo.recoverScanFinished; return pTaskInfo->streamInfo.recoverScanFinished;
} }

View File

@ -1104,7 +1104,6 @@ void streamMetaRLock(SStreamMeta* pMeta) {
void streamMetaRUnLock(SStreamMeta* pMeta) { void streamMetaRUnLock(SStreamMeta* pMeta) {
stTrace("vgId:%d meta-runlock", pMeta->vgId); stTrace("vgId:%d meta-runlock", pMeta->vgId);
taosRUnLockLatch(&pMeta->lock); taosRUnLockLatch(&pMeta->lock);
} }
void streamMetaWLock(SStreamMeta* pMeta) { void streamMetaWLock(SStreamMeta* pMeta) {
stTrace("vgId:%d meta-wlock", pMeta->vgId); stTrace("vgId:%d meta-wlock", pMeta->vgId);

View File

@ -340,10 +340,11 @@ int32_t streamTaskPutDataIntoInputQ(SStreamTask* pTask, SStreamQueueItem* pItem)
return 0; return 0;
} }
// the result should be put into the outputQ in any cases, otherwise, the result may be lost // the result should be put into the outputQ in any cases, the result may be lost otherwise.
int32_t streamTaskPutDataIntoOutputQ(SStreamTask* pTask, SStreamDataBlock* pBlock) { int32_t streamTaskPutDataIntoOutputQ(SStreamTask* pTask, SStreamDataBlock* pBlock) {
STaosQueue* pQueue = pTask->outputq.queue->pQueue; STaosQueue* pQueue = pTask->outputq.queue->pQueue;
// wait for the output queue is available for new data to dispatch
while (streamQueueIsFull(pTask->outputq.queue)) { while (streamQueueIsFull(pTask->outputq.queue)) {
if (streamTaskShouldStop(pTask)) { if (streamTaskShouldStop(pTask)) {
stInfo("s-task:%s discard result block due to task stop", pTask->id.idStr); stInfo("s-task:%s discard result block due to task stop", pTask->id.idStr);

View File

@ -400,8 +400,7 @@ void tFreeStreamTask(SStreamTask* pTask) {
taosMemoryFree(pTask->outputInfo.pTokenBucket); taosMemoryFree(pTask->outputInfo.pTokenBucket);
taosThreadMutexDestroy(&pTask->lock); taosThreadMutexDestroy(&pTask->lock);
taosArrayDestroy(pTask->outputInfo.pDownstreamUpdateList); pTask->outputInfo.pDownstreamUpdateList = taosArrayDestroy(pTask->outputInfo.pDownstreamUpdateList);
pTask->outputInfo.pDownstreamUpdateList = NULL;
taosMemoryFree(pTask); taosMemoryFree(pTask);
stDebug("s-task:0x%x free task completed", taskId); stDebug("s-task:0x%x free task completed", taskId);

View File

@ -315,7 +315,6 @@ int32_t streamTaskOnHandleEventSuccess(SStreamTaskSM* pSM, EStreamTaskEvent even
GET_EVT_NAME(event), pSM->current.name, GET_EVT_NAME(pSM->prev.evt)); GET_EVT_NAME(event), pSM->current.name, GET_EVT_NAME(pSM->prev.evt));
taosThreadMutexUnlock(&pTask->lock); taosThreadMutexUnlock(&pTask->lock);
stDebug("s-task:%s unlockx", pTask->id.idStr);
return TSDB_CODE_STREAM_INVALID_STATETRANS; return TSDB_CODE_STREAM_INVALID_STATETRANS;
} }