From bbd423f666cf453d1e233bdfa869edcee5d0de8e Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 7 May 2024 09:46:12 +0800 Subject: [PATCH 1/3] refactor: do some internal refactor. --- source/libs/stream/inc/streamInt.h | 1 - source/libs/stream/src/streamCheckStatus.c | 26 +++ source/libs/stream/src/streamQueue.c | 30 +++ source/libs/stream/src/streamSched.c | 14 +- source/libs/stream/src/streamSnapshot.c | 37 ++- source/libs/stream/src/streamStartHistory.c | 247 ++++++++------------ 6 files changed, 179 insertions(+), 176 deletions(-) diff --git a/source/libs/stream/inc/streamInt.h b/source/libs/stream/inc/streamInt.h index 3a4e3d81fb..ceb6cd9739 100644 --- a/source/libs/stream/inc/streamInt.h +++ b/source/libs/stream/inc/streamInt.h @@ -158,7 +158,6 @@ typedef enum ECHECKPOINT_BACKUP_TYPE { ECHECKPOINT_BACKUP_TYPE streamGetCheckpointBackupType(); int32_t streamTaskDownloadCheckpointData(char* id, char* path); - int32_t streamTaskOnNormalTaskReady(SStreamTask* pTask); int32_t streamTaskOnScanhistoryTaskReady(SStreamTask* pTask); diff --git a/source/libs/stream/src/streamCheckStatus.c b/source/libs/stream/src/streamCheckStatus.c index f0f12cae2b..e6f127349c 100644 --- a/source/libs/stream/src/streamCheckStatus.c +++ b/source/libs/stream/src/streamCheckStatus.c @@ -162,6 +162,32 @@ int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRs return 0; } +int32_t streamSendCheckRsp(const SStreamMeta* pMeta, const SStreamTaskCheckReq* pReq, SStreamTaskCheckRsp* pRsp, + SRpcHandleInfo* pRpcInfo, int32_t taskId) { + SEncoder encoder; + int32_t code; + int32_t len; + + tEncodeSize(tEncodeStreamTaskCheckRsp, pRsp, len, code); + if (code < 0) { + stError("vgId:%d failed to encode task check rsp, s-task:0x%x", pMeta->vgId, taskId); + return -1; + } + + void* buf = rpcMallocCont(sizeof(SMsgHead) + len); + ((SMsgHead*)buf)->vgId = htonl(pReq->upstreamNodeId); + + void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead)); + tEncoderInit(&encoder, (uint8_t*)abuf, len); + tEncodeStreamTaskCheckRsp(&encoder, pRsp); + tEncoderClear(&encoder); + + SRpcMsg rspMsg = {.code = 0, .pCont = buf, .contLen = sizeof(SMsgHead) + len, .info = *pRpcInfo}; + + tmsgSendRsp(&rspMsg); + return 0; +} + int32_t streamTaskStartMonitorCheckRsp(SStreamTask* pTask) { STaskCheckInfo* pInfo = &pTask->taskCheckInfo; taosThreadMutexLock(&pInfo->checkInfoLock); diff --git a/source/libs/stream/src/streamQueue.c b/source/libs/stream/src/streamQueue.c index 0ac282c362..5596eb3dee 100644 --- a/source/libs/stream/src/streamQueue.c +++ b/source/libs/stream/src/streamQueue.c @@ -330,6 +330,36 @@ int32_t streamTaskPutDataIntoInputQ(SStreamTask* pTask, SStreamQueueItem* pItem) return 0; } +int32_t streamTaskPutTranstateIntoInputQ(SStreamTask* pTask) { + SStreamDataBlock* pTranstate = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, sizeof(SSDataBlock)); + if (pTranstate == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } + + SSDataBlock* pBlock = taosMemoryCalloc(1, sizeof(SSDataBlock)); + if (pBlock == NULL) { + taosFreeQitem(pTranstate); + return TSDB_CODE_OUT_OF_MEMORY; + } + + pTranstate->type = STREAM_INPUT__TRANS_STATE; + + pBlock->info.type = STREAM_TRANS_STATE; + pBlock->info.rows = 1; + pBlock->info.childId = pTask->info.selfChildId; + + pTranstate->blocks = taosArrayInit(4, sizeof(SSDataBlock)); // pBlock; + taosArrayPush(pTranstate->blocks, pBlock); + + taosMemoryFree(pBlock); + if (streamTaskPutDataIntoInputQ(pTask, (SStreamQueueItem*)pTranstate) < 0) { + return TSDB_CODE_OUT_OF_MEMORY; + } + + pTask->status.appendTranstateBlock = true; + return TSDB_CODE_SUCCESS; +} + // the result should be put into the outputQ in any cases, the result may be lost otherwise. int32_t streamTaskPutDataIntoOutputQ(SStreamTask* pTask, SStreamDataBlock* pBlock) { STaosQueue* pQueue = pTask->outputq.queue->pQueue; diff --git a/source/libs/stream/src/streamSched.c b/source/libs/stream/src/streamSched.c index 2e337234b6..52e7431e70 100644 --- a/source/libs/stream/src/streamSched.c +++ b/source/libs/stream/src/streamSched.c @@ -17,7 +17,7 @@ #include "ttimer.h" static void streamTaskResumeHelper(void* param, void* tmrId); -static void streamSchedByTimer(void* param, void* tmrId); +static void streamTaskSchedHelper(void* param, void* tmrId); int32_t streamSetupScheduleTrigger(SStreamTask* pTask) { if (pTask->info.triggerParam != 0 && pTask->info.fillHistory == 0) { @@ -26,7 +26,7 @@ int32_t streamSetupScheduleTrigger(SStreamTask* pTask) { stDebug("s-task:%s setup scheduler trigger, delay:%" PRId64 " ms", pTask->id.idStr, pTask->info.triggerParam); - pTask->schedInfo.pDelayTimer = taosTmrStart(streamSchedByTimer, (int32_t)pTask->info.triggerParam, pTask, streamTimer); + pTask->schedInfo.pDelayTimer = taosTmrStart(streamTaskSchedHelper, (int32_t)pTask->info.triggerParam, pTask, streamTimer); pTask->schedInfo.status = TASK_TRIGGER_STATUS__INACTIVE; } @@ -112,7 +112,7 @@ void streamTaskResumeHelper(void* param, void* tmrId) { streamMetaReleaseTask(pTask->pMeta, pTask); } -void streamSchedByTimer(void* param, void* tmrId) { +void streamTaskSchedHelper(void* param, void* tmrId) { SStreamTask* pTask = (void*)param; const char* id = pTask->id.idStr; int32_t nextTrigger = (int32_t)pTask->info.triggerParam; @@ -133,7 +133,7 @@ void streamSchedByTimer(void* param, void* tmrId) { if (pTrigger == NULL) { stError("s-task:%s failed to prepare retrieve data trigger, code:%s, try again in %dms", id, "out of memory", nextTrigger); - taosTmrReset(streamSchedByTimer, nextTrigger, pTask, streamTimer, &pTask->schedInfo.pDelayTimer); + taosTmrReset(streamTaskSchedHelper, nextTrigger, pTask, streamTimer, &pTask->schedInfo.pDelayTimer); return; } @@ -144,7 +144,7 @@ void streamSchedByTimer(void* param, void* tmrId) { stError("s-task:%s failed to prepare retrieve data trigger, code:%s, try again in %dms", id, "out of memory", nextTrigger); - taosTmrReset(streamSchedByTimer, nextTrigger, pTask, streamTimer, &pTask->schedInfo.pDelayTimer); + taosTmrReset(streamTaskSchedHelper, nextTrigger, pTask, streamTimer, &pTask->schedInfo.pDelayTimer); return; } @@ -153,7 +153,7 @@ void streamSchedByTimer(void* param, void* tmrId) { int32_t code = streamTaskPutDataIntoInputQ(pTask, (SStreamQueueItem*)pTrigger); if (code != TSDB_CODE_SUCCESS) { - taosTmrReset(streamSchedByTimer, nextTrigger, pTask, streamTimer, &pTask->schedInfo.pDelayTimer); + taosTmrReset(streamTaskSchedHelper, nextTrigger, pTask, streamTimer, &pTask->schedInfo.pDelayTimer); return; } @@ -161,5 +161,5 @@ void streamSchedByTimer(void* param, void* tmrId) { } } - taosTmrReset(streamSchedByTimer, nextTrigger, pTask, streamTimer, &pTask->schedInfo.pDelayTimer); + taosTmrReset(streamTaskSchedHelper, nextTrigger, pTask, streamTimer, &pTask->schedInfo.pDelayTimer); } diff --git a/source/libs/stream/src/streamSnapshot.c b/source/libs/stream/src/streamSnapshot.c index a69cb75dad..1800324cc8 100644 --- a/source/libs/stream/src/streamSnapshot.c +++ b/source/libs/stream/src/streamSnapshot.c @@ -32,6 +32,7 @@ typedef struct SBackendFileItem { int64_t size; int8_t ref; } SBackendFileItem; + typedef struct SBackendFile { char* pCurrent; char* pMainfest; @@ -102,6 +103,7 @@ struct SStreamSnapWriter { int64_t ever; SStreamSnapHandle handle; }; + const char* ROCKSDB_OPTIONS = "OPTIONS"; const char* ROCKSDB_MAINFEST = "MANIFEST"; const char* ROCKSDB_SST = "sst"; @@ -120,7 +122,7 @@ void streamSnapHandleDestroy(SStreamSnapHandle* handle); } while (0) int32_t streamGetFileSize(char* path, char* name, int64_t* sz) { - int ret = 0; + int32_t ret = 0; char* fullname = taosMemoryCalloc(1, strlen(path) + 32); sprintf(fullname, "%s%s%s", path, TD_DIRSEP, name); @@ -149,7 +151,7 @@ void snapFileDebugInfo(SBackendSnapFile2* pSnapFile) { if (pSnapFile->pMainfest) sprintf(buf + strlen(buf), "MANIFEST: %s,", pSnapFile->pMainfest); if (pSnapFile->pOptions) sprintf(buf + strlen(buf), "options: %s,", pSnapFile->pOptions); if (pSnapFile->pSst) { - for (int i = 0; i < taosArrayGetSize(pSnapFile->pSst); i++) { + for (int32_t i = 0; i < taosArrayGetSize(pSnapFile->pSst); i++) { char* name = taosArrayGetP(pSnapFile->pSst, i); sprintf(buf + strlen(buf), "%s,", name); } @@ -157,7 +159,7 @@ void snapFileDebugInfo(SBackendSnapFile2* pSnapFile) { sprintf(buf + strlen(buf) - 1, "]"); stInfo("%s %" PRId64 "-%" PRId64 " get file list: %s", STREAM_STATE_TRANSFER, pSnapFile->snapInfo.streamId, - pSnapFile->snapInfo.taskId, buf); + pSnapFile->snapInfo.taskId, buf); taosMemoryFree(buf); } } @@ -183,7 +185,7 @@ int32_t snapFileGenMeta(SBackendSnapFile2* pSnapFile) { streamGetFileSize(pSnapFile->path, item.name, &item.size); taosArrayPush(pSnapFile->pFileList, &item); // sst - for (int i = 0; i < taosArrayGetSize(pSnapFile->pSst); i++) { + for (int32_t i = 0; i < taosArrayGetSize(pSnapFile->pSst); i++) { char* sst = taosArrayGetP(pSnapFile->pSst, i); item.name = sst; item.type = ROCKSDB_SST_TYPE; @@ -270,12 +272,12 @@ void snapFileDestroy(SBackendSnapFile2* pSnap) { taosMemoryFree(pSnap->pMainfest); taosMemoryFree(pSnap->pOptions); taosMemoryFree(pSnap->path); - for (int i = 0; i < taosArrayGetSize(pSnap->pSst); i++) { + for (int32_t i = 0; i < taosArrayGetSize(pSnap->pSst); i++) { char* sst = taosArrayGetP(pSnap->pSst, i); taosMemoryFree(sst); } // unite read/write snap file - for (int i = 0; i < taosArrayGetSize(pSnap->pFileList); i++) { + for (int32_t i = 0; i < taosArrayGetSize(pSnap->pFileList); i++) { SBackendFileItem* pItem = taosArrayGet(pSnap->pFileList, i); if (pItem->ref == 0) { taosMemoryFree(pItem->name); @@ -297,7 +299,7 @@ int32_t streamSnapHandleInit(SStreamSnapHandle* pHandle, char* path, void* pMeta SArray* pDbSnapSet = taosArrayInit(8, sizeof(SBackendSnapFile2)); - for (int i = 0; i < taosArrayGetSize(pSnapSet); i++) { + for (int32_t i = 0; i < taosArrayGetSize(pSnapSet); i++) { SStreamTaskSnap* pSnap = taosArrayGet(pSnapSet, i); SBackendSnapFile2 snapFile = {0}; @@ -305,7 +307,7 @@ int32_t streamSnapHandleInit(SStreamSnapHandle* pHandle, char* path, void* pMeta ASSERT(code == 0); taosArrayPush(pDbSnapSet, &snapFile); } - for (int i = 0; i < taosArrayGetSize(pSnapSet); i++) { + for (int32_t i = 0; i < taosArrayGetSize(pSnapSet); i++) { SStreamTaskSnap* pSnap = taosArrayGet(pSnapSet, i); taosMemoryFree(pSnap->dbPrefixPath); } @@ -324,7 +326,7 @@ _err: void streamSnapHandleDestroy(SStreamSnapHandle* handle) { if (handle->pDbSnapSet) { - for (int i = 0; i < taosArrayGetSize(handle->pDbSnapSet); i++) { + for (int32_t i = 0; i < taosArrayGetSize(handle->pDbSnapSet); i++) { SBackendSnapFile2* pSnapFile = taosArrayGet(handle->pDbSnapSet, i); snapFileDebugInfo(pSnapFile); snapFileDestroy(pSnapFile); @@ -396,9 +398,9 @@ _NEXT: item = taosArrayGet(pSnapFile->pFileList, pSnapFile->currFileIdx); stDebug("%s start to read file %s, current offset:%" PRId64 ", size:%" PRId64 - ", file no.%d, total set:%d, current set idx: %d", - STREAM_STATE_TRANSFER, item->name, (int64_t)pSnapFile->offset, item->size, pSnapFile->currFileIdx, - (int)taosArrayGetSize(pHandle->pDbSnapSet), pHandle->currIdx); + ", file no.%d, total set:%d, current set idx: %d", + STREAM_STATE_TRANSFER, item->name, (int64_t)pSnapFile->offset, item->size, pSnapFile->currFileIdx, + (int32_t)taosArrayGetSize(pHandle->pDbSnapSet), pHandle->currIdx); uint8_t* buf = taosMemoryCalloc(1, sizeof(SStreamSnapBlockHdr) + kBlockSize); int64_t nread = taosPReadFile(pSnapFile->fd, buf + sizeof(SStreamSnapBlockHdr), kBlockSize, pSnapFile->offset); @@ -489,14 +491,10 @@ int32_t snapInfoEqual(SStreamTaskSnap* a, SStreamTaskSnap* b) { } int32_t streamSnapWriteImpl(SStreamSnapWriter* pWriter, uint8_t* pData, uint32_t nData, SBackendSnapFile2* pSnapFile) { - int code = -1; + int32_t code = -1; SStreamSnapBlockHdr* pHdr = (SStreamSnapBlockHdr*)pData; SStreamSnapHandle* pHandle = &pWriter->handle; - SStreamTaskSnap snapInfo = pHdr->snapInfo; - - SStreamTaskSnap* pSnapInfo = &pSnapFile->snapInfo; - - SBackendFileItem* pItem = taosArrayGet(pSnapFile->pFileList, pSnapFile->currFileIdx); + SBackendFileItem* pItem = taosArrayGet(pSnapFile->pFileList, pSnapFile->currFileIdx); if (pSnapFile->fd == 0) { pSnapFile->fd = streamOpenFile(pSnapFile->path, pItem->name, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND); @@ -540,6 +538,7 @@ int32_t streamSnapWriteImpl(SStreamSnapWriter* pWriter, uint8_t* pData, uint32_t pSnapFile->offset += pHdr->size; } code = 0; + _EXIT: return code; } @@ -590,8 +589,8 @@ int32_t streamSnapWrite(SStreamSnapWriter* pWriter, uint8_t* pData, uint32_t nDa return streamSnapWrite(pWriter, pData, nData); } } - return code; } + int32_t streamSnapWriterClose(SStreamSnapWriter* pWriter, int8_t rollback) { if (pWriter == NULL) return 0; streamSnapHandleDestroy(&pWriter->handle); diff --git a/source/libs/stream/src/streamStartHistory.c b/source/libs/stream/src/streamStartHistory.c index b3df5755ea..98c0d95781 100644 --- a/source/libs/stream/src/streamStartHistory.c +++ b/source/libs/stream/src/streamStartHistory.c @@ -38,8 +38,12 @@ static void tryLaunchHistoryTask(void* param, void* tmrId); static void doExecScanhistoryInFuture(void* param, void* tmrId); static int32_t doStartScanHistoryTask(SStreamTask* pTask); static int32_t streamTaskStartScanHistory(SStreamTask* pTask); +static void checkFillhistoryTaskStatus(SStreamTask* pTask, SStreamTask* pHTask); +static int32_t launchNotBuiltFillHistoryTask(SStreamTask* pTask); +static void doRetryLaunchFillHistoryTask(SStreamTask* pTask, SLaunchHTaskInfo* pInfo, int64_t now); +static void notRetryLaunchFillHistoryTask(SStreamTask* pTask, SLaunchHTaskInfo* pInfo, int64_t now); -int32_t streamTaskSetReady(SStreamTask* pTask) { +static int32_t streamTaskSetReady(SStreamTask* pTask) { int32_t numOfDowns = streamTaskGetNumOfDownstream(pTask); SStreamTaskState* p = streamTaskGetStatus(pTask); @@ -79,33 +83,6 @@ int32_t streamStartScanHistoryAsync(SStreamTask* pTask, int8_t igUntreated) { return 0; } -void doExecScanhistoryInFuture(void* param, void* tmrId) { - SStreamTask* pTask = param; - pTask->schedHistoryInfo.numOfTicks -= 1; - - SStreamTaskState* p = streamTaskGetStatus(pTask); - if (p->state == TASK_STATUS__DROPPING || p->state == TASK_STATUS__STOP) { - int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1); - stDebug("s-task:%s status:%s not start scan-history again, ref:%d", pTask->id.idStr, p->name, ref); - - streamMetaReleaseTask(pTask->pMeta, pTask); - return; - } - - if (pTask->schedHistoryInfo.numOfTicks <= 0) { - streamStartScanHistoryAsync(pTask, 0); - - int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1); - stDebug("s-task:%s fill-history:%d start scan-history data, out of tmr, ref:%d", pTask->id.idStr, - pTask->info.fillHistory, ref); - - // release the task. - streamMetaReleaseTask(pTask->pMeta, pTask); - } else { - taosTmrReset(doExecScanhistoryInFuture, SCANHISTORY_IDLE_TIME_SLICE, pTask, streamTimer, &pTask->schedHistoryInfo.pTimer); - } -} - int32_t streamExecScanHistoryInFuture(SStreamTask* pTask, int32_t idleDuration) { int32_t numOfTicks = idleDuration / SCANHISTORY_IDLE_TIME_SLICE; if (numOfTicks <= 0) { @@ -136,17 +113,6 @@ int32_t streamExecScanHistoryInFuture(SStreamTask* pTask, int32_t idleDuration) return TSDB_CODE_SUCCESS; } -int32_t doStartScanHistoryTask(SStreamTask* pTask) { - SVersionRange* pRange = &pTask->dataRange.range; - if (pTask->info.fillHistory) { - streamSetParamForScanHistory(pTask); - } - - streamSetParamForStreamScannerStep1(pTask, pRange, &pTask->dataRange.window); - int32_t code = streamStartScanHistoryAsync(pTask, 0); - return code; -} - int32_t streamTaskStartScanHistory(SStreamTask* pTask) { int32_t level = pTask->info.taskLevel; ETaskStatus status = streamTaskGetStatus(pTask)->state; @@ -267,32 +233,6 @@ int32_t streamTaskOnScanhistoryTaskReady(SStreamTask* pTask) { return TSDB_CODE_SUCCESS; } -int32_t streamSendCheckRsp(const SStreamMeta* pMeta, const SStreamTaskCheckReq* pReq, SStreamTaskCheckRsp* pRsp, - SRpcHandleInfo* pRpcInfo, int32_t taskId) { - SEncoder encoder; - int32_t code; - int32_t len; - - tEncodeSize(tEncodeStreamTaskCheckRsp, pRsp, len, code); - if (code < 0) { - stError("vgId:%d failed to encode task check rsp, s-task:0x%x", pMeta->vgId, taskId); - return -1; - } - - void* buf = rpcMallocCont(sizeof(SMsgHead) + len); - ((SMsgHead*)buf)->vgId = htonl(pReq->upstreamNodeId); - - void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead)); - tEncoderInit(&encoder, (uint8_t*)abuf, len); - tEncodeStreamTaskCheckRsp(&encoder, pRsp); - tEncoderClear(&encoder); - - SRpcMsg rspMsg = {.code = 0, .pCont = buf, .contLen = sizeof(SMsgHead) + len, .info = *pRpcInfo}; - - tmsgSendRsp(&rspMsg); - return 0; -} - // common int32_t streamSetParamForScanHistory(SStreamTask* pTask) { stDebug("s-task:%s set operator option for scan-history data", pTask->id.idStr); @@ -308,6 +248,55 @@ int32_t streamSetParamForStreamScannerStep2(SStreamTask* pTask, SVersionRange* p return qStreamSourceScanParamForHistoryScanStep2(pTask->exec.pExecutor, pVerRange, pWindow); } +// an fill history task needs to be started. +int32_t streamLaunchFillHistoryTask(SStreamTask* pTask) { + SStreamMeta* pMeta = pTask->pMeta; + STaskExecStatisInfo* pExecInfo = &pTask->execInfo; + const char* idStr = pTask->id.idStr; + int64_t hStreamId = pTask->hTaskInfo.id.streamId; + int32_t hTaskId = pTask->hTaskInfo.id.taskId; + ASSERT(hTaskId != 0); + + // check stream task status in the first place. + SStreamTaskState* pStatus = streamTaskGetStatus(pTask); + if (pStatus->state != TASK_STATUS__READY && pStatus->state != TASK_STATUS__HALT) { + stDebug("s-task:%s not launch related fill-history task:0x%" PRIx64 "-0x%x, status:%s", idStr, hStreamId, hTaskId, + pStatus->name); + + streamMetaAddTaskLaunchResult(pMeta, hStreamId, hTaskId, pExecInfo->checkTs, pExecInfo->readyTs, false); + return -1; // todo set the correct error code + } + + stDebug("s-task:%s start to launch related fill-history task:0x%" PRIx64 "-0x%x", idStr, hStreamId, hTaskId); + + // Set the execute conditions, including the query time window and the version range + streamMetaRLock(pMeta); + SStreamTask** pHTask = taosHashGet(pMeta->pTasksMap, &pTask->hTaskInfo.id, sizeof(pTask->hTaskInfo.id)); + streamMetaRUnLock(pMeta); + + if (pHTask != NULL) { // it is already added into stream meta store. + SStreamTask* pHisTask = streamMetaAcquireTask(pMeta, hStreamId, hTaskId); + if (pHisTask == NULL) { + stDebug("s-task:%s failed acquire and start fill-history task, it may have been dropped/stopped", idStr); + streamMetaAddTaskLaunchResult(pMeta, hStreamId, hTaskId, pExecInfo->checkTs, pExecInfo->readyTs, false); + } else { + if (pHisTask->status.downstreamReady == 1) { // it's ready now, do nothing + stDebug("s-task:%s fill-history task is ready, no need to check downstream", pHisTask->id.idStr); + streamMetaAddTaskLaunchResult(pMeta, hStreamId, hTaskId, pExecInfo->checkTs, pExecInfo->readyTs, true); + } else { // exist, but not ready, continue check downstream task status + checkFillhistoryTaskStatus(pTask, pHisTask); + } + + streamMetaReleaseTask(pMeta, pHisTask); + } + + return TSDB_CODE_SUCCESS; + } else { + return launchNotBuiltFillHistoryTask(pTask); + } +} + +/////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// int32_t initScanHistoryReq(SStreamTask* pTask, SStreamScanHistoryReq* pReq, int8_t igUntreated) { pReq->msgHead.vgId = pTask->info.nodeId; pReq->streamId = pTask->id.streamId; @@ -316,37 +305,7 @@ int32_t initScanHistoryReq(SStreamTask* pTask, SStreamScanHistoryReq* pReq, int8 return 0; } -int32_t streamTaskPutTranstateIntoInputQ(SStreamTask* pTask) { - SStreamDataBlock* pTranstate = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, sizeof(SSDataBlock)); - if (pTranstate == NULL) { - return TSDB_CODE_OUT_OF_MEMORY; - } - - SSDataBlock* pBlock = taosMemoryCalloc(1, sizeof(SSDataBlock)); - if (pBlock == NULL) { - taosFreeQitem(pTranstate); - return TSDB_CODE_OUT_OF_MEMORY; - } - - pTranstate->type = STREAM_INPUT__TRANS_STATE; - - pBlock->info.type = STREAM_TRANS_STATE; - pBlock->info.rows = 1; - pBlock->info.childId = pTask->info.selfChildId; - - pTranstate->blocks = taosArrayInit(4, sizeof(SSDataBlock)); // pBlock; - taosArrayPush(pTranstate->blocks, pBlock); - - taosMemoryFree(pBlock); - if (streamTaskPutDataIntoInputQ(pTask, (SStreamQueueItem*)pTranstate) < 0) { - return TSDB_CODE_OUT_OF_MEMORY; - } - - pTask->status.appendTranstateBlock = true; - return TSDB_CODE_SUCCESS; -} - -static void checkFillhistoryTaskStatus(SStreamTask* pTask, SStreamTask* pHTask) { +void checkFillhistoryTaskStatus(SStreamTask* pTask, SStreamTask* pHTask) { SDataRange* pRange = &pHTask->dataRange; // the query version range should be limited to the already processed data @@ -365,7 +324,7 @@ static void checkFillhistoryTaskStatus(SStreamTask* pTask, SStreamTask* pHTask) streamTaskHandleEvent(pHTask->status.pSM, TASK_EVENT_INIT_SCANHIST); } -static void noRetryLaunchFillHistoryTask(SStreamTask* pTask, SLaunchHTaskInfo* pInfo, int64_t now) { +void notRetryLaunchFillHistoryTask(SStreamTask* pTask, SLaunchHTaskInfo* pInfo, int64_t now) { SStreamMeta* pMeta = pTask->pMeta; SHistoryTaskInfo* pHTaskInfo = &pTask->hTaskInfo; @@ -379,7 +338,7 @@ static void noRetryLaunchFillHistoryTask(SStreamTask* pTask, SLaunchHTaskInfo* p pHTaskInfo->id.streamId = 0; } -static void doRetryLaunchFillHistoryTask(SStreamTask* pTask, SLaunchHTaskInfo* pInfo, int64_t now) { +void doRetryLaunchFillHistoryTask(SStreamTask* pTask, SLaunchHTaskInfo* pInfo, int64_t now) { SStreamMeta* pMeta = pTask->pMeta; SHistoryTaskInfo* pHTaskInfo = &pTask->hTaskInfo; @@ -401,7 +360,7 @@ static void doRetryLaunchFillHistoryTask(SStreamTask* pTask, SLaunchHTaskInfo* p } } -static void tryLaunchHistoryTask(void* param, void* tmrId) { +void tryLaunchHistoryTask(void* param, void* tmrId) { SLaunchHTaskInfo* pInfo = param; SStreamMeta* pMeta = pInfo->pMeta; int64_t now = taosGetTimestampMs(); @@ -449,7 +408,7 @@ static void tryLaunchHistoryTask(void* param, void* tmrId) { } if (pHTaskInfo->retryTimes > MAX_RETRY_LAUNCH_HISTORY_TASK) { - noRetryLaunchFillHistoryTask(pTask, pInfo, now); + notRetryLaunchFillHistoryTask(pTask, pInfo, now); } else { // not reach the limitation yet, let's continue retrying launch related fill-history task. streamTaskSetRetryInfoForLaunch(pHTaskInfo); ASSERT(pTask->status.timerActive >= 1); @@ -500,7 +459,7 @@ SLaunchHTaskInfo* createHTaskLaunchInfo(SStreamMeta* pMeta, STaskId* pTaskId, in return pInfo; } -static int32_t launchNotBuiltFillHistoryTask(SStreamTask* pTask) { +int32_t launchNotBuiltFillHistoryTask(SStreamTask* pTask) { SStreamMeta* pMeta = pTask->pMeta; STaskExecStatisInfo* pExecInfo = &pTask->execInfo; const char* idStr = pTask->id.idStr; @@ -547,54 +506,6 @@ static int32_t launchNotBuiltFillHistoryTask(SStreamTask* pTask) { return TSDB_CODE_SUCCESS; } -// an fill history task needs to be started. -int32_t streamLaunchFillHistoryTask(SStreamTask* pTask) { - SStreamMeta* pMeta = pTask->pMeta; - STaskExecStatisInfo* pExecInfo = &pTask->execInfo; - const char* idStr = pTask->id.idStr; - int64_t hStreamId = pTask->hTaskInfo.id.streamId; - int32_t hTaskId = pTask->hTaskInfo.id.taskId; - ASSERT(hTaskId != 0); - - // check stream task status in the first place. - SStreamTaskState* pStatus = streamTaskGetStatus(pTask); - if (pStatus->state != TASK_STATUS__READY && pStatus->state != TASK_STATUS__HALT) { - stDebug("s-task:%s not launch related fill-history task:0x%" PRIx64 "-0x%x, status:%s", idStr, hStreamId, hTaskId, - pStatus->name); - - streamMetaAddTaskLaunchResult(pMeta, hStreamId, hTaskId, pExecInfo->checkTs, pExecInfo->readyTs, false); - return -1; // todo set the correct error code - } - - stDebug("s-task:%s start to launch related fill-history task:0x%" PRIx64 "-0x%x", idStr, hStreamId, hTaskId); - - // Set the execute conditions, including the query time window and the version range - streamMetaRLock(pMeta); - SStreamTask** pHTask = taosHashGet(pMeta->pTasksMap, &pTask->hTaskInfo.id, sizeof(pTask->hTaskInfo.id)); - streamMetaRUnLock(pMeta); - - if (pHTask != NULL) { // it is already added into stream meta store. - SStreamTask* pHisTask = streamMetaAcquireTask(pMeta, hStreamId, hTaskId); - if (pHisTask == NULL) { - stDebug("s-task:%s failed acquire and start fill-history task, it may have been dropped/stopped", idStr); - streamMetaAddTaskLaunchResult(pMeta, hStreamId, hTaskId, pExecInfo->checkTs, pExecInfo->readyTs, false); - } else { - if (pHisTask->status.downstreamReady == 1) { // it's ready now, do nothing - stDebug("s-task:%s fill-history task is ready, no need to check downstream", pHisTask->id.idStr); - streamMetaAddTaskLaunchResult(pMeta, hStreamId, hTaskId, pExecInfo->checkTs, pExecInfo->readyTs, true); - } else { // exist, but not ready, continue check downstream task status - checkFillhistoryTaskStatus(pTask, pHisTask); - } - - streamMetaReleaseTask(pMeta, pHisTask); - } - - return TSDB_CODE_SUCCESS; - } else { - return launchNotBuiltFillHistoryTask(pTask); - } -} - int32_t streamTaskResetTimewindowFilter(SStreamTask* pTask) { void* exec = pTask->exec.pExecutor; return qStreamInfoResetTimewindowFilter(exec); @@ -651,3 +562,41 @@ void streamTaskSetRangeStreamCalc(SStreamTask* pTask) { streamSetParamForStreamScannerStep2(pTask, &verRange, &win); } } + +void doExecScanhistoryInFuture(void* param, void* tmrId) { + SStreamTask* pTask = param; + pTask->schedHistoryInfo.numOfTicks -= 1; + + SStreamTaskState* p = streamTaskGetStatus(pTask); + if (p->state == TASK_STATUS__DROPPING || p->state == TASK_STATUS__STOP) { + int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1); + stDebug("s-task:%s status:%s not start scan-history again, ref:%d", pTask->id.idStr, p->name, ref); + + streamMetaReleaseTask(pTask->pMeta, pTask); + return; + } + + if (pTask->schedHistoryInfo.numOfTicks <= 0) { + streamStartScanHistoryAsync(pTask, 0); + + int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1); + stDebug("s-task:%s fill-history:%d start scan-history data, out of tmr, ref:%d", pTask->id.idStr, + pTask->info.fillHistory, ref); + + // release the task. + streamMetaReleaseTask(pTask->pMeta, pTask); + } else { + taosTmrReset(doExecScanhistoryInFuture, SCANHISTORY_IDLE_TIME_SLICE, pTask, streamTimer, &pTask->schedHistoryInfo.pTimer); + } +} + +int32_t doStartScanHistoryTask(SStreamTask* pTask) { + SVersionRange* pRange = &pTask->dataRange.range; + if (pTask->info.fillHistory) { + streamSetParamForScanHistory(pTask); + } + + streamSetParamForStreamScannerStep1(pTask, pRange, &pTask->dataRange.window); + int32_t code = streamStartScanHistoryAsync(pTask, 0); + return code; +} From 0e0f98eb8de8f880a96ce708523dea2d5115dac2 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 7 May 2024 10:50:44 +0800 Subject: [PATCH 2/3] refactor: do some internal refactor. --- include/libs/stream/tstream.h | 9 ++-- source/dnode/vnode/src/tqCommon/tqCommon.c | 44 +++---------------- source/libs/stream/src/streamCheckStatus.c | 51 ++++++++++++++++++++-- source/libs/stream/src/streamTaskSm.c | 2 +- 4 files changed, 57 insertions(+), 49 deletions(-) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 1db62abfc0..02fa31ef07 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -819,9 +819,6 @@ ETaskStatus streamTaskGetPrevStatus(const SStreamTask* pTask); void initRpcMsg(SRpcMsg* pMsg, int32_t msgType, void* pCont, int32_t contLen); -// recover and fill history -void streamTaskCheckDownstream(SStreamTask* pTask); - int32_t streamTaskCheckStatus(SStreamTask* pTask, int32_t upstreamId, int32_t vgId, int64_t stage, int64_t* oldStage); bool streamTaskUpdateEpsetInfo(SStreamTask* pTask, SArray* pNodeList); void streamTaskResetUpstreamStageInfo(SStreamTask* pTask); @@ -838,8 +835,10 @@ int32_t streamTaskHandleEventAsync(SStreamTaskSM* pSM, EStreamTaskEvent event, _ int32_t streamTaskOnHandleEventSuccess(SStreamTaskSM* pSM, EStreamTaskEvent event, __state_trans_user_fn callbackFn, void* param); int32_t streamTaskRestoreStatus(SStreamTask* pTask); -int32_t streamSendCheckRsp(const SStreamMeta* pMeta, const SStreamTaskCheckReq* pReq, SStreamTaskCheckRsp* pRsp, - SRpcHandleInfo* pRpcInfo, int32_t taskId); +void streamTaskSendCheckMsg(SStreamTask* pTask); +void streamTaskProcessCheckMsg(SStreamMeta* pMeta, SStreamTaskCheckReq* pReq, SStreamTaskCheckRsp* pRsp); +int32_t streamSendCheckRsp(const SStreamMeta* pMeta, int32_t vgId, SStreamTaskCheckRsp* pRsp, SRpcHandleInfo* pRpcInfo, + int32_t taskId); int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRsp); int32_t streamLaunchFillHistoryTask(SStreamTask* pTask); int32_t streamStartScanHistoryAsync(SStreamTask* pTask, int8_t igUntreated); diff --git a/source/dnode/vnode/src/tqCommon/tqCommon.c b/source/dnode/vnode/src/tqCommon/tqCommon.c index be3023122f..4d78f6826c 100644 --- a/source/dnode/vnode/src/tqCommon/tqCommon.c +++ b/source/dnode/vnode/src/tqCommon/tqCommon.c @@ -406,50 +406,16 @@ int32_t tqStreamTaskProcessCheckReq(SStreamMeta* pMeta, SRpcMsg* pMsg) { int32_t msgLen = pMsg->contLen - sizeof(SMsgHead); SStreamTaskCheckReq req; - SDecoder decoder; + SStreamTaskCheckRsp rsp = {0}; + + SDecoder decoder; tDecoderInit(&decoder, (uint8_t*)msgBody, msgLen); tDecodeStreamTaskCheckReq(&decoder, &req); tDecoderClear(&decoder); - int32_t taskId = req.downstreamTaskId; - - SStreamTaskCheckRsp rsp = { - .reqId = req.reqId, - .streamId = req.streamId, - .childId = req.childId, - .downstreamNodeId = req.downstreamNodeId, - .downstreamTaskId = req.downstreamTaskId, - .upstreamNodeId = req.upstreamNodeId, - .upstreamTaskId = req.upstreamTaskId, - }; - - // only the leader node handle the check request - if (pMeta->role == NODE_ROLE_FOLLOWER) { - tqError( - "s-task:0x%x invalid check msg from upstream:0x%x(vgId:%d), vgId:%d is follower, not handle check status msg", - taskId, req.upstreamTaskId, req.upstreamNodeId, pMeta->vgId); - rsp.status = TASK_DOWNSTREAM_NOT_LEADER; - } else { - SStreamTask* pTask = streamMetaAcquireTask(pMeta, req.streamId, taskId); - if (pTask != NULL) { - rsp.status = streamTaskCheckStatus(pTask, req.upstreamTaskId, req.upstreamNodeId, req.stage, &rsp.oldStage); - streamMetaReleaseTask(pMeta, pTask); - - SStreamTaskState* pState = streamTaskGetStatus(pTask); - tqDebug("s-task:%s status:%s, stage:%" PRId64 " recv task check req(reqId:0x%" PRIx64 - ") task:0x%x (vgId:%d), check_status:%d", - pTask->id.idStr, pState->name, rsp.oldStage, rsp.reqId, rsp.upstreamTaskId, rsp.upstreamNodeId, - rsp.status); - } else { - rsp.status = TASK_DOWNSTREAM_NOT_READY; - tqDebug("tq recv task check(taskId:0x%" PRIx64 "-0x%x not built yet) req(reqId:0x%" PRIx64 - ") from task:0x%x (vgId:%d), rsp check_status %d", - req.streamId, taskId, rsp.reqId, rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.status); - } - } - - return streamSendCheckRsp(pMeta, &req, &rsp, &pMsg->info, taskId); + streamTaskProcessCheckMsg(pMeta, &req, &rsp); + return streamSendCheckRsp(pMeta, req.upstreamNodeId, &rsp, &pMsg->info, req.upstreamTaskId); } int32_t tqStreamTaskProcessCheckRsp(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLeader) { diff --git a/source/libs/stream/src/streamCheckStatus.c b/source/libs/stream/src/streamCheckStatus.c index e6f127349c..9c3908c833 100644 --- a/source/libs/stream/src/streamCheckStatus.c +++ b/source/libs/stream/src/streamCheckStatus.c @@ -39,7 +39,7 @@ static int32_t addDownstreamFailedStatusResultAsync(SMsgCb* pMsgCb, int32_t vgId static SDownstreamStatusInfo* findCheckRspStatus(STaskCheckInfo* pInfo, int32_t taskId); // check status -void streamTaskCheckDownstream(SStreamTask* pTask) { +void streamTaskSendCheckMsg(SStreamTask* pTask) { SDataRange* pRange = &pTask->dataRange; STimeWindow* pWindow = &pRange->window; const char* idstr = pTask->id.idStr; @@ -97,6 +97,46 @@ void streamTaskCheckDownstream(SStreamTask* pTask) { } } +void streamTaskProcessCheckMsg(SStreamMeta* pMeta, SStreamTaskCheckReq* pReq, SStreamTaskCheckRsp* pRsp) { + int32_t taskId = pReq->downstreamTaskId; + + *pRsp = (SStreamTaskCheckRsp){ + .reqId = pReq->reqId, + .streamId = pReq->streamId, + .childId = pReq->childId, + .downstreamNodeId = pReq->downstreamNodeId, + .downstreamTaskId = pReq->downstreamTaskId, + .upstreamNodeId = pReq->upstreamNodeId, + .upstreamTaskId = pReq->upstreamTaskId, + }; + + // only the leader node handle the check request + if (pMeta->role == NODE_ROLE_FOLLOWER) { + stError( + "s-task:0x%x invalid check msg from upstream:0x%x(vgId:%d), vgId:%d is follower, not handle check status msg", + taskId, pReq->upstreamTaskId, pReq->upstreamNodeId, pMeta->vgId); + pRsp->status = TASK_DOWNSTREAM_NOT_LEADER; + } else { + SStreamTask* pTask = streamMetaAcquireTask(pMeta, pReq->streamId, taskId); + if (pTask != NULL) { + pRsp->status = streamTaskCheckStatus(pTask, pReq->upstreamTaskId, pReq->upstreamNodeId, pReq->stage, &pRsp->oldStage); + streamMetaReleaseTask(pMeta, pTask); + + SStreamTaskState* pState = streamTaskGetStatus(pTask); + stDebug("s-task:%s status:%s, stage:%" PRId64 " recv task check req(reqId:0x%" PRIx64 + ") task:0x%x (vgId:%d), check_status:%d", + pTask->id.idStr, pState->name, pRsp->oldStage, pRsp->reqId, pRsp->upstreamTaskId, pRsp->upstreamNodeId, + pRsp->status); + } else { + pRsp->status = TASK_DOWNSTREAM_NOT_READY; + stDebug("tq recv task check(taskId:0x%" PRIx64 "-0x%x not built yet) req(reqId:0x%" PRIx64 + ") from task:0x%x (vgId:%d), rsp check_status %d", + pReq->streamId, taskId, pRsp->reqId, pRsp->upstreamTaskId, pRsp->upstreamNodeId, pRsp->status); + } + } + +} + int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRsp) { ASSERT(pTask->id.taskId == pRsp->upstreamTaskId); @@ -152,7 +192,7 @@ int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRs STaskId* pId = &pTask->hTaskInfo.id; streamMetaAddTaskLaunchResult(pTask->pMeta, pId->streamId, pId->taskId, startTs, now, false); } - } else { // TASK_DOWNSTREAM_NOT_READY, let's retry in 100ms + } else { // TASK_DOWNSTREAM_NOT_READY, rsp-check monitor will retry in 300 ms ASSERT(left > 0); stDebug("s-task:%s (vgId:%d) recv check rsp from task:0x%x (vgId:%d) status:%d, total:%d not ready:%d", id, pRsp->upstreamNodeId, pRsp->downstreamTaskId, pRsp->downstreamNodeId, pRsp->status, total, left); @@ -162,7 +202,7 @@ int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRs return 0; } -int32_t streamSendCheckRsp(const SStreamMeta* pMeta, const SStreamTaskCheckReq* pReq, SStreamTaskCheckRsp* pRsp, +int32_t streamSendCheckRsp(const SStreamMeta* pMeta, int32_t vgId, SStreamTaskCheckRsp* pRsp, SRpcHandleInfo* pRpcInfo, int32_t taskId) { SEncoder encoder; int32_t code; @@ -175,7 +215,7 @@ int32_t streamSendCheckRsp(const SStreamMeta* pMeta, const SStreamTaskCheckReq* } void* buf = rpcMallocCont(sizeof(SMsgHead) + len); - ((SMsgHead*)buf)->vgId = htonl(pReq->upstreamNodeId); + ((SMsgHead*)buf)->vgId = htonl(vgId); void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead)); tEncoderInit(&encoder, (uint8_t*)abuf, len); @@ -420,6 +460,9 @@ void doSendCheckMsg(SStreamTask* pTask, SDownstreamStatusInfo* p) { .stage = pTask->pMeta->stage, }; + // update the reqId for the new check msg + p->reqId = tGenIdPI64(); + STaskOutputInfo* pOutputInfo = &pTask->outputInfo; if (pOutputInfo->type == TASK_OUTPUT__FIXED_DISPATCH) { STaskDispatcherFixed* pDispatch = &pOutputInfo->fixedDispatcher; diff --git a/source/libs/stream/src/streamTaskSm.c b/source/libs/stream/src/streamTaskSm.c index c16598f84c..cced6a6b84 100644 --- a/source/libs/stream/src/streamTaskSm.c +++ b/source/libs/stream/src/streamTaskSm.c @@ -84,7 +84,7 @@ int32_t streamTaskInitStatus(SStreamTask* pTask) { stDebug("s-task:%s start init, and check downstream tasks, set the init ts:%" PRId64, pTask->id.idStr, pTask->execInfo.checkTs); - streamTaskCheckDownstream(pTask); + streamTaskSendCheckMsg(pTask); return 0; } From b6debd985bf4bf9efde286a93e1e21efcc730430 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 7 May 2024 16:33:08 +0800 Subject: [PATCH 3/3] fix(stream): always return success if repeatly recv checkpoint source msg. --- include/libs/stream/tstream.h | 5 ++-- include/libs/transport/trpc.h | 2 -- source/dnode/vnode/src/tq/tq.c | 24 ++++++++++++------- source/libs/stream/src/streamBackendRocksdb.c | 7 +++--- source/libs/stream/src/streamDispatch.c | 16 +++++++------ 5 files changed, 29 insertions(+), 25 deletions(-) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 02fa31ef07..1b4636120b 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -923,10 +923,9 @@ int32_t streamTaskBuildCheckpoint(SStreamTask* pTask); void streamTaskClearCheckInfo(SStreamTask* pTask, bool clearChkpReadyMsg); int32_t streamAlignTransferState(SStreamTask* pTask); int32_t streamBuildAndSendDropTaskMsg(SMsgCb* pMsgCb, int32_t vgId, SStreamTaskId* pTaskId, int64_t resetRelHalt); -int32_t streamAddCheckpointSourceRspMsg(SStreamCheckpointSourceReq* pReq, SRpcHandleInfo* pRpcInfo, SStreamTask* pTask, - int8_t isSucceed); +int32_t streamAddCheckpointSourceRspMsg(SStreamCheckpointSourceReq* pReq, SRpcHandleInfo* pRpcInfo, SStreamTask* pTask); int32_t buildCheckpointSourceRsp(SStreamCheckpointSourceReq* pReq, SRpcHandleInfo* pRpcInfo, SRpcMsg* pMsg, - int8_t isSucceed); + int32_t setCode); SStreamTaskSM* streamCreateStateMachine(SStreamTask* pTask); void* streamDestroyStateMachine(SStreamTaskSM* pSM); diff --git a/include/libs/transport/trpc.h b/include/libs/transport/trpc.h index 95f70c8ff3..c3b5beb3e3 100644 --- a/include/libs/transport/trpc.h +++ b/include/libs/transport/trpc.h @@ -37,7 +37,6 @@ typedef struct { int64_t applyIndex; uint64_t applyTerm; char user[TSDB_USER_LEN]; - } SRpcConnInfo; typedef struct SRpcHandleInfo { @@ -63,7 +62,6 @@ typedef struct SRpcHandleInfo { SRpcConnInfo conn; int8_t forbiddenIp; int8_t notFreeAhandle; - } SRpcHandleInfo; typedef struct SRpcMsg { diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 3a3b103e3f..ed8d06080a 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -1064,6 +1064,7 @@ _OVER: return code; } +// no matter what kinds of error happened, make sure the mnode will receive the success execution code. int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp) { int32_t vgId = TD_VID(pTq->pVnode); SStreamMeta* pMeta = pTq->pStreamMeta; @@ -1081,8 +1082,9 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp) code = TSDB_CODE_MSG_DECODE_ERROR; tDecoderClear(&decoder); tqError("vgId:%d failed to decode checkpoint-source msg, code:%s", vgId, tstrerror(code)); + SRpcMsg rsp = {0}; - buildCheckpointSourceRsp(&req, &pMsg->info, &rsp, 0); + buildCheckpointSourceRsp(&req, &pMsg->info, &rsp, TSDB_CODE_SUCCESS); tmsgSendRsp(&rsp); // error occurs return code; } @@ -1091,7 +1093,7 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp) if (!vnodeIsRoleLeader(pTq->pVnode)) { tqDebug("vgId:%d not leader, ignore checkpoint-source msg, s-task:0x%x", vgId, req.taskId); SRpcMsg rsp = {0}; - buildCheckpointSourceRsp(&req, &pMsg->info, &rsp, 0); + buildCheckpointSourceRsp(&req, &pMsg->info, &rsp, TSDB_CODE_SUCCESS); tmsgSendRsp(&rsp); // error occurs return TSDB_CODE_SUCCESS; } @@ -1101,7 +1103,7 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp) ", transId:%d s-task:0x%x ignore it", vgId, req.checkpointId, req.transId, req.taskId); SRpcMsg rsp = {0}; - buildCheckpointSourceRsp(&req, &pMsg->info, &rsp, 0); + buildCheckpointSourceRsp(&req, &pMsg->info, &rsp, TSDB_CODE_SUCCESS); tmsgSendRsp(&rsp); // error occurs return TSDB_CODE_SUCCESS; } @@ -1112,7 +1114,7 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp) " transId:%d it may have been destroyed", vgId, req.taskId, req.checkpointId, req.transId); SRpcMsg rsp = {0}; - buildCheckpointSourceRsp(&req, &pMsg->info, &rsp, 0); + buildCheckpointSourceRsp(&req, &pMsg->info, &rsp, TSDB_CODE_SUCCESS); tmsgSendRsp(&rsp); // error occurs return TSDB_CODE_SUCCESS; } @@ -1128,7 +1130,7 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp) streamMetaReleaseTask(pMeta, pTask); SRpcMsg rsp = {0}; - buildCheckpointSourceRsp(&req, &pMsg->info, &rsp, 0); + buildCheckpointSourceRsp(&req, &pMsg->info, &rsp, TSDB_CODE_SUCCESS); tmsgSendRsp(&rsp); // error occurs return TSDB_CODE_SUCCESS; } @@ -1146,7 +1148,7 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp) streamMetaReleaseTask(pMeta, pTask); SRpcMsg rsp = {0}; - buildCheckpointSourceRsp(&req, &pMsg->info, &rsp, 0); + buildCheckpointSourceRsp(&req, &pMsg->info, &rsp, TSDB_CODE_SUCCESS); tmsgSendRsp(&rsp); // error occurs return TSDB_CODE_SUCCESS; @@ -1171,11 +1173,15 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp) } else { // checkpoint already finished, and not in checkpoint status if (req.checkpointId <= pTask->chkInfo.checkpointId) { tqWarn("s-task:%s repeatly recv checkpoint-source msg checkpointId:%" PRId64 - " transId:%d already handled, ignore and discard", pTask->id.idStr, req.checkpointId, req.transId); + " transId:%d already handled, return success", pTask->id.idStr, req.checkpointId, req.transId); taosThreadMutexUnlock(&pTask->lock); streamMetaReleaseTask(pMeta, pTask); + SRpcMsg rsp = {0}; + buildCheckpointSourceRsp(&req, &pMsg->info, &rsp, TSDB_CODE_SUCCESS); + tmsgSendRsp(&rsp); // error occurs + return TSDB_CODE_SUCCESS; } } @@ -1193,10 +1199,10 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp) pTask->id.idStr, vgId, pTask->info.taskLevel, req.checkpointId, req.transId, pPrevStatus); } - code = streamAddCheckpointSourceRspMsg(&req, &pMsg->info, pTask, 1); + code = streamAddCheckpointSourceRspMsg(&req, &pMsg->info, pTask); if (code != TSDB_CODE_SUCCESS) { SRpcMsg rsp = {0}; - buildCheckpointSourceRsp(&req, &pMsg->info, &rsp, 0); + buildCheckpointSourceRsp(&req, &pMsg->info, &rsp, TSDB_CODE_SUCCESS); tmsgSendRsp(&rsp); // error occurs return code; } diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index 08118f5231..d97214b700 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -1580,12 +1580,11 @@ int32_t valueEncode(void* value, int32_t vlen, int64_t ttl, char** dest) { key.len = compressedSize; value = dst; } - stDebug("vlen: raw size: %d, compressed size: %d", vlen, compressedSize); } if (*dest == NULL) { - char* p = taosMemoryCalloc( - 1, sizeof(key.unixTimestamp) + sizeof(key.len) + sizeof(key.rawLen) + sizeof(key.compress) + key.len); + size_t size = sizeof(key.unixTimestamp) + sizeof(key.len) + sizeof(key.rawLen) + sizeof(key.compress) + key.len; + char* p = taosMemoryCalloc(1, size); char* buf = p; len += taosEncodeFixedI64((void**)&buf, key.unixTimestamp); len += taosEncodeFixedI32((void**)&buf, key.len); @@ -1601,8 +1600,8 @@ int32_t valueEncode(void* value, int32_t vlen, int64_t ttl, char** dest) { len += taosEncodeFixedI8((void**)&buf, key.compress); len += taosEncodeBinary((void**)&buf, (char*)value, key.len); } - taosMemoryFree(dst); + taosMemoryFree(dst); return len; } diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index d56b347f4c..d0fc923f83 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -833,7 +833,7 @@ FAIL: } int32_t buildCheckpointSourceRsp(SStreamCheckpointSourceReq* pReq, SRpcHandleInfo* pRpcInfo, SRpcMsg* pMsg, - int8_t isSucceed) { + int32_t setCode) { int32_t len = 0; int32_t code = 0; SEncoder encoder; @@ -845,7 +845,7 @@ int32_t buildCheckpointSourceRsp(SStreamCheckpointSourceReq* pReq, SRpcHandleInf .streamId = pReq->streamId, .expireTime = pReq->expireTime, .mnodeId = pReq->mnodeId, - .success = isSucceed, + .success = (setCode == TSDB_CODE_SUCCESS) ? 1 : 0, }; tEncodeSize(tEncodeStreamCheckpointSourceRsp, &rsp, len, code); @@ -866,22 +866,24 @@ int32_t buildCheckpointSourceRsp(SStreamCheckpointSourceReq* pReq, SRpcHandleInf tEncoderClear(&encoder); initRpcMsg(pMsg, 0, pBuf, sizeof(SMsgHead) + len); + + pMsg->code = setCode; pMsg->info = *pRpcInfo; return 0; } -int32_t streamAddCheckpointSourceRspMsg(SStreamCheckpointSourceReq* pReq, SRpcHandleInfo* pRpcInfo, SStreamTask* pTask, - int8_t isSucceed) { +int32_t streamAddCheckpointSourceRspMsg(SStreamCheckpointSourceReq* pReq, SRpcHandleInfo* pRpcInfo, SStreamTask* pTask) { SStreamChkptReadyInfo info = {0}; - buildCheckpointSourceRsp(pReq, pRpcInfo, &info.msg, isSucceed); + buildCheckpointSourceRsp(pReq, pRpcInfo, &info.msg, TSDB_CODE_SUCCESS); if (pTask->pReadyMsgList == NULL) { pTask->pReadyMsgList = taosArrayInit(4, sizeof(SStreamChkptReadyInfo)); } taosArrayPush(pTask->pReadyMsgList, &info); - stDebug("s-task:%s add checkpoint source rsp msg, total:%d", pTask->id.idStr, - (int32_t)taosArrayGetSize(pTask->pReadyMsgList)); + + int32_t size = taosArrayGetSize(pTask->pReadyMsgList); + stDebug("s-task:%s add checkpoint source rsp msg, total:%d", pTask->id.idStr, size); return TSDB_CODE_SUCCESS; }