From a4ba5401f96b505c6edf6cb4f70c7af57a2bb76d Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 11 Apr 2023 10:01:36 +0800 Subject: [PATCH] enh(stream): set the start version of all operators. --- include/libs/executor/executor.h | 3 + include/libs/stream/tstream.h | 33 ++++----- source/dnode/vnode/src/inc/tq.h | 4 + source/dnode/vnode/src/tq/tq.c | 57 +++----------- source/dnode/vnode/src/tq/tqCommit.c | 3 + source/dnode/vnode/src/tq/tqRestore.c | 17 ++--- source/dnode/vnode/src/tq/tqUtil.c | 74 ++++++++++++++++++- source/libs/executor/inc/executorimpl.h | 16 ++-- source/libs/executor/src/executor.c | 6 ++ source/libs/executor/src/timewindowoperator.c | 15 ++++ source/libs/stream/src/streamExec.c | 22 +++++- source/libs/stream/src/streamMeta.c | 46 +++++++++--- source/util/src/tworker.c | 2 +- 13 files changed, 200 insertions(+), 98 deletions(-) diff --git a/include/libs/executor/executor.h b/include/libs/executor/executor.h index ee8ee1050d..fd66194143 100644 --- a/include/libs/executor/executor.h +++ b/include/libs/executor/executor.h @@ -91,6 +91,9 @@ void qSetTaskId(qTaskInfo_t tinfo, uint64_t taskId, uint64_t queryId); int32_t qSetStreamOpOpen(qTaskInfo_t tinfo); +// todo refactor +int64_t qGetCheckpointVersion(qTaskInfo_t tinfo); + /** * Set multiple input data blocks for the stream scan. * @param tinfo diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index a338413502..22e9356b3d 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -295,14 +295,11 @@ struct SStreamTask { int16_t dispatchMsgType; int8_t taskStatus; int8_t schedStatus; - - // node info - int32_t selfChildId; - int32_t nodeId; - SEpSet epSet; - - int64_t recoverSnapVer; - int64_t startVer; + int32_t selfChildId; + int32_t nodeId; + SEpSet epSet; + int64_t recoverSnapVer; + int64_t startVer; // fill history int8_t fillHistory; @@ -340,15 +337,15 @@ struct SStreamTask { // state backend SStreamState* pState; - // do not serialize - int32_t recoverTryingDownstream; - int32_t recoverWaitingUpstream; - int64_t checkReqId; - SArray* checkReqIds; // shuffle - int32_t refCnt; - - int64_t checkpointingId; - int32_t checkpointAlignCnt; + // the followings attributes don't be serialized + int32_t recoverTryingDownstream; + int32_t recoverWaitingUpstream; + int64_t checkReqId; + SArray* checkReqIds; // shuffle + int32_t refCnt; + int64_t checkpointingId; + int32_t checkpointAlignCnt; + struct SStreamMeta* pMeta; }; int32_t tEncodeStreamEpInfo(SEncoder* pEncoder, const SStreamChildEpInfo* pInfo); @@ -597,6 +594,8 @@ SStreamTask* streamMetaAcquireTask(SStreamMeta* pMeta, int32_t taskId); void streamMetaReleaseTask(SStreamMeta* pMeta, SStreamTask* pTask); void streamMetaRemoveTask(SStreamMeta* pMeta, int32_t taskId); +SStreamTask* streamMetaAcquireTaskEx(SStreamMeta* pMeta, int32_t taskId); + int32_t streamMetaBegin(SStreamMeta* pMeta); int32_t streamMetaCommit(SStreamMeta* pMeta); int32_t streamMetaRollBack(SStreamMeta* pMeta); diff --git a/source/dnode/vnode/src/inc/tq.h b/source/dnode/vnode/src/inc/tq.h index d4af9ac481..50a09229fa 100644 --- a/source/dnode/vnode/src/inc/tq.h +++ b/source/dnode/vnode/src/inc/tq.h @@ -183,6 +183,10 @@ void createStreamTaskOffsetKey(char* dst, uint64_t streamId, uint32_t taskId) int32_t tqAddInputBlockNLaunchTask(SStreamTask* pTask, SStreamQueueItem* pQueueItem, int64_t ver); int32_t launchTaskForWalBlock(SStreamTask* pTask, SFetchRet* pRet, STqOffset* pOffset); +void doSaveTaskOffset(STqOffsetStore* pOffsetStore, const char* pKey, int64_t ver); +void saveOffsetForAllTasks(STQ* pTq, int64_t ver); +void initOffsetForAllRestoreTasks(STQ* pTq); + #ifdef __cplusplus } #endif diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index aa8960e977..91e2569a8c 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -907,6 +907,7 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) { pTask->outputStatus = TASK_OUTPUT_STATUS__NORMAL; pTask->pMsgCb = &pTq->pVnode->msgCb; pTask->startVer = ver; + pTask->pMeta = pTq->pStreamMeta; // expand executor if (pTask->fillHistory) { @@ -979,7 +980,8 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) { } streamSetupTrigger(pTask); - tqInfo("vgId:%d expand stream task, s-task:%s, child id %d, level %d", vgId, pTask->id.idStr, pTask->selfChildId, pTask->taskLevel); + tqInfo("vgId:%d expand stream task, s-task:%s, ver:%" PRId64 " child id:%d, level:%d", vgId, pTask->id.idStr, + pTask->startVer, pTask->selfChildId, pTask->taskLevel); return 0; } @@ -1370,16 +1372,6 @@ int32_t tqProcessDelReq(STQ* pTq, void* pReq, int32_t len, int64_t ver) { return 0; } -static void doSaveTaskOffset(STqOffsetStore* pOffsetStore, const char* pKey, int64_t ver) { - STqOffset offset = {0}; - tqOffsetResetToLog(&offset.val, ver); - - tstrncpy(offset.subKey, pKey, tListLen(offset.subKey)); - - // keep the offset info in the offset store - tqOffsetWrite(pOffsetStore, &offset); -} - static int32_t addSubmitBlockNLaunchTask(STqOffsetStore* pOffsetStore, SStreamTask* pTask, SStreamDataSubmit2* pSubmit, const char* key, int64_t ver) { doSaveTaskOffset(pOffsetStore, key, ver); @@ -1392,36 +1384,6 @@ static int32_t addSubmitBlockNLaunchTask(STqOffsetStore* pOffsetStore, SStreamTa return TSDB_CODE_SUCCESS; } -static void saveOffsetForAllTasks(STQ* pTq, SPackedData submit) { - void* pIter = NULL; - - while(1) { - pIter = taosHashIterate(pTq->pStreamMeta->pTasks, pIter); - if (pIter == NULL) { - break; - } - - SStreamTask* pTask = *(SStreamTask**)pIter; - if (pTask->taskLevel != TASK_LEVEL__SOURCE) { - continue; - } - - if (pTask->taskStatus == TASK_STATUS__RECOVER_PREPARE || pTask->taskStatus == TASK_STATUS__WAIT_DOWNSTREAM) { - tqDebug("stream task:%d skip push data, not ready for processing, status %d", pTask->id.taskId, - pTask->taskStatus); - continue; - } - - char key[128] = {0}; - createStreamTaskOffsetKey(key, pTask->id.streamId, pTask->id.taskId); - - STqOffset* pOffset = tqOffsetRead(pTq->pOffsetStore, key); - if (pOffset == NULL) { - doSaveTaskOffset(pTq->pOffsetStore, key, submit.ver); - } - } -} - int32_t tqProcessSubmitReq(STQ* pTq, SPackedData submit) { void* pIter = NULL; @@ -1429,7 +1391,7 @@ int32_t tqProcessSubmitReq(STQ* pTq, SPackedData submit) { if (pSubmit == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; tqError("failed to create data submit for stream since out of memory"); - saveOffsetForAllTasks(pTq, submit); + saveOffsetForAllTasks(pTq, submit.ver); return -1; } @@ -1518,11 +1480,14 @@ int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) { tqDoRestoreSourceStreamTasks(pTq); return 0; } else { - SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, taskId); + SStreamTask* pTask = streamMetaAcquireTaskEx(pTq->pStreamMeta, taskId); if (pTask != NULL) { if (pTask->taskStatus == TASK_STATUS__NORMAL) { tqDebug("vgId:%d s-task:%s start to process run req", vgId, pTask->id.idStr); streamProcessRunReq(pTask); + } else if (pTask->taskStatus == TASK_STATUS_RESTORE) { + tqDebug("vgId:%d s-task:%s start to restore from last ck", vgId, pTask->id.idStr); + streamProcessRunReq(pTask); } else { tqDebug("vgId:%d s-task:%s ignore run req since not in ready state", vgId, pTask->id.idStr); } @@ -1683,8 +1648,10 @@ int32_t tqRestoreStreamTasks(STQ* pTq) { return -1; } - tqInfo("vgId:%d start to restore all stream tasks", vgId); - + int32_t numOfTasks = taosHashGetSize(pTq->pStreamMeta->pRestoreTasks); + tqInfo("vgId:%d start restoring stream tasks, total tasks:%d", vgId, numOfTasks); + initOffsetForAllRestoreTasks(pTq); + pRunReq->head.vgId = vgId; pRunReq->streamId = 0; pRunReq->taskId = ALL_STREAM_TASKS_ID; diff --git a/source/dnode/vnode/src/tq/tqCommit.c b/source/dnode/vnode/src/tq/tqCommit.c index 7fc66c4919..0f5daa31ad 100644 --- a/source/dnode/vnode/src/tq/tqCommit.c +++ b/source/dnode/vnode/src/tq/tqCommit.c @@ -16,10 +16,13 @@ #include "tq.h" int tqCommit(STQ* pTq) { +#if 0 + // stream meta commit does not be aligned to the vnode commit if (streamMetaCommit(pTq->pStreamMeta) < 0) { tqError("vgId:%d, failed to commit stream meta since %s", TD_VID(pTq->pVnode), terrstr()); return -1; } +#endif return tqOffsetCommitFile(pTq->pOffsetStore); } diff --git a/source/dnode/vnode/src/tq/tqRestore.c b/source/dnode/vnode/src/tq/tqRestore.c index 50fcea2b54..9377e3d58f 100644 --- a/source/dnode/vnode/src/tq/tqRestore.c +++ b/source/dnode/vnode/src/tq/tqRestore.c @@ -15,25 +15,19 @@ #include "tq.h" -static int32_t restoreStreamTask(SStreamMeta* pStreamMeta, STqOffsetStore* pOffsetStore, SArray* pTaskList); +static int32_t restoreStreamTaskImpl(SStreamMeta* pStreamMeta, STqOffsetStore* pOffsetStore, SArray* pTaskList); static int32_t transferToNormalTask(SStreamMeta* pStreamMeta, SArray* pTaskList); // this function should be executed by stream threads. // there is a case that the WAL increases more fast than the restore procedure, and this restore procedure // will not stop eventually. int tqDoRestoreSourceStreamTasks(STQ* pTq) { - - // todo set the offset value from the previous check point offset int64_t st = taosGetTimestampMs(); - int32_t vgId = TD_VID(pTq->pVnode); - int32_t numOfTasks = taosHashGetSize(pTq->pStreamMeta->pRestoreTasks); - tqInfo("vgId:%d start restoring stream tasks, total tasks:%d", vgId, numOfTasks); - while (1) { SArray* pTaskList = taosArrayInit(4, POINTER_BYTES); // check all restore tasks - restoreStreamTask(pTq->pStreamMeta, pTq->pOffsetStore, pTaskList); + restoreStreamTaskImpl(pTq->pStreamMeta, pTq->pOffsetStore, pTaskList); transferToNormalTask(pTq->pStreamMeta, pTaskList); taosArrayDestroy(pTaskList); @@ -44,7 +38,7 @@ int tqDoRestoreSourceStreamTasks(STQ* pTq) { } int64_t et = taosGetTimestampMs(); - tqInfo("vgId:%d restoring task completed, elapsed time:%" PRId64 " sec.", vgId, (et - st)); + tqInfo("vgId:%d restoring task completed, elapsed time:%" PRId64 " sec.", TD_VID(pTq->pVnode), (et - st)); return 0; } @@ -68,7 +62,7 @@ int32_t transferToNormalTask(SStreamMeta* pStreamMeta, SArray* pTaskList) { return TSDB_CODE_SUCCESS; } -int32_t restoreStreamTask(SStreamMeta* pStreamMeta, STqOffsetStore* pOffsetStore, SArray* pTaskList) { +int32_t restoreStreamTaskImpl(SStreamMeta* pStreamMeta, STqOffsetStore* pOffsetStore, SArray* pTaskList) { // check all restore tasks void* pIter = NULL; @@ -93,7 +87,8 @@ int32_t restoreStreamTask(SStreamMeta* pStreamMeta, STqOffsetStore* pOffsetStore createStreamTaskOffsetKey(key, pTask->id.streamId, pTask->id.taskId); if (tInputQueueIsFull(pTask)) { - tqDebug("s-task:%s input queue is full, do nothing" PRId64, pTask->id.idStr); + tqDebug("s-task:%s input queue is full, do nothing", pTask->id.idStr); + taosMsleep(10); continue; } diff --git a/source/dnode/vnode/src/tq/tqUtil.c b/source/dnode/vnode/src/tq/tqUtil.c index ac88cf1916..14054ad998 100644 --- a/source/dnode/vnode/src/tq/tqUtil.c +++ b/source/dnode/vnode/src/tq/tqUtil.c @@ -69,4 +69,76 @@ int32_t launchTaskForWalBlock(SStreamTask* pTask, SFetchRet* pRet, STqOffset* pO } return 0; -} \ No newline at end of file +} + +void initOffsetForAllRestoreTasks(STQ* pTq) { + void* pIter = NULL; + + while(1) { + pIter = taosHashIterate(pTq->pStreamMeta->pRestoreTasks, pIter); + if (pIter == NULL) { + break; + } + + SStreamTask* pTask = *(SStreamTask**)pIter; + if (pTask->taskLevel != TASK_LEVEL__SOURCE) { + continue; + } + + if (pTask->taskStatus == TASK_STATUS__RECOVER_PREPARE || pTask->taskStatus == TASK_STATUS__WAIT_DOWNSTREAM) { + tqDebug("stream task:%d skip push data, not ready for processing, status %d", pTask->id.taskId, + pTask->taskStatus); + continue; + } + + char key[128] = {0}; + createStreamTaskOffsetKey(key, pTask->id.streamId, pTask->id.taskId); + + STqOffset* pOffset = tqOffsetRead(pTq->pOffsetStore, key); + if (pOffset == NULL) { + doSaveTaskOffset(pTq->pOffsetStore, key, pTask->startVer); + } + } + +} + +void saveOffsetForAllTasks(STQ* pTq, int64_t ver) { + void* pIter = NULL; + + while(1) { + pIter = taosHashIterate(pTq->pStreamMeta->pTasks, pIter); + if (pIter == NULL) { + break; + } + + SStreamTask* pTask = *(SStreamTask**)pIter; + if (pTask->taskLevel != TASK_LEVEL__SOURCE) { + continue; + } + + if (pTask->taskStatus == TASK_STATUS__RECOVER_PREPARE || pTask->taskStatus == TASK_STATUS__WAIT_DOWNSTREAM) { + tqDebug("stream task:%d skip push data, not ready for processing, status %d", pTask->id.taskId, + pTask->taskStatus); + continue; + } + + char key[128] = {0}; + createStreamTaskOffsetKey(key, pTask->id.streamId, pTask->id.taskId); + + STqOffset* pOffset = tqOffsetRead(pTq->pOffsetStore, key); + if (pOffset == NULL) { + doSaveTaskOffset(pTq->pOffsetStore, key, ver); + } + } +} + +void doSaveTaskOffset(STqOffsetStore* pOffsetStore, const char* pKey, int64_t ver) { + STqOffset offset = {0}; + tqOffsetResetToLog(&offset.val, ver); + + tstrncpy(offset.subKey, pKey, tListLen(offset.subKey)); + + // keep the offset info in the offset store + tqOffsetWrite(pOffsetStore, &offset); +} + diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 16433dc34e..759502e40f 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -127,14 +127,9 @@ enum { }; typedef struct { - // TODO remove prepareStatus -// STqOffsetVal prepareStatus; // for tmq - STqOffsetVal currentOffset; // for tmq - SMqMetaRsp metaRsp; // for tmq fetching meta -// int8_t returned; - int64_t snapshotVer; - // const SSubmitReq* pReq; - + STqOffsetVal currentOffset; // for tmq + SMqMetaRsp metaRsp; // for tmq fetching meta + int64_t snapshotVer; SPackedData submit; SSchemaWrapper* schema; char tbName[TSDB_TABLE_NAME_LEN]; @@ -144,6 +139,7 @@ typedef struct { int64_t fillHistoryVer1; int64_t fillHistoryVer2; SStreamState* pState; + int64_t dataVersion; } SStreamTaskInfo; typedef struct { @@ -191,7 +187,6 @@ enum { OP_OPENED = 0x1, OP_RES_TO_RETURN = 0x5, OP_EXEC_DONE = 0x9, -// OP_EXEC_RECV = 0x11, }; typedef struct SOperatorFpSet { @@ -560,6 +555,7 @@ typedef struct SStreamIntervalOperatorInfo { uint64_t numOfDatapack; SArray* pUpdated; SSHashObj* pUpdatedMap; + int64_t dataVersion; } SStreamIntervalOperatorInfo; typedef struct SDataGroupInfo { @@ -609,6 +605,7 @@ typedef struct SStreamSessionAggOperatorInfo { bool ignoreExpiredDataSaved; SArray* pUpdated; SSHashObj* pStUpdated; + int64_t dataVersion; } SStreamSessionAggOperatorInfo; typedef struct SStreamStateAggOperatorInfo { @@ -627,6 +624,7 @@ typedef struct SStreamStateAggOperatorInfo { bool ignoreExpiredDataSaved; SArray* pUpdated; SSHashObj* pSeUpdated; + int64_t dataVersion; } SStreamStateAggOperatorInfo; typedef struct SStreamPartitionOperatorInfo { diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index d9868f59b9..caaeaa76c2 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -198,6 +198,12 @@ int32_t qSetStreamOpOpen(qTaskInfo_t tinfo) { return code; } +int64_t qGetCheckpointVersion(qTaskInfo_t tinfo) { + SExecTaskInfo* pTaskInfo = tinfo; + return pTaskInfo->streamInfo.dataVersion; +} + + int32_t qSetMultiStreamInput(qTaskInfo_t tinfo, const void* pBlocks, size_t numOfBlocks, int32_t type) { if (tinfo == NULL) { return TSDB_CODE_APP_ERROR; diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index 880de7d6bf..f122323109 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -2333,9 +2333,14 @@ static int32_t getNextQualifiedFinalWindow(SInterval* pInterval, STimeWindow* pN return startPos; } +static void setStreamDataVersion(SExecTaskInfo* pTaskInfo, int64_t version) { + pTaskInfo->streamInfo.dataVersion = version; +} + static void doStreamIntervalAggImpl(SOperatorInfo* pOperatorInfo, SSDataBlock* pSDataBlock, uint64_t groupId, SSHashObj* pUpdatedMap) { SStreamIntervalOperatorInfo* pInfo = (SStreamIntervalOperatorInfo*)pOperatorInfo->info; + pInfo->dataVersion = TMAX(pInfo->dataVersion, pSDataBlock->info.version); SResultRowInfo* pResultRowInfo = &(pInfo->binfo.resultRowInfo); SExecTaskInfo* pTaskInfo = pOperatorInfo->pTaskInfo; @@ -2501,6 +2506,7 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { clearFunctionContext(&pOperator->exprSupp); // semi interval operator clear disk buffer clearStreamIntervalOperator(pInfo); + setStreamDataVersion(pTaskInfo, pInfo->dataVersion); qDebug("===stream===clear semi operator"); } else { deleteIntervalDiscBuf(pInfo->pState, pInfo->pPullDataMap, pInfo->twAggSup.maxTs - pInfo->twAggSup.deleteMark, @@ -2774,6 +2780,7 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, pInfo->numOfDatapack = 0; pInfo->pUpdated = NULL; pInfo->pUpdatedMap = NULL; + pInfo->dataVersion = 0; pOperator->operatorType = pPhyNode->type; pOperator->blocking = true; @@ -3124,6 +3131,8 @@ static void doStreamSessionAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSData int32_t rows = pSDataBlock->info.rows; int32_t winRows = 0; + pInfo->dataVersion = TMAX(pInfo->dataVersion, pSDataBlock->info.version); + SColumnInfoData* pStartTsCol = taosArrayGet(pSDataBlock->pDataBlock, pInfo->primaryTsIndex); TSKEY* startTsCols = (int64_t*)pStartTsCol->pData; SColumnInfoData* pEndTsCol = NULL; @@ -3587,6 +3596,7 @@ SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPh pInfo->ignoreExpiredDataSaved = false; pInfo->pUpdated = NULL; pInfo->pStUpdated = NULL; + pInfo->dataVersion = 0; setOperatorInfo(pOperator, "StreamSessionWindowAggOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION, true, OP_NOT_OPENED, pInfo, pTaskInfo); @@ -3897,6 +3907,9 @@ static void doStreamStateAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBl TSKEY* tsCols = NULL; SResultRow* pResult = NULL; int32_t winRows = 0; + + pInfo->dataVersion = TMAX(pInfo->dataVersion, pSDataBlock->info.version); + if (pSDataBlock->pDataBlock != NULL) { SColumnInfoData* pColDataInfo = taosArrayGet(pSDataBlock->pDataBlock, pInfo->primaryTsIndex); tsCols = (int64_t*)pColDataInfo->pData; @@ -4113,6 +4126,7 @@ SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhys pInfo->ignoreExpiredDataSaved = false; pInfo->pUpdated = NULL; pInfo->pSeUpdated = NULL; + pInfo->dataVersion = 0; setOperatorInfo(pOperator, "StreamStateAggOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE, true, OP_NOT_OPENED, pInfo, pTaskInfo); @@ -4748,6 +4762,7 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) { &pInfo->delKey); setOperatorCompleted(pOperator); streamStateCommit(pTaskInfo->streamInfo.pState); + setStreamDataVersion(pTaskInfo, pInfo->dataVersion); return NULL; } diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index d23590a08b..f2db8113d3 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -21,8 +21,9 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, const void* data, SArray* int32_t code = TSDB_CODE_SUCCESS; void* pExecutor = pTask->exec.pExecutor; - while(pTask->taskLevel == TASK_LEVEL__SOURCE && atomic_load_8(&pTask->taskStatus) != TASK_STATUS__NORMAL) { - qError("stream task wait for the end of fill history"); + while (pTask->taskLevel == TASK_LEVEL__SOURCE && atomic_load_8(&pTask->taskStatus) != TASK_STATUS__NORMAL) { + qError("stream task wait for the end of fill history, s-task:%s, status:%d", pTask->id.idStr, + atomic_load_8(&pTask->taskStatus)); taosMsleep(2); continue; } @@ -236,7 +237,7 @@ int32_t streamExecForAll(SStreamTask* pTask) { while (1) { SStreamQueueItem* qItem = streamQueueNextItem(pTask->inputQueue); if (qItem == NULL) { - qDebug("stream task exec over, queue empty, task: %d", pTask->id.taskId); + qDebug("s-task:%s stream task exec over, queue empty", pTask->id.idStr); break; } @@ -284,7 +285,19 @@ int32_t streamExecForAll(SStreamTask* pTask) { streamTaskExecImpl(pTask, pInput, pRes); - qDebug("s-task:%s exec end", pTask->id.idStr); + int64_t ckVer = qGetCheckpointVersion(pTask->exec.pExecutor); + if (ckVer > pTask->startVer) { // save it since the checkpoint is updated + qDebug("s-task:%s exec end, checkpoint ver from %"PRId64" to %"PRId64, pTask->id.idStr, pTask->startVer, ckVer); + pTask->startVer = ckVer; + streamMetaSaveTask(pTask->pMeta, pTask); + + if (streamMetaCommit(pTask->pMeta) < 0) { + qError("failed to commit stream meta, since %s", terrstr()); + return -1; + } + } else { + qDebug("s-task:%s exec end", pTask->id.idStr); + } if (taosArrayGetSize(pRes) != 0) { SStreamDataBlock* qRes = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, 0); @@ -333,6 +346,7 @@ int32_t streamTryExec(SStreamTask* pTask) { return -1; } + // todo the task should be commit here atomic_store_8(&pTask->schedStatus, TASK_SCHED_STATUS__INACTIVE); if (!taosQueueEmpty(pTask->inputQueue->queue)) { diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 8693915c46..a22d768a89 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -195,17 +195,12 @@ SStreamTask* streamMetaAcquireTask(SStreamMeta* pMeta, int32_t taskId) { taosRLockLatch(&pMeta->lock); SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasks, &taskId, sizeof(int32_t)); - if (ppTask) { - SStreamTask* pTask = *ppTask; - if (atomic_load_8(&pTask->taskStatus) != TASK_STATUS__DROPPING) { - atomic_add_fetch_32(&pTask->refCnt, 1); - taosRUnLockLatch(&pMeta->lock); - return pTask; - } else { - taosRUnLockLatch(&pMeta->lock); - return NULL; - } + if (ppTask != NULL && (atomic_load_8(&((*ppTask)->taskStatus)) != TASK_STATUS__DROPPING)) { + atomic_add_fetch_32(&(*ppTask)->refCnt, 1); + taosRUnLockLatch(&pMeta->lock); + return *ppTask; } + taosRUnLockLatch(&pMeta->lock); return NULL; } @@ -219,6 +214,37 @@ void streamMetaReleaseTask(SStreamMeta* pMeta, SStreamTask* pTask) { } } +SStreamTask* streamMetaAcquireTaskEx(SStreamMeta* pMeta, int32_t taskId) { + taosRLockLatch(&pMeta->lock); + + SStreamTask* pTask = NULL; + int32_t numOfRestored = taosHashGetSize(pMeta->pRestoreTasks); + if (numOfRestored > 0) { + SStreamTask** p = (SStreamTask**)taosHashGet(pMeta->pRestoreTasks, &taskId, sizeof(int32_t)); + if (p != NULL) { + pTask = *p; + if (pTask != NULL && (atomic_load_8(&(pTask->taskStatus)) != TASK_STATUS__DROPPING)) { + atomic_add_fetch_32(&pTask->refCnt, 1); + taosRUnLockLatch(&pMeta->lock); + return pTask; + } + } + } else { + SStreamTask** p = (SStreamTask**)taosHashGet(pMeta->pTasks, &taskId, sizeof(int32_t)); + if (p != NULL) { + pTask = *p; + if (pTask != NULL && atomic_load_8(&(pTask->taskStatus)) != TASK_STATUS__DROPPING) { + atomic_add_fetch_32(&pTask->refCnt, 1); + taosRUnLockLatch(&pMeta->lock); + return pTask; + } + } + } + + taosRUnLockLatch(&pMeta->lock); + return NULL; +} + void streamMetaRemoveTask(SStreamMeta* pMeta, int32_t taskId) { SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasks, &taskId, sizeof(int32_t)); if (ppTask) { diff --git a/source/util/src/tworker.c b/source/util/src/tworker.c index 631bcb443e..d57104dd78 100644 --- a/source/util/src/tworker.c +++ b/source/util/src/tworker.c @@ -218,7 +218,7 @@ STaosQueue *tAutoQWorkerAllocQueue(SAutoQWorkerPool *pool, void *ahandle, FItem int32_t queueNum = taosGetQueueNumber(pool->qset); int32_t curWorkerNum = taosArrayGetSize(pool->workers); int32_t dstWorkerNum = ceil(queueNum * pool->ratio); - if (dstWorkerNum < 1) dstWorkerNum = 1; + if (dstWorkerNum < 2) dstWorkerNum = 2; // spawn a thread to process queue while (curWorkerNum < dstWorkerNum) {