From bbd423f666cf453d1e233bdfa869edcee5d0de8e Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 7 May 2024 09:46:12 +0800 Subject: [PATCH] refactor: do some internal refactor. --- source/libs/stream/inc/streamInt.h | 1 - source/libs/stream/src/streamCheckStatus.c | 26 +++ source/libs/stream/src/streamQueue.c | 30 +++ source/libs/stream/src/streamSched.c | 14 +- source/libs/stream/src/streamSnapshot.c | 37 ++- source/libs/stream/src/streamStartHistory.c | 247 ++++++++------------ 6 files changed, 179 insertions(+), 176 deletions(-) diff --git a/source/libs/stream/inc/streamInt.h b/source/libs/stream/inc/streamInt.h index 3a4e3d81fb..ceb6cd9739 100644 --- a/source/libs/stream/inc/streamInt.h +++ b/source/libs/stream/inc/streamInt.h @@ -158,7 +158,6 @@ typedef enum ECHECKPOINT_BACKUP_TYPE { ECHECKPOINT_BACKUP_TYPE streamGetCheckpointBackupType(); int32_t streamTaskDownloadCheckpointData(char* id, char* path); - int32_t streamTaskOnNormalTaskReady(SStreamTask* pTask); int32_t streamTaskOnScanhistoryTaskReady(SStreamTask* pTask); diff --git a/source/libs/stream/src/streamCheckStatus.c b/source/libs/stream/src/streamCheckStatus.c index f0f12cae2b..e6f127349c 100644 --- a/source/libs/stream/src/streamCheckStatus.c +++ b/source/libs/stream/src/streamCheckStatus.c @@ -162,6 +162,32 @@ int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRs return 0; } +int32_t streamSendCheckRsp(const SStreamMeta* pMeta, const SStreamTaskCheckReq* pReq, SStreamTaskCheckRsp* pRsp, + SRpcHandleInfo* pRpcInfo, int32_t taskId) { + SEncoder encoder; + int32_t code; + int32_t len; + + tEncodeSize(tEncodeStreamTaskCheckRsp, pRsp, len, code); + if (code < 0) { + stError("vgId:%d failed to encode task check rsp, s-task:0x%x", pMeta->vgId, taskId); + return -1; + } + + void* buf = rpcMallocCont(sizeof(SMsgHead) + len); + ((SMsgHead*)buf)->vgId = htonl(pReq->upstreamNodeId); + + void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead)); + tEncoderInit(&encoder, (uint8_t*)abuf, len); + tEncodeStreamTaskCheckRsp(&encoder, pRsp); + tEncoderClear(&encoder); + + SRpcMsg rspMsg = {.code = 0, .pCont = buf, .contLen = sizeof(SMsgHead) + len, .info = *pRpcInfo}; + + tmsgSendRsp(&rspMsg); + return 0; +} + int32_t streamTaskStartMonitorCheckRsp(SStreamTask* pTask) { STaskCheckInfo* pInfo = &pTask->taskCheckInfo; taosThreadMutexLock(&pInfo->checkInfoLock); diff --git a/source/libs/stream/src/streamQueue.c b/source/libs/stream/src/streamQueue.c index 0ac282c362..5596eb3dee 100644 --- a/source/libs/stream/src/streamQueue.c +++ b/source/libs/stream/src/streamQueue.c @@ -330,6 +330,36 @@ int32_t streamTaskPutDataIntoInputQ(SStreamTask* pTask, SStreamQueueItem* pItem) return 0; } +int32_t streamTaskPutTranstateIntoInputQ(SStreamTask* pTask) { + SStreamDataBlock* pTranstate = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, sizeof(SSDataBlock)); + if (pTranstate == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } + + SSDataBlock* pBlock = taosMemoryCalloc(1, sizeof(SSDataBlock)); + if (pBlock == NULL) { + taosFreeQitem(pTranstate); + return TSDB_CODE_OUT_OF_MEMORY; + } + + pTranstate->type = STREAM_INPUT__TRANS_STATE; + + pBlock->info.type = STREAM_TRANS_STATE; + pBlock->info.rows = 1; + pBlock->info.childId = pTask->info.selfChildId; + + pTranstate->blocks = taosArrayInit(4, sizeof(SSDataBlock)); // pBlock; + taosArrayPush(pTranstate->blocks, pBlock); + + taosMemoryFree(pBlock); + if (streamTaskPutDataIntoInputQ(pTask, (SStreamQueueItem*)pTranstate) < 0) { + return TSDB_CODE_OUT_OF_MEMORY; + } + + pTask->status.appendTranstateBlock = true; + return TSDB_CODE_SUCCESS; +} + // the result should be put into the outputQ in any cases, the result may be lost otherwise. int32_t streamTaskPutDataIntoOutputQ(SStreamTask* pTask, SStreamDataBlock* pBlock) { STaosQueue* pQueue = pTask->outputq.queue->pQueue; diff --git a/source/libs/stream/src/streamSched.c b/source/libs/stream/src/streamSched.c index 2e337234b6..52e7431e70 100644 --- a/source/libs/stream/src/streamSched.c +++ b/source/libs/stream/src/streamSched.c @@ -17,7 +17,7 @@ #include "ttimer.h" static void streamTaskResumeHelper(void* param, void* tmrId); -static void streamSchedByTimer(void* param, void* tmrId); +static void streamTaskSchedHelper(void* param, void* tmrId); int32_t streamSetupScheduleTrigger(SStreamTask* pTask) { if (pTask->info.triggerParam != 0 && pTask->info.fillHistory == 0) { @@ -26,7 +26,7 @@ int32_t streamSetupScheduleTrigger(SStreamTask* pTask) { stDebug("s-task:%s setup scheduler trigger, delay:%" PRId64 " ms", pTask->id.idStr, pTask->info.triggerParam); - pTask->schedInfo.pDelayTimer = taosTmrStart(streamSchedByTimer, (int32_t)pTask->info.triggerParam, pTask, streamTimer); + pTask->schedInfo.pDelayTimer = taosTmrStart(streamTaskSchedHelper, (int32_t)pTask->info.triggerParam, pTask, streamTimer); pTask->schedInfo.status = TASK_TRIGGER_STATUS__INACTIVE; } @@ -112,7 +112,7 @@ void streamTaskResumeHelper(void* param, void* tmrId) { streamMetaReleaseTask(pTask->pMeta, pTask); } -void streamSchedByTimer(void* param, void* tmrId) { +void streamTaskSchedHelper(void* param, void* tmrId) { SStreamTask* pTask = (void*)param; const char* id = pTask->id.idStr; int32_t nextTrigger = (int32_t)pTask->info.triggerParam; @@ -133,7 +133,7 @@ void streamSchedByTimer(void* param, void* tmrId) { if (pTrigger == NULL) { stError("s-task:%s failed to prepare retrieve data trigger, code:%s, try again in %dms", id, "out of memory", nextTrigger); - taosTmrReset(streamSchedByTimer, nextTrigger, pTask, streamTimer, &pTask->schedInfo.pDelayTimer); + taosTmrReset(streamTaskSchedHelper, nextTrigger, pTask, streamTimer, &pTask->schedInfo.pDelayTimer); return; } @@ -144,7 +144,7 @@ void streamSchedByTimer(void* param, void* tmrId) { stError("s-task:%s failed to prepare retrieve data trigger, code:%s, try again in %dms", id, "out of memory", nextTrigger); - taosTmrReset(streamSchedByTimer, nextTrigger, pTask, streamTimer, &pTask->schedInfo.pDelayTimer); + taosTmrReset(streamTaskSchedHelper, nextTrigger, pTask, streamTimer, &pTask->schedInfo.pDelayTimer); return; } @@ -153,7 +153,7 @@ void streamSchedByTimer(void* param, void* tmrId) { int32_t code = streamTaskPutDataIntoInputQ(pTask, (SStreamQueueItem*)pTrigger); if (code != TSDB_CODE_SUCCESS) { - taosTmrReset(streamSchedByTimer, nextTrigger, pTask, streamTimer, &pTask->schedInfo.pDelayTimer); + taosTmrReset(streamTaskSchedHelper, nextTrigger, pTask, streamTimer, &pTask->schedInfo.pDelayTimer); return; } @@ -161,5 +161,5 @@ void streamSchedByTimer(void* param, void* tmrId) { } } - taosTmrReset(streamSchedByTimer, nextTrigger, pTask, streamTimer, &pTask->schedInfo.pDelayTimer); + taosTmrReset(streamTaskSchedHelper, nextTrigger, pTask, streamTimer, &pTask->schedInfo.pDelayTimer); } diff --git a/source/libs/stream/src/streamSnapshot.c b/source/libs/stream/src/streamSnapshot.c index a69cb75dad..1800324cc8 100644 --- a/source/libs/stream/src/streamSnapshot.c +++ b/source/libs/stream/src/streamSnapshot.c @@ -32,6 +32,7 @@ typedef struct SBackendFileItem { int64_t size; int8_t ref; } SBackendFileItem; + typedef struct SBackendFile { char* pCurrent; char* pMainfest; @@ -102,6 +103,7 @@ struct SStreamSnapWriter { int64_t ever; SStreamSnapHandle handle; }; + const char* ROCKSDB_OPTIONS = "OPTIONS"; const char* ROCKSDB_MAINFEST = "MANIFEST"; const char* ROCKSDB_SST = "sst"; @@ -120,7 +122,7 @@ void streamSnapHandleDestroy(SStreamSnapHandle* handle); } while (0) int32_t streamGetFileSize(char* path, char* name, int64_t* sz) { - int ret = 0; + int32_t ret = 0; char* fullname = taosMemoryCalloc(1, strlen(path) + 32); sprintf(fullname, "%s%s%s", path, TD_DIRSEP, name); @@ -149,7 +151,7 @@ void snapFileDebugInfo(SBackendSnapFile2* pSnapFile) { if (pSnapFile->pMainfest) sprintf(buf + strlen(buf), "MANIFEST: %s,", pSnapFile->pMainfest); if (pSnapFile->pOptions) sprintf(buf + strlen(buf), "options: %s,", pSnapFile->pOptions); if (pSnapFile->pSst) { - for (int i = 0; i < taosArrayGetSize(pSnapFile->pSst); i++) { + for (int32_t i = 0; i < taosArrayGetSize(pSnapFile->pSst); i++) { char* name = taosArrayGetP(pSnapFile->pSst, i); sprintf(buf + strlen(buf), "%s,", name); } @@ -157,7 +159,7 @@ void snapFileDebugInfo(SBackendSnapFile2* pSnapFile) { sprintf(buf + strlen(buf) - 1, "]"); stInfo("%s %" PRId64 "-%" PRId64 " get file list: %s", STREAM_STATE_TRANSFER, pSnapFile->snapInfo.streamId, - pSnapFile->snapInfo.taskId, buf); + pSnapFile->snapInfo.taskId, buf); taosMemoryFree(buf); } } @@ -183,7 +185,7 @@ int32_t snapFileGenMeta(SBackendSnapFile2* pSnapFile) { streamGetFileSize(pSnapFile->path, item.name, &item.size); taosArrayPush(pSnapFile->pFileList, &item); // sst - for (int i = 0; i < taosArrayGetSize(pSnapFile->pSst); i++) { + for (int32_t i = 0; i < taosArrayGetSize(pSnapFile->pSst); i++) { char* sst = taosArrayGetP(pSnapFile->pSst, i); item.name = sst; item.type = ROCKSDB_SST_TYPE; @@ -270,12 +272,12 @@ void snapFileDestroy(SBackendSnapFile2* pSnap) { taosMemoryFree(pSnap->pMainfest); taosMemoryFree(pSnap->pOptions); taosMemoryFree(pSnap->path); - for (int i = 0; i < taosArrayGetSize(pSnap->pSst); i++) { + for (int32_t i = 0; i < taosArrayGetSize(pSnap->pSst); i++) { char* sst = taosArrayGetP(pSnap->pSst, i); taosMemoryFree(sst); } // unite read/write snap file - for (int i = 0; i < taosArrayGetSize(pSnap->pFileList); i++) { + for (int32_t i = 0; i < taosArrayGetSize(pSnap->pFileList); i++) { SBackendFileItem* pItem = taosArrayGet(pSnap->pFileList, i); if (pItem->ref == 0) { taosMemoryFree(pItem->name); @@ -297,7 +299,7 @@ int32_t streamSnapHandleInit(SStreamSnapHandle* pHandle, char* path, void* pMeta SArray* pDbSnapSet = taosArrayInit(8, sizeof(SBackendSnapFile2)); - for (int i = 0; i < taosArrayGetSize(pSnapSet); i++) { + for (int32_t i = 0; i < taosArrayGetSize(pSnapSet); i++) { SStreamTaskSnap* pSnap = taosArrayGet(pSnapSet, i); SBackendSnapFile2 snapFile = {0}; @@ -305,7 +307,7 @@ int32_t streamSnapHandleInit(SStreamSnapHandle* pHandle, char* path, void* pMeta ASSERT(code == 0); taosArrayPush(pDbSnapSet, &snapFile); } - for (int i = 0; i < taosArrayGetSize(pSnapSet); i++) { + for (int32_t i = 0; i < taosArrayGetSize(pSnapSet); i++) { SStreamTaskSnap* pSnap = taosArrayGet(pSnapSet, i); taosMemoryFree(pSnap->dbPrefixPath); } @@ -324,7 +326,7 @@ _err: void streamSnapHandleDestroy(SStreamSnapHandle* handle) { if (handle->pDbSnapSet) { - for (int i = 0; i < taosArrayGetSize(handle->pDbSnapSet); i++) { + for (int32_t i = 0; i < taosArrayGetSize(handle->pDbSnapSet); i++) { SBackendSnapFile2* pSnapFile = taosArrayGet(handle->pDbSnapSet, i); snapFileDebugInfo(pSnapFile); snapFileDestroy(pSnapFile); @@ -396,9 +398,9 @@ _NEXT: item = taosArrayGet(pSnapFile->pFileList, pSnapFile->currFileIdx); stDebug("%s start to read file %s, current offset:%" PRId64 ", size:%" PRId64 - ", file no.%d, total set:%d, current set idx: %d", - STREAM_STATE_TRANSFER, item->name, (int64_t)pSnapFile->offset, item->size, pSnapFile->currFileIdx, - (int)taosArrayGetSize(pHandle->pDbSnapSet), pHandle->currIdx); + ", file no.%d, total set:%d, current set idx: %d", + STREAM_STATE_TRANSFER, item->name, (int64_t)pSnapFile->offset, item->size, pSnapFile->currFileIdx, + (int32_t)taosArrayGetSize(pHandle->pDbSnapSet), pHandle->currIdx); uint8_t* buf = taosMemoryCalloc(1, sizeof(SStreamSnapBlockHdr) + kBlockSize); int64_t nread = taosPReadFile(pSnapFile->fd, buf + sizeof(SStreamSnapBlockHdr), kBlockSize, pSnapFile->offset); @@ -489,14 +491,10 @@ int32_t snapInfoEqual(SStreamTaskSnap* a, SStreamTaskSnap* b) { } int32_t streamSnapWriteImpl(SStreamSnapWriter* pWriter, uint8_t* pData, uint32_t nData, SBackendSnapFile2* pSnapFile) { - int code = -1; + int32_t code = -1; SStreamSnapBlockHdr* pHdr = (SStreamSnapBlockHdr*)pData; SStreamSnapHandle* pHandle = &pWriter->handle; - SStreamTaskSnap snapInfo = pHdr->snapInfo; - - SStreamTaskSnap* pSnapInfo = &pSnapFile->snapInfo; - - SBackendFileItem* pItem = taosArrayGet(pSnapFile->pFileList, pSnapFile->currFileIdx); + SBackendFileItem* pItem = taosArrayGet(pSnapFile->pFileList, pSnapFile->currFileIdx); if (pSnapFile->fd == 0) { pSnapFile->fd = streamOpenFile(pSnapFile->path, pItem->name, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND); @@ -540,6 +538,7 @@ int32_t streamSnapWriteImpl(SStreamSnapWriter* pWriter, uint8_t* pData, uint32_t pSnapFile->offset += pHdr->size; } code = 0; + _EXIT: return code; } @@ -590,8 +589,8 @@ int32_t streamSnapWrite(SStreamSnapWriter* pWriter, uint8_t* pData, uint32_t nDa return streamSnapWrite(pWriter, pData, nData); } } - return code; } + int32_t streamSnapWriterClose(SStreamSnapWriter* pWriter, int8_t rollback) { if (pWriter == NULL) return 0; streamSnapHandleDestroy(&pWriter->handle); diff --git a/source/libs/stream/src/streamStartHistory.c b/source/libs/stream/src/streamStartHistory.c index b3df5755ea..98c0d95781 100644 --- a/source/libs/stream/src/streamStartHistory.c +++ b/source/libs/stream/src/streamStartHistory.c @@ -38,8 +38,12 @@ static void tryLaunchHistoryTask(void* param, void* tmrId); static void doExecScanhistoryInFuture(void* param, void* tmrId); static int32_t doStartScanHistoryTask(SStreamTask* pTask); static int32_t streamTaskStartScanHistory(SStreamTask* pTask); +static void checkFillhistoryTaskStatus(SStreamTask* pTask, SStreamTask* pHTask); +static int32_t launchNotBuiltFillHistoryTask(SStreamTask* pTask); +static void doRetryLaunchFillHistoryTask(SStreamTask* pTask, SLaunchHTaskInfo* pInfo, int64_t now); +static void notRetryLaunchFillHistoryTask(SStreamTask* pTask, SLaunchHTaskInfo* pInfo, int64_t now); -int32_t streamTaskSetReady(SStreamTask* pTask) { +static int32_t streamTaskSetReady(SStreamTask* pTask) { int32_t numOfDowns = streamTaskGetNumOfDownstream(pTask); SStreamTaskState* p = streamTaskGetStatus(pTask); @@ -79,33 +83,6 @@ int32_t streamStartScanHistoryAsync(SStreamTask* pTask, int8_t igUntreated) { return 0; } -void doExecScanhistoryInFuture(void* param, void* tmrId) { - SStreamTask* pTask = param; - pTask->schedHistoryInfo.numOfTicks -= 1; - - SStreamTaskState* p = streamTaskGetStatus(pTask); - if (p->state == TASK_STATUS__DROPPING || p->state == TASK_STATUS__STOP) { - int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1); - stDebug("s-task:%s status:%s not start scan-history again, ref:%d", pTask->id.idStr, p->name, ref); - - streamMetaReleaseTask(pTask->pMeta, pTask); - return; - } - - if (pTask->schedHistoryInfo.numOfTicks <= 0) { - streamStartScanHistoryAsync(pTask, 0); - - int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1); - stDebug("s-task:%s fill-history:%d start scan-history data, out of tmr, ref:%d", pTask->id.idStr, - pTask->info.fillHistory, ref); - - // release the task. - streamMetaReleaseTask(pTask->pMeta, pTask); - } else { - taosTmrReset(doExecScanhistoryInFuture, SCANHISTORY_IDLE_TIME_SLICE, pTask, streamTimer, &pTask->schedHistoryInfo.pTimer); - } -} - int32_t streamExecScanHistoryInFuture(SStreamTask* pTask, int32_t idleDuration) { int32_t numOfTicks = idleDuration / SCANHISTORY_IDLE_TIME_SLICE; if (numOfTicks <= 0) { @@ -136,17 +113,6 @@ int32_t streamExecScanHistoryInFuture(SStreamTask* pTask, int32_t idleDuration) return TSDB_CODE_SUCCESS; } -int32_t doStartScanHistoryTask(SStreamTask* pTask) { - SVersionRange* pRange = &pTask->dataRange.range; - if (pTask->info.fillHistory) { - streamSetParamForScanHistory(pTask); - } - - streamSetParamForStreamScannerStep1(pTask, pRange, &pTask->dataRange.window); - int32_t code = streamStartScanHistoryAsync(pTask, 0); - return code; -} - int32_t streamTaskStartScanHistory(SStreamTask* pTask) { int32_t level = pTask->info.taskLevel; ETaskStatus status = streamTaskGetStatus(pTask)->state; @@ -267,32 +233,6 @@ int32_t streamTaskOnScanhistoryTaskReady(SStreamTask* pTask) { return TSDB_CODE_SUCCESS; } -int32_t streamSendCheckRsp(const SStreamMeta* pMeta, const SStreamTaskCheckReq* pReq, SStreamTaskCheckRsp* pRsp, - SRpcHandleInfo* pRpcInfo, int32_t taskId) { - SEncoder encoder; - int32_t code; - int32_t len; - - tEncodeSize(tEncodeStreamTaskCheckRsp, pRsp, len, code); - if (code < 0) { - stError("vgId:%d failed to encode task check rsp, s-task:0x%x", pMeta->vgId, taskId); - return -1; - } - - void* buf = rpcMallocCont(sizeof(SMsgHead) + len); - ((SMsgHead*)buf)->vgId = htonl(pReq->upstreamNodeId); - - void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead)); - tEncoderInit(&encoder, (uint8_t*)abuf, len); - tEncodeStreamTaskCheckRsp(&encoder, pRsp); - tEncoderClear(&encoder); - - SRpcMsg rspMsg = {.code = 0, .pCont = buf, .contLen = sizeof(SMsgHead) + len, .info = *pRpcInfo}; - - tmsgSendRsp(&rspMsg); - return 0; -} - // common int32_t streamSetParamForScanHistory(SStreamTask* pTask) { stDebug("s-task:%s set operator option for scan-history data", pTask->id.idStr); @@ -308,6 +248,55 @@ int32_t streamSetParamForStreamScannerStep2(SStreamTask* pTask, SVersionRange* p return qStreamSourceScanParamForHistoryScanStep2(pTask->exec.pExecutor, pVerRange, pWindow); } +// an fill history task needs to be started. +int32_t streamLaunchFillHistoryTask(SStreamTask* pTask) { + SStreamMeta* pMeta = pTask->pMeta; + STaskExecStatisInfo* pExecInfo = &pTask->execInfo; + const char* idStr = pTask->id.idStr; + int64_t hStreamId = pTask->hTaskInfo.id.streamId; + int32_t hTaskId = pTask->hTaskInfo.id.taskId; + ASSERT(hTaskId != 0); + + // check stream task status in the first place. + SStreamTaskState* pStatus = streamTaskGetStatus(pTask); + if (pStatus->state != TASK_STATUS__READY && pStatus->state != TASK_STATUS__HALT) { + stDebug("s-task:%s not launch related fill-history task:0x%" PRIx64 "-0x%x, status:%s", idStr, hStreamId, hTaskId, + pStatus->name); + + streamMetaAddTaskLaunchResult(pMeta, hStreamId, hTaskId, pExecInfo->checkTs, pExecInfo->readyTs, false); + return -1; // todo set the correct error code + } + + stDebug("s-task:%s start to launch related fill-history task:0x%" PRIx64 "-0x%x", idStr, hStreamId, hTaskId); + + // Set the execute conditions, including the query time window and the version range + streamMetaRLock(pMeta); + SStreamTask** pHTask = taosHashGet(pMeta->pTasksMap, &pTask->hTaskInfo.id, sizeof(pTask->hTaskInfo.id)); + streamMetaRUnLock(pMeta); + + if (pHTask != NULL) { // it is already added into stream meta store. + SStreamTask* pHisTask = streamMetaAcquireTask(pMeta, hStreamId, hTaskId); + if (pHisTask == NULL) { + stDebug("s-task:%s failed acquire and start fill-history task, it may have been dropped/stopped", idStr); + streamMetaAddTaskLaunchResult(pMeta, hStreamId, hTaskId, pExecInfo->checkTs, pExecInfo->readyTs, false); + } else { + if (pHisTask->status.downstreamReady == 1) { // it's ready now, do nothing + stDebug("s-task:%s fill-history task is ready, no need to check downstream", pHisTask->id.idStr); + streamMetaAddTaskLaunchResult(pMeta, hStreamId, hTaskId, pExecInfo->checkTs, pExecInfo->readyTs, true); + } else { // exist, but not ready, continue check downstream task status + checkFillhistoryTaskStatus(pTask, pHisTask); + } + + streamMetaReleaseTask(pMeta, pHisTask); + } + + return TSDB_CODE_SUCCESS; + } else { + return launchNotBuiltFillHistoryTask(pTask); + } +} + +/////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// int32_t initScanHistoryReq(SStreamTask* pTask, SStreamScanHistoryReq* pReq, int8_t igUntreated) { pReq->msgHead.vgId = pTask->info.nodeId; pReq->streamId = pTask->id.streamId; @@ -316,37 +305,7 @@ int32_t initScanHistoryReq(SStreamTask* pTask, SStreamScanHistoryReq* pReq, int8 return 0; } -int32_t streamTaskPutTranstateIntoInputQ(SStreamTask* pTask) { - SStreamDataBlock* pTranstate = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, sizeof(SSDataBlock)); - if (pTranstate == NULL) { - return TSDB_CODE_OUT_OF_MEMORY; - } - - SSDataBlock* pBlock = taosMemoryCalloc(1, sizeof(SSDataBlock)); - if (pBlock == NULL) { - taosFreeQitem(pTranstate); - return TSDB_CODE_OUT_OF_MEMORY; - } - - pTranstate->type = STREAM_INPUT__TRANS_STATE; - - pBlock->info.type = STREAM_TRANS_STATE; - pBlock->info.rows = 1; - pBlock->info.childId = pTask->info.selfChildId; - - pTranstate->blocks = taosArrayInit(4, sizeof(SSDataBlock)); // pBlock; - taosArrayPush(pTranstate->blocks, pBlock); - - taosMemoryFree(pBlock); - if (streamTaskPutDataIntoInputQ(pTask, (SStreamQueueItem*)pTranstate) < 0) { - return TSDB_CODE_OUT_OF_MEMORY; - } - - pTask->status.appendTranstateBlock = true; - return TSDB_CODE_SUCCESS; -} - -static void checkFillhistoryTaskStatus(SStreamTask* pTask, SStreamTask* pHTask) { +void checkFillhistoryTaskStatus(SStreamTask* pTask, SStreamTask* pHTask) { SDataRange* pRange = &pHTask->dataRange; // the query version range should be limited to the already processed data @@ -365,7 +324,7 @@ static void checkFillhistoryTaskStatus(SStreamTask* pTask, SStreamTask* pHTask) streamTaskHandleEvent(pHTask->status.pSM, TASK_EVENT_INIT_SCANHIST); } -static void noRetryLaunchFillHistoryTask(SStreamTask* pTask, SLaunchHTaskInfo* pInfo, int64_t now) { +void notRetryLaunchFillHistoryTask(SStreamTask* pTask, SLaunchHTaskInfo* pInfo, int64_t now) { SStreamMeta* pMeta = pTask->pMeta; SHistoryTaskInfo* pHTaskInfo = &pTask->hTaskInfo; @@ -379,7 +338,7 @@ static void noRetryLaunchFillHistoryTask(SStreamTask* pTask, SLaunchHTaskInfo* p pHTaskInfo->id.streamId = 0; } -static void doRetryLaunchFillHistoryTask(SStreamTask* pTask, SLaunchHTaskInfo* pInfo, int64_t now) { +void doRetryLaunchFillHistoryTask(SStreamTask* pTask, SLaunchHTaskInfo* pInfo, int64_t now) { SStreamMeta* pMeta = pTask->pMeta; SHistoryTaskInfo* pHTaskInfo = &pTask->hTaskInfo; @@ -401,7 +360,7 @@ static void doRetryLaunchFillHistoryTask(SStreamTask* pTask, SLaunchHTaskInfo* p } } -static void tryLaunchHistoryTask(void* param, void* tmrId) { +void tryLaunchHistoryTask(void* param, void* tmrId) { SLaunchHTaskInfo* pInfo = param; SStreamMeta* pMeta = pInfo->pMeta; int64_t now = taosGetTimestampMs(); @@ -449,7 +408,7 @@ static void tryLaunchHistoryTask(void* param, void* tmrId) { } if (pHTaskInfo->retryTimes > MAX_RETRY_LAUNCH_HISTORY_TASK) { - noRetryLaunchFillHistoryTask(pTask, pInfo, now); + notRetryLaunchFillHistoryTask(pTask, pInfo, now); } else { // not reach the limitation yet, let's continue retrying launch related fill-history task. streamTaskSetRetryInfoForLaunch(pHTaskInfo); ASSERT(pTask->status.timerActive >= 1); @@ -500,7 +459,7 @@ SLaunchHTaskInfo* createHTaskLaunchInfo(SStreamMeta* pMeta, STaskId* pTaskId, in return pInfo; } -static int32_t launchNotBuiltFillHistoryTask(SStreamTask* pTask) { +int32_t launchNotBuiltFillHistoryTask(SStreamTask* pTask) { SStreamMeta* pMeta = pTask->pMeta; STaskExecStatisInfo* pExecInfo = &pTask->execInfo; const char* idStr = pTask->id.idStr; @@ -547,54 +506,6 @@ static int32_t launchNotBuiltFillHistoryTask(SStreamTask* pTask) { return TSDB_CODE_SUCCESS; } -// an fill history task needs to be started. -int32_t streamLaunchFillHistoryTask(SStreamTask* pTask) { - SStreamMeta* pMeta = pTask->pMeta; - STaskExecStatisInfo* pExecInfo = &pTask->execInfo; - const char* idStr = pTask->id.idStr; - int64_t hStreamId = pTask->hTaskInfo.id.streamId; - int32_t hTaskId = pTask->hTaskInfo.id.taskId; - ASSERT(hTaskId != 0); - - // check stream task status in the first place. - SStreamTaskState* pStatus = streamTaskGetStatus(pTask); - if (pStatus->state != TASK_STATUS__READY && pStatus->state != TASK_STATUS__HALT) { - stDebug("s-task:%s not launch related fill-history task:0x%" PRIx64 "-0x%x, status:%s", idStr, hStreamId, hTaskId, - pStatus->name); - - streamMetaAddTaskLaunchResult(pMeta, hStreamId, hTaskId, pExecInfo->checkTs, pExecInfo->readyTs, false); - return -1; // todo set the correct error code - } - - stDebug("s-task:%s start to launch related fill-history task:0x%" PRIx64 "-0x%x", idStr, hStreamId, hTaskId); - - // Set the execute conditions, including the query time window and the version range - streamMetaRLock(pMeta); - SStreamTask** pHTask = taosHashGet(pMeta->pTasksMap, &pTask->hTaskInfo.id, sizeof(pTask->hTaskInfo.id)); - streamMetaRUnLock(pMeta); - - if (pHTask != NULL) { // it is already added into stream meta store. - SStreamTask* pHisTask = streamMetaAcquireTask(pMeta, hStreamId, hTaskId); - if (pHisTask == NULL) { - stDebug("s-task:%s failed acquire and start fill-history task, it may have been dropped/stopped", idStr); - streamMetaAddTaskLaunchResult(pMeta, hStreamId, hTaskId, pExecInfo->checkTs, pExecInfo->readyTs, false); - } else { - if (pHisTask->status.downstreamReady == 1) { // it's ready now, do nothing - stDebug("s-task:%s fill-history task is ready, no need to check downstream", pHisTask->id.idStr); - streamMetaAddTaskLaunchResult(pMeta, hStreamId, hTaskId, pExecInfo->checkTs, pExecInfo->readyTs, true); - } else { // exist, but not ready, continue check downstream task status - checkFillhistoryTaskStatus(pTask, pHisTask); - } - - streamMetaReleaseTask(pMeta, pHisTask); - } - - return TSDB_CODE_SUCCESS; - } else { - return launchNotBuiltFillHistoryTask(pTask); - } -} - int32_t streamTaskResetTimewindowFilter(SStreamTask* pTask) { void* exec = pTask->exec.pExecutor; return qStreamInfoResetTimewindowFilter(exec); @@ -651,3 +562,41 @@ void streamTaskSetRangeStreamCalc(SStreamTask* pTask) { streamSetParamForStreamScannerStep2(pTask, &verRange, &win); } } + +void doExecScanhistoryInFuture(void* param, void* tmrId) { + SStreamTask* pTask = param; + pTask->schedHistoryInfo.numOfTicks -= 1; + + SStreamTaskState* p = streamTaskGetStatus(pTask); + if (p->state == TASK_STATUS__DROPPING || p->state == TASK_STATUS__STOP) { + int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1); + stDebug("s-task:%s status:%s not start scan-history again, ref:%d", pTask->id.idStr, p->name, ref); + + streamMetaReleaseTask(pTask->pMeta, pTask); + return; + } + + if (pTask->schedHistoryInfo.numOfTicks <= 0) { + streamStartScanHistoryAsync(pTask, 0); + + int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1); + stDebug("s-task:%s fill-history:%d start scan-history data, out of tmr, ref:%d", pTask->id.idStr, + pTask->info.fillHistory, ref); + + // release the task. + streamMetaReleaseTask(pTask->pMeta, pTask); + } else { + taosTmrReset(doExecScanhistoryInFuture, SCANHISTORY_IDLE_TIME_SLICE, pTask, streamTimer, &pTask->schedHistoryInfo.pTimer); + } +} + +int32_t doStartScanHistoryTask(SStreamTask* pTask) { + SVersionRange* pRange = &pTask->dataRange.range; + if (pTask->info.fillHistory) { + streamSetParamForScanHistory(pTask); + } + + streamSetParamForStreamScannerStep1(pTask, pRange, &pTask->dataRange.window); + int32_t code = streamStartScanHistoryAsync(pTask, 0); + return code; +}