diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 04694b05fd..a0cff35623 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -801,7 +801,7 @@ int32_t streamTaskCheckStatus(SStreamTask* pTask, int32_t upstreamTaskId, int32_ int64_t* oldStage); int32_t streamTaskUpdateEpsetInfo(SStreamTask* pTask, SArray* pNodeList); void streamTaskResetUpstreamStageInfo(SStreamTask* pTask); -bool streamTaskAllUpstreamClosed(SStreamTask* pTask); +bool streamTaskIsAllUpstreamClosed(SStreamTask* pTask); bool streamTaskSetSchedStatusWait(SStreamTask* pTask); int8_t streamTaskSetSchedStatusActive(SStreamTask* pTask); int8_t streamTaskSetSchedStatusInactive(SStreamTask* pTask); @@ -826,8 +826,7 @@ int32_t streamQueueGetNumOfItems(const SStreamQueue* pQueue); // common int32_t streamRestoreParam(SStreamTask* pTask); -int32_t streamResetParamForScanHistory(SStreamTask* pTask); -void streamTaskPause(SStreamTask* pTask, SStreamMeta* pMeta); +void streamTaskPause(SStreamMeta* pMeta, SStreamTask* pTask); void streamTaskResume(SStreamTask* pTask); int32_t streamTaskSetUpstreamInfo(SStreamTask* pTask, const SStreamTask* pUpstreamTask); void streamTaskUpdateUpstreamInfo(SStreamTask* pTask, int32_t nodeId, const SEpSet* pEpSet); @@ -838,6 +837,7 @@ int32_t streamTaskReloadState(SStreamTask* pTask); void streamTaskCloseUpstreamInput(SStreamTask* pTask, int32_t taskId); void streamTaskOpenAllUpstreamInput(SStreamTask* pTask); int32_t streamTaskSetDb(SStreamMeta* pMeta, void* pTask, char* key); +bool streamTaskIsSinkTask(const SStreamTask* pTask); void streamTaskStatusInit(STaskStatusEntry* pEntry, const SStreamTask* pTask); void streamTaskStatusCopy(STaskStatusEntry* pDst, const STaskStatusEntry* pSrc); @@ -885,6 +885,7 @@ int32_t streamMetaLoadAllTasks(SStreamMeta* pMeta); int32_t streamMetaStartAllTasks(SStreamMeta* pMeta); int32_t streamMetaStopAllTasks(SStreamMeta* pMeta); int32_t streamMetaStartOneTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId); +bool streamMetaAllTasksReady(const SStreamMeta* pMeta); // checkpoint int32_t streamProcessCheckpointSourceReq(SStreamTask* pTask, SStreamCheckpointSourceReq* pReq); diff --git a/include/os/osFile.h b/include/os/osFile.h index 503535a454..4d760a791f 100644 --- a/include/os/osFile.h +++ b/include/os/osFile.h @@ -117,6 +117,8 @@ int32_t taosCompressFile(char *srcFileName, char *destFileName); int32_t taosSetFileHandlesLimit(); +int32_t taosLinkFile(char *src, char *dst); + #ifdef __cplusplus } #endif diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index a53b322753..1fc7402363 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -742,8 +742,6 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t nextProcessVer) { return code; } - streamTaskOpenAllUpstreamInput(pTask); - if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) { STaskId taskId = {0}; if (pTask->info.fillHistory) { @@ -1126,6 +1124,19 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp) pRsp->info.handle = NULL; SStreamCheckpointSourceReq req = {0}; + SDecoder decoder; + tDecoderInit(&decoder, (uint8_t*)msg, len); + if (tDecodeStreamCheckpointSourceReq(&decoder, &req) < 0) { + code = TSDB_CODE_MSG_DECODE_ERROR; + tDecoderClear(&decoder); + tqError("vgId:%d failed to decode checkpoint-source msg, code:%s", vgId, tstrerror(code)); + SRpcMsg rsp = {0}; + buildCheckpointSourceRsp(&req, &pMsg->info, &rsp, 0); + tmsgSendRsp(&rsp); // error occurs + return code; + } + tDecoderClear(&decoder); + if (!vnodeIsRoleLeader(pTq->pVnode)) { tqDebug("vgId:%d not leader, ignore checkpoint-source msg, s-task:0x%x", vgId, req.taskId); SRpcMsg rsp = {0}; @@ -1142,19 +1153,6 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp) return TSDB_CODE_SUCCESS; } - SDecoder decoder; - tDecoderInit(&decoder, (uint8_t*)msg, len); - if (tDecodeStreamCheckpointSourceReq(&decoder, &req) < 0) { - code = TSDB_CODE_MSG_DECODE_ERROR; - tDecoderClear(&decoder); - tqError("vgId:%d failed to decode checkpoint-source msg, code:%s", vgId, tstrerror(code)); - SRpcMsg rsp = {0}; - buildCheckpointSourceRsp(&req, &pMsg->info, &rsp, 0); - tmsgSendRsp(&rsp); // error occurs - return code; - } - tDecoderClear(&decoder); - SStreamTask* pTask = streamMetaAcquireTask(pMeta, req.streamId, req.taskId); if (pTask == NULL) { tqError("vgId:%d failed to find s-task:0x%x, ignore checkpoint msg. it may have been destroyed already", vgId, diff --git a/source/dnode/vnode/src/tqCommon/tqCommon.c b/source/dnode/vnode/src/tqCommon/tqCommon.c index 5a92677462..e2dc9fa679 100644 --- a/source/dnode/vnode/src/tqCommon/tqCommon.c +++ b/source/dnode/vnode/src/tqCommon/tqCommon.c @@ -76,33 +76,6 @@ int32_t tqStreamOneTaskStartAsync(SStreamMeta* pMeta, SMsgCb* cb, int64_t stream return 0; } -int32_t tqUpdateNodeEpsetAsync(SStreamMeta* pMeta, SMsgCb* cb, int64_t streamId, int32_t taskId) { - int32_t vgId = pMeta->vgId; - - int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList); - if (numOfTasks == 0) { - tqDebug("vgId:%d no stream tasks existed to run", vgId); - return 0; - } - - SStreamTaskRunReq* pRunReq = rpcMallocCont(sizeof(SStreamTaskRunReq)); - if (pRunReq == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - tqError("vgId:%d failed to create msg to start task:0x%x, code:%s", vgId, taskId, terrstr()); - return -1; - } - - tqDebug("vgId:%d update s-task:0x%x nodeEpset async", vgId, taskId); - pRunReq->head.vgId = vgId; - pRunReq->streamId = streamId; - pRunReq->taskId = taskId; - pRunReq->reqType = STREAM_EXEC_T_UPDATE_TASK_EPSET; - - SRpcMsg msg = {.msgType = TDMT_STREAM_TASK_RUN, .pCont = pRunReq, .contLen = sizeof(SStreamTaskRunReq)}; - tmsgPutToQueue(cb, STREAM_QUEUE, &msg); - return 0; -} - int32_t tqStreamTaskProcessUpdateReq(SStreamMeta* pMeta, SMsgCb* cb, SRpcMsg* pMsg, bool restored) { int32_t vgId = pMeta->vgId; char* msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); @@ -728,10 +701,6 @@ int32_t tqStreamTaskResetStatus(SStreamMeta* pMeta, int32_t* numOfTasks) { STaskId id = {.streamId = pTaskId->streamId, .taskId = pTaskId->taskId}; SStreamTask** pTask = taosHashGet(pMeta->pTasksMap, &id, sizeof(id)); streamTaskResetStatus(*pTask); - -// if ((*pTask)->info.fillHistory == 1) { -// streamResetParamForScanHistory(*pTask); -// } } return 0; @@ -844,8 +813,8 @@ int32_t tqStreamTaskProcessRunReq(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLead if (pTask != NULL) { // even in halt status, the data in inputQ must be processed char* p = NULL; if (streamTaskReadyToRun(pTask, &p)) { - tqDebug("vgId:%d s-task:%s start to process block from inputQ, next checked ver:%" PRId64, vgId, pTask->id.idStr, - pTask->chkInfo.nextProcessVer); + tqDebug("vgId:%d s-task:%s status:%s start to process block from inputQ, next checked ver:%" PRId64, vgId, pTask->id.idStr, + p, pTask->chkInfo.nextProcessVer); streamExecTask(pTask); } else { int8_t status = streamTaskSetSchedStatusInactive(pTask); @@ -864,23 +833,31 @@ int32_t tqStreamTaskProcessRunReq(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLead int32_t tqStartTaskCompleteCallback(SStreamMeta* pMeta) { STaskStartInfo* pStartInfo = &pMeta->startInfo; - int32_t vgId = pMeta->vgId; + int32_t vgId = pMeta->vgId; streamMetaWLock(pMeta); if (pStartInfo->taskStarting == 1) { tqDebug("vgId:%d already in start tasks procedure in other thread, restartCounter:%d, do nothing", vgId, pMeta->startInfo.restartCount); } else { // not in starting procedure - if (pStartInfo->restartCount > 0) { + bool allReady = streamMetaAllTasksReady(pMeta); + + if ((pStartInfo->restartCount > 0) && (!allReady)) { + // if all tasks are ready now, do NOT restart again, and reset the value of pStartInfo->restartCount pStartInfo->restartCount -= 1; tqDebug("vgId:%d role:%d need to restart all tasks again, restartCounter:%d", vgId, pMeta->role, pStartInfo->restartCount); - streamMetaWUnLock(pMeta); + restartStreamTasks(pMeta, (pMeta->role == NODE_ROLE_LEADER)); return TSDB_CODE_SUCCESS; } else { - tqDebug("vgId:%d start all tasks completed in callbackFn", pMeta->vgId); + if (pStartInfo->restartCount == 0) { + tqDebug("vgId:%d start all tasks completed in callbackFn, restartCount is 0", pMeta->vgId); + } else if (allReady) { + pStartInfo->restartCount = 0; + tqDebug("vgId:%d all tasks are ready, reset restartCounter 0, not restart tasks", vgId); + } } } @@ -922,7 +899,7 @@ int32_t tqStreamTaskProcessTaskPauseReq(SStreamMeta* pMeta, char* pMsg){ } tqDebug("s-task:%s receive pause msg from mnode", pTask->id.idStr); - streamTaskPause(pTask, pMeta); + streamTaskPause(pMeta, pTask); SStreamTask* pHistoryTask = NULL; if (HAS_RELATED_FILLHISTORY_TASK(pTask)) { @@ -939,7 +916,7 @@ int32_t tqStreamTaskProcessTaskPauseReq(SStreamMeta* pMeta, char* pMsg){ tqDebug("s-task:%s fill-history task handle paused along with related stream task", pHistoryTask->id.idStr); - streamTaskPause(pHistoryTask, pMeta); + streamTaskPause(pMeta, pHistoryTask); streamMetaReleaseTask(pMeta, pHistoryTask); } diff --git a/source/libs/stream/inc/streamInt.h b/source/libs/stream/inc/streamInt.h index 1d1eda74f6..1e4b8996b6 100644 --- a/source/libs/stream/inc/streamInt.h +++ b/source/libs/stream/inc/streamInt.h @@ -120,7 +120,7 @@ int32_t streamTaskSendCheckpointSourceRsp(SStreamTask* pTask); void streamTaskSetCheckpointFailedId(SStreamTask* pTask); int32_t streamTaskGetNumOfDownstream(const SStreamTask* pTask); int32_t streamTaskInitTokenBucket(STokenBucket* pBucket, int32_t numCap, int32_t numRate, float quotaRate, const char*); -STaskId streamTaskExtractKey(const SStreamTask* pTask); +STaskId streamTaskGetTaskId(const SStreamTask* pTask); void streamTaskInitForLaunchHTask(SHistoryTaskInfo* pInfo); void streamTaskSetRetryInfoForLaunch(SHistoryTaskInfo* pInfo); int32_t streamTaskBuildScanhistoryRspMsg(SStreamTask* pTask, SStreamScanHistoryFinishReq* pReq, void** pBuffer, diff --git a/source/libs/stream/src/stream.c b/source/libs/stream/src/stream.c index 7d507a1394..75af5a9fc5 100644 --- a/source/libs/stream/src/stream.c +++ b/source/libs/stream/src/stream.c @@ -300,28 +300,6 @@ int32_t streamProcessRetrieveReq(SStreamTask* pTask, SStreamRetrieveReq* pReq, S void streamTaskInputFail(SStreamTask* pTask) { atomic_store_8(&pTask->inputq.status, TASK_INPUT_STATUS__FAILED); } -void streamTaskOpenAllUpstreamInput(SStreamTask* pTask) { - int32_t num = taosArrayGetSize(pTask->upstreamInfo.pList); - if (num == 0) { - return; - } - - for (int32_t i = 0; i < num; ++i) { - SStreamChildEpInfo* pInfo = taosArrayGetP(pTask->upstreamInfo.pList, i); - pInfo->dataAllowed = true; - } - - pTask->upstreamInfo.numOfClosed = 0; - stDebug("s-task:%s opening up inputQ from upstream tasks", pTask->id.idStr); -} - -void streamTaskCloseUpstreamInput(SStreamTask* pTask, int32_t taskId) { - SStreamChildEpInfo* pInfo = streamTaskGetUpstreamTaskEpInfo(pTask, taskId); - if (pInfo != NULL) { - pInfo->dataAllowed = false; - } -} - SStreamChildEpInfo* streamTaskGetUpstreamTaskEpInfo(SStreamTask* pTask, int32_t taskId) { int32_t num = taosArrayGetSize(pTask->upstreamInfo.pList); for (int32_t i = 0; i < num; ++i) { diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index 4b2a9e4a65..c8f944071f 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -41,6 +41,8 @@ void destroyRocksdbCfInst(RocksdbCfInst* inst); int32_t getCfIdx(const char* cfName); STaskDbWrapper* taskDbOpenImpl(char* key, char* statePath, char* dbPath); +int32_t backendCopyFiles(char* src, char* dst); + void destroyCompactFilteFactory(void* arg); void destroyCompactFilte(void* arg); const char* compactFilteFactoryName(void* arg); @@ -218,7 +220,7 @@ int32_t rebuildDirFromCheckpoint(const char* path, int64_t chkpId, char** dst) { taosRemoveDir(state); } taosMkDir(state); - code = copyFiles(chkp, state); + code = backendCopyFiles(chkp, state); stInfo("copy snap file from %s to %s", chkp, state); if (code != 0) { stError("failed to restart stream backend from %s, reason: %s", chkp, tstrerror(TAOS_SYSTEM_ERROR(errno))); @@ -334,7 +336,7 @@ int32_t rebuildFromRemoteChkp_rsync(char* key, char* chkpPath, int64_t chkpId, c if (code != 0) { return code; } - code = copyFiles(chkpPath, defaultPath); + code = backendCopyFiles(chkpPath, defaultPath); return code; } @@ -359,7 +361,7 @@ int32_t rebuildFromRemoteChkp_s3(char* key, char* chkpPath, int64_t chkpId, char if (code == 0) { taosMkDir(defaultPath); - code = copyFiles(chkpPath, defaultPath); + code = backendCopyFiles(chkpPath, defaultPath); } if (code != 0) { @@ -382,6 +384,121 @@ int32_t rebuildFromRemoteChkp(char* key, char* chkpPath, int64_t chkpId, char* d return -1; } +int32_t copyFiles_create(char* src, char* dst, int8_t type) { + // create and copy file + int32_t err = taosCopyFile(src, dst); + + if (errno == EXDEV || errno == ENOTSUP) { + errno = 0; + return 0; + } + return 0; +} +int32_t copyFiles_hardlink(char* src, char* dst, int8_t type) { + // same fs and hard link + return taosLinkFile(src, dst); +} + +int32_t backendFileCopyFilesImpl(char* src, char* dst) { + const char* current = "CURRENT"; + size_t currLen = strlen(current); + + int32_t code = 0; + int32_t sLen = strlen(src); + int32_t dLen = strlen(dst); + char* srcName = taosMemoryCalloc(1, sLen + 64); + char* dstName = taosMemoryCalloc(1, dLen + 64); + // copy file to dst + + TdDirPtr pDir = taosOpenDir(src); + if (pDir == NULL) { + taosMemoryFree(srcName); + taosMemoryFree(dstName); + errno = 0; + return -1; + } + + TdDirEntryPtr de = NULL; + while ((de = taosReadDir(pDir)) != NULL) { + char* name = taosGetDirEntryName(de); + if (strcmp(name, ".") == 0 || strcmp(name, "..") == 0) continue; + + sprintf(srcName, "%s%s%s", src, TD_DIRSEP, name); + sprintf(dstName, "%s%s%s", dst, TD_DIRSEP, name); + + if (strncmp(name, current, strlen(name) <= currLen ? strlen(name) : currLen) == 0) { + code = copyFiles_create(srcName, dstName, 0); + if (code != 0) { + stError("failed to copy file, detail: %s to %s reason: %s", srcName, dstName, + tstrerror(TAOS_SYSTEM_ERROR(code))); + goto _ERROR; + } + } else { + code = copyFiles_hardlink(srcName, dstName, 0); + if (code != 0) { + stError("failed to hard line file, detail: %s to %s, reason: %s", srcName, dstName, + tstrerror(TAOS_SYSTEM_ERROR(code))); + goto _ERROR; + } + } + memset(srcName, 0, sLen + 64); + memset(dstName, 0, dLen + 64); + } + + taosMemoryFreeClear(srcName); + taosMemoryFreeClear(dstName); + taosCloseDir(&pDir); + errno = 0; + return 0; +_ERROR: + taosMemoryFreeClear(srcName); + taosMemoryFreeClear(dstName); + taosCloseDir(&pDir); + errno = 0; + return -1; +} +int32_t backendCopyFiles(char* src, char* dst) { + return backendFileCopyFilesImpl(src, dst); + // // opt later, just hard link + // int32_t sLen = strlen(src); + // int32_t dLen = strlen(dst); + // char* srcName = taosMemoryCalloc(1, sLen + 64); + // char* dstName = taosMemoryCalloc(1, dLen + 64); + + // TdDirPtr pDir = taosOpenDir(src); + // if (pDir == NULL) { + // taosMemoryFree(srcName); + // taosMemoryFree(dstName); + // return -1; + // } + + // TdDirEntryPtr de = NULL; + // while ((de = taosReadDir(pDir)) != NULL) { + // char* name = taosGetDirEntryName(de); + // if (strcmp(name, ".") == 0 || strcmp(name, "..") == 0) continue; + + // sprintf(srcName, "%s%s%s", src, TD_DIRSEP, name); + // sprintf(dstName, "%s%s%s", dst, TD_DIRSEP, name); + // // if (!taosDirEntryIsDir(de)) { + // // // code = taosCopyFile(srcName, dstName); + // // if (code == -1) { + // // goto _err; + // // } + // // } + // return backendFileCopyFilesImpl(src, dst); + + // memset(srcName, 0, sLen + 64); + // memset(dstName, 0, dLen + 64); + // } + + // _err: + // taosMemoryFreeClear(srcName); + // taosMemoryFreeClear(dstName); + // taosCloseDir(&pDir); + // return code >= 0 ? 0 : -1; + + // return 0; +} int32_t rebuildFromLocalChkp(char* key, char* chkpPath, int64_t chkpId, char* defaultPath) { int32_t code = -1; int32_t len = strlen(defaultPath) + 32; @@ -396,7 +513,7 @@ int32_t rebuildFromLocalChkp(char* key, char* chkpPath, int64_t chkpId, char* de taosRemoveDir(tmp); } taosMkDir(defaultPath); - code = copyFiles(chkpPath, defaultPath); + code = backendCopyFiles(chkpPath, defaultPath); if (code != 0) { stError("failed to restart stream backend from %s, reason: %s", chkpPath, tstrerror(TAOS_SYSTEM_ERROR(errno))); } else { diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index cb3f7a3504..006202fdd1 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -185,6 +185,11 @@ int32_t streamProcessCheckpointBlock(SStreamTask* pTask, SStreamDataBlock* pBloc int64_t checkpointId = pDataBlock->info.version; const char* id = pTask->id.idStr; int32_t code = TSDB_CODE_SUCCESS; + int32_t vgId = pTask->pMeta->vgId; + + stDebug("s-task:%s vgId:%d start to handle the checkpoint block, checkpointId:%" PRId64 " ver:%" PRId64 + ", current checkpointingId:%" PRId64, + id, vgId, pTask->chkInfo.checkpointId, pTask->chkInfo.checkpointVer, checkpointId); // set task status if (streamTaskGetStatus(pTask)->state != TASK_STATUS__CK) { @@ -330,7 +335,7 @@ int32_t streamSaveTaskCheckpointInfo(SStreamTask* p, int64_t checkpointId) { vgId, id, p->info.taskLevel, checkpointId, pCKInfo->checkpointVer, pCKInfo->nextProcessVer, pStatus->name); // save the task if not sink task - if (p->info.taskLevel < TASK_LEVEL__SINK) { + if (p->info.taskLevel <= TASK_LEVEL__SINK) { streamMetaWLock(pMeta); code = streamMetaSaveTask(pMeta, p); @@ -455,7 +460,7 @@ int32_t streamTaskBuildCheckpoint(SStreamTask* pTask) { // sink task do not need to save the status, and generated the checkpoint if (pTask->info.taskLevel != TASK_LEVEL__SINK) { - stDebug("s-task:%s level:%d start gen checkpoint", id, pTask->info.taskLevel); + stDebug("s-task:%s level:%d start gen checkpoint, checkpointId:%" PRId64, id, pTask->info.taskLevel, ckId); code = streamBackendDoCheckpoint(pTask->pBackend, ckId); if (code != TSDB_CODE_SUCCESS) { stError("s-task:%s gen checkpoint:%" PRId64 " failed, code:%s", id, ckId, tstrerror(terrno)); diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index c69fe2a79c..1eb66a82ab 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -713,7 +713,13 @@ bool streamTaskReadyToRun(const SStreamTask* pTask, char** pStatus) { *pStatus = pState->name; } - return (st == TASK_STATUS__READY || st == TASK_STATUS__SCAN_HISTORY || st == TASK_STATUS__CK); + // pause & halt will still run for sink tasks. + if (streamTaskIsSinkTask(pTask)) { + return (st == TASK_STATUS__READY || st == TASK_STATUS__SCAN_HISTORY || st == TASK_STATUS__CK || + st == TASK_STATUS__PAUSE || st == TASK_STATUS__HALT); + } else { + return (st == TASK_STATUS__READY || st == TASK_STATUS__SCAN_HISTORY || st == TASK_STATUS__CK); + } } static void doStreamExecTaskHelper(void* param, void* tmrId) { diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index ee2b70eebe..1f45e9b94c 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -47,6 +47,12 @@ struct SMetaHbInfo { int64_t hbStart; }; +typedef struct STaskInitTs { + int64_t start; + int64_t end; + bool success; +} STaskInitTs; + SMetaRefMgt gMetaRefMgt; void metaRefMgtInit(); @@ -581,7 +587,7 @@ int32_t streamMetaRemoveTask(SStreamMeta* pMeta, STaskId* pTaskId) { int32_t streamMetaRegisterTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTask, bool* pAdded) { *pAdded = false; - STaskId id = streamTaskExtractKey(pTask); + STaskId id = streamTaskGetTaskId(pTask); void* p = taosHashGet(pMeta->pTasksMap, &id, sizeof(id)); if (p != NULL) { return 0; @@ -871,7 +877,7 @@ int32_t streamMetaLoadAllTasks(SStreamMeta* pMeta) { int32_t taskId = pTask->id.taskId; tFreeStreamTask(pTask); - STaskId id = streamTaskExtractKey(pTask); + STaskId id = streamTaskGetTaskId(pTask); taosArrayPush(pRecycleList, &id); int32_t total = taosArrayGetSize(pRecycleList); @@ -1302,28 +1308,28 @@ void streamMetaResetStartInfo(STaskStartInfo* pStartInfo) { } void streamMetaRLock(SStreamMeta* pMeta) { - stTrace("vgId:%d meta-rlock", pMeta->vgId); +// stTrace("vgId:%d meta-rlock", pMeta->vgId); taosThreadRwlockRdlock(&pMeta->lock); } void streamMetaRUnLock(SStreamMeta* pMeta) { - stTrace("vgId:%d meta-runlock", pMeta->vgId); +// stTrace("vgId:%d meta-runlock", pMeta->vgId); int32_t code = taosThreadRwlockUnlock(&pMeta->lock); if (code != TSDB_CODE_SUCCESS) { stError("vgId:%d meta-runlock failed, code:%d", pMeta->vgId, code); } else { - stDebug("vgId:%d meta-runlock completed", pMeta->vgId); +// stDebug("vgId:%d meta-runlock completed", pMeta->vgId); } } void streamMetaWLock(SStreamMeta* pMeta) { - stTrace("vgId:%d meta-wlock", pMeta->vgId); +// stTrace("vgId:%d meta-wlock", pMeta->vgId); taosThreadRwlockWrlock(&pMeta->lock); - stTrace("vgId:%d meta-wlock completed", pMeta->vgId); +// stTrace("vgId:%d meta-wlock completed", pMeta->vgId); } void streamMetaWUnLock(SStreamMeta* pMeta) { - stTrace("vgId:%d meta-wunlock", pMeta->vgId); +// stTrace("vgId:%d meta-wunlock", pMeta->vgId); taosThreadRwlockUnlock(&pMeta->lock); } @@ -1407,37 +1413,6 @@ void streamMetaUpdateStageRole(SStreamMeta* pMeta, int64_t stage, bool isLeader) } } -int32_t streamMetaStopAllTasks(SStreamMeta* pMeta) { - streamMetaRLock(pMeta); - - int32_t num = taosArrayGetSize(pMeta->pTaskList); - stDebug("vgId:%d stop all %d stream task(s)", pMeta->vgId, num); - if (num == 0) { - streamMetaRUnLock(pMeta); - return TSDB_CODE_SUCCESS; - } - - // send hb msg to mnode before closing all tasks. - SArray* pTaskList = streamMetaSendMsgBeforeCloseTasks(pMeta); - int32_t numOfTasks = taosArrayGetSize(pTaskList); - - for (int32_t i = 0; i < numOfTasks; ++i) { - SStreamTaskId* pTaskId = taosArrayGet(pTaskList, i); - SStreamTask* pTask = streamMetaAcquireTaskNoLock(pMeta, pTaskId->streamId, pTaskId->taskId); - if (pTask == NULL) { - continue; - } - - streamTaskStop(pTask); - streamMetaReleaseTask(pMeta, pTask); - } - - taosArrayDestroy(pTaskList); - - streamMetaRUnLock(pMeta); - return 0; -} - int32_t streamMetaStartAllTasks(SStreamMeta* pMeta) { int32_t code = TSDB_CODE_SUCCESS; int32_t vgId = pMeta->vgId; @@ -1512,6 +1487,54 @@ int32_t streamMetaStartAllTasks(SStreamMeta* pMeta) { return code; } +int32_t streamMetaStopAllTasks(SStreamMeta* pMeta) { + streamMetaRLock(pMeta); + + int32_t num = taosArrayGetSize(pMeta->pTaskList); + stDebug("vgId:%d stop all %d stream task(s)", pMeta->vgId, num); + if (num == 0) { + streamMetaRUnLock(pMeta); + return TSDB_CODE_SUCCESS; + } + + // send hb msg to mnode before closing all tasks. + SArray* pTaskList = streamMetaSendMsgBeforeCloseTasks(pMeta); + int32_t numOfTasks = taosArrayGetSize(pTaskList); + + for (int32_t i = 0; i < numOfTasks; ++i) { + SStreamTaskId* pTaskId = taosArrayGet(pTaskList, i); + SStreamTask* pTask = streamMetaAcquireTaskNoLock(pMeta, pTaskId->streamId, pTaskId->taskId); + if (pTask == NULL) { + continue; + } + + streamTaskStop(pTask); + streamMetaReleaseTask(pMeta, pTask); + } + + taosArrayDestroy(pTaskList); + + streamMetaRUnLock(pMeta); + return 0; +} + +bool streamMetaAllTasksReady(const SStreamMeta* pMeta) { + int32_t num = taosArrayGetSize(pMeta->pTaskList); + for(int32_t i = 0; i < num; ++i) { + STaskId* pTaskId = taosArrayGet(pMeta->pTaskList, i); + SStreamTask** ppTask = taosHashGet(pMeta->pTasksMap, pTaskId, sizeof(*pTaskId)); + if (ppTask == NULL) { + continue; + } + + if ((*ppTask)->status.downstreamReady == 0) { + return false; + } + } + + return true; +} + int32_t streamMetaStartOneTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId) { int32_t vgId = pMeta->vgId; stInfo("vgId:%d start to task:0x%x by checking downstream status", vgId, taskId); @@ -1547,4 +1570,74 @@ int32_t streamMetaStartOneTask(SStreamMeta* pMeta, int64_t streamId, int32_t tas streamMetaReleaseTask(pMeta, pTask); return ret; +} + +static void displayStatusInfo(SStreamMeta* pMeta, SHashObj* pTaskSet, bool succ) { + int32_t vgId = pMeta->vgId; + void* pIter = NULL; + size_t keyLen = 0; + + stInfo("vgId:%d %d tasks check-downstream completed %s", vgId, taosHashGetSize(pTaskSet), + succ ? "success" : "failed"); + + while ((pIter = taosHashIterate(pTaskSet, pIter)) != NULL) { + STaskInitTs* pInfo = pIter; + void* key = taosHashGetKey(pIter, &keyLen); + + SStreamTask** pTask1 = taosHashGet(pMeta->pTasksMap, key, sizeof(STaskId)); + if (pTask1 == NULL) { + stInfo("s-task:0x%x is dropped already, %s", (int32_t)((STaskId*)key)->taskId, succ ? "success" : "failed"); + } else { + stInfo("s-task:%s level:%d vgId:%d, init:%" PRId64 ", initEnd:%" PRId64 ", %s", (*pTask1)->id.idStr, + (*pTask1)->info.taskLevel, vgId, pInfo->start, pInfo->end, pInfo->success ? "success" : "failed"); + } + } +} + +int32_t streamMetaAddTaskLaunchResult(SStreamMeta* pMeta, int64_t streamId, int32_t taskId, int64_t startTs, + int64_t endTs, bool ready) { + STaskStartInfo* pStartInfo = &pMeta->startInfo; + STaskId id = {.streamId = streamId, .taskId = taskId}; + + streamMetaWLock(pMeta); + + if (pStartInfo->taskStarting != 1) { + int64_t el = endTs - startTs; + qDebug("vgId:%d not start all task(s), not record status, s-task:0x%x launch succ:%d elapsed time:%" PRId64 "ms", + pMeta->vgId, taskId, ready, el); + streamMetaWUnLock(pMeta); + return 0; + } + + SHashObj* pDst = ready ? pStartInfo->pReadyTaskSet : pStartInfo->pFailedTaskSet; + + STaskInitTs initTs = {.start = startTs, .end = endTs, .success = ready}; + taosHashPut(pDst, &id, sizeof(id), &initTs, sizeof(STaskInitTs)); + + int32_t numOfTotal = streamMetaGetNumOfTasks(pMeta); + int32_t numOfRecv = taosHashGetSize(pStartInfo->pReadyTaskSet) + taosHashGetSize(pStartInfo->pFailedTaskSet); + + if (numOfRecv == numOfTotal) { + pStartInfo->readyTs = taosGetTimestampMs(); + pStartInfo->elapsedTime = (pStartInfo->startTs != 0) ? pStartInfo->readyTs - pStartInfo->startTs : 0; + + stDebug("vgId:%d all %d task(s) check downstream completed, last completed task:0x%x (succ:%d) startTs:%" PRId64 + ", readyTs:%" PRId64 " total elapsed time:%.2fs", + pMeta->vgId, numOfTotal, taskId, ready, pStartInfo->startTs, pStartInfo->readyTs, + pStartInfo->elapsedTime / 1000.0); + + // print the initialization elapsed time and info + displayStatusInfo(pMeta, pStartInfo->pReadyTaskSet, true); + displayStatusInfo(pMeta, pStartInfo->pFailedTaskSet, false); + streamMetaResetStartInfo(pStartInfo); + streamMetaWUnLock(pMeta); + + pStartInfo->completeFn(pMeta); + } else { + streamMetaWUnLock(pMeta); + stDebug("vgId:%d recv check downstream results, s-task:0x%x succ:%d, received:%d, total:%d", pMeta->vgId, taskId, + ready, numOfRecv, numOfTotal); + } + + return TSDB_CODE_SUCCESS; } \ No newline at end of file diff --git a/source/libs/stream/src/streamQueue.c b/source/libs/stream/src/streamQueue.c index 90823cd937..78929c365e 100644 --- a/source/libs/stream/src/streamQueue.c +++ b/source/libs/stream/src/streamQueue.c @@ -154,7 +154,6 @@ int32_t streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInpu *numOfBlocks = 0; *blockSize = 0; - // todo remove it // no available token in bucket for sink task, let's wait for a little bit if (taskLevel == TASK_LEVEL__SINK && (!streamTaskExtractAvailableToken(pTask->outputInfo.pTokenBucket, pTask->id.idStr))) { stDebug("s-task:%s no available token in bucket for sink data, wait for 10ms", id); diff --git a/source/libs/stream/src/streamStart.c b/source/libs/stream/src/streamStart.c index 84f1dbb4d7..5e1566c1e1 100644 --- a/source/libs/stream/src/streamStart.c +++ b/source/libs/stream/src/streamStart.c @@ -35,12 +35,6 @@ typedef struct STaskRecheckInfo { void* checkTimer; } STaskRecheckInfo; -typedef struct STaskInitTs { - int64_t start; - int64_t end; - bool success; -} STaskInitTs; - static int32_t streamSetParamForScanHistory(SStreamTask* pTask); static void streamTaskSetRangeStreamCalc(SStreamTask* pTask); static int32_t initScanHistoryReq(SStreamTask* pTask, SStreamScanHistoryReq* pReq, int8_t igUntreated); @@ -546,15 +540,6 @@ int32_t streamSetParamForScanHistory(SStreamTask* pTask) { return qSetStreamOperatorOptionForScanHistory(pTask->exec.pExecutor); } -int32_t streamResetParamForScanHistory(SStreamTask* pTask) { - stDebug("s-task:%s reset operator option for scan-history data", pTask->id.idStr); - if (pTask->exec.pExecutor != NULL) { - return qResetStreamOperatorOptionForScanHistory(pTask->exec.pExecutor); - } else { - return TSDB_CODE_SUCCESS; - } -} - int32_t streamRestoreParam(SStreamTask* pTask) { stDebug("s-task:%s restore operator param after scan-history", pTask->id.idStr); return qRestoreStreamOperatorOption(pTask->exec.pExecutor); @@ -1134,97 +1119,3 @@ void streamTaskSetRangeStreamCalc(SStreamTask* pTask) { } } -void streamTaskPause(SStreamTask* pTask, SStreamMeta* pMeta) { - streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_PAUSE); - - int32_t num = atomic_add_fetch_32(&pMeta->numOfPausedTasks, 1); - stInfo("vgId:%d s-task:%s pause stream task. pause task num:%d", pMeta->vgId, pTask->id.idStr, num); - - // in case of fill-history task, stop the tsdb file scan operation. - if (pTask->info.fillHistory == 1) { - void* pExecutor = pTask->exec.pExecutor; - qKillTask(pExecutor, TSDB_CODE_SUCCESS); - } - - stDebug("vgId:%d s-task:%s set pause flag and pause task", pMeta->vgId, pTask->id.idStr); -} - -void streamTaskResume(SStreamTask* pTask) { - SStreamTaskState prevState = *streamTaskGetStatus(pTask); - SStreamMeta* pMeta = pTask->pMeta; - - if (prevState.state == TASK_STATUS__PAUSE || prevState.state == TASK_STATUS__HALT) { - streamTaskRestoreStatus(pTask); - - char* pNew = streamTaskGetStatus(pTask)->name; - if (prevState.state == TASK_STATUS__PAUSE) { - int32_t num = atomic_sub_fetch_32(&pMeta->numOfPausedTasks, 1); - stInfo("s-task:%s status:%s resume from %s, paused task(s):%d", pTask->id.idStr, pNew, prevState.name, num); - } else { - stInfo("s-task:%s status:%s resume from %s", pTask->id.idStr, pNew, prevState.name); - } - } else { - stDebug("s-task:%s status:%s not in pause/halt status, no need to resume", pTask->id.idStr, prevState.name); - } -} - -static void displayStatusInfo(SStreamMeta* pMeta, SHashObj* pTaskSet, bool succ) { - int32_t vgId = pMeta->vgId; - void* pIter = NULL; - size_t keyLen = 0; - - stInfo("vgId:%d %d tasks check-downstream completed %s", vgId, taosHashGetSize(pTaskSet), - succ ? "success" : "failed"); - - while ((pIter = taosHashIterate(pTaskSet, pIter)) != NULL) { - STaskInitTs* pInfo = pIter; - void* key = taosHashGetKey(pIter, &keyLen); - - SStreamTask** pTask1 = taosHashGet(pMeta->pTasksMap, key, sizeof(STaskId)); - if (pTask1 == NULL) { - stInfo("s-task:0x%x is dropped already, %s", (int32_t)((STaskId*)key)->taskId, succ ? "success" : "failed"); - } else { - stInfo("s-task:%s level:%d vgId:%d, init:%" PRId64 ", initEnd:%" PRId64 ", %s", (*pTask1)->id.idStr, - (*pTask1)->info.taskLevel, vgId, pInfo->start, pInfo->end, pInfo->success ? "success" : "failed"); - } - } -} - -int32_t streamMetaAddTaskLaunchResult(SStreamMeta* pMeta, int64_t streamId, int32_t taskId, int64_t startTs, - int64_t endTs, bool ready) { - STaskStartInfo* pStartInfo = &pMeta->startInfo; - STaskId id = {.streamId = streamId, .taskId = taskId}; - - streamMetaWLock(pMeta); - SHashObj* pDst = ready ? pStartInfo->pReadyTaskSet : pStartInfo->pFailedTaskSet; - - STaskInitTs initTs = {.start = startTs, .end = endTs, .success = ready}; - taosHashPut(pDst, &id, sizeof(id), &initTs, sizeof(STaskInitTs)); - - int32_t numOfTotal = streamMetaGetNumOfTasks(pMeta); - int32_t numOfRecv = taosHashGetSize(pStartInfo->pReadyTaskSet) + taosHashGetSize(pStartInfo->pFailedTaskSet); - - if (numOfRecv == numOfTotal) { - pStartInfo->readyTs = taosGetTimestampMs(); - pStartInfo->elapsedTime = (pStartInfo->startTs != 0) ? pStartInfo->readyTs - pStartInfo->startTs : 0; - - stDebug("vgId:%d all %d task(s) check downstream completed, last completed task:0x%x (succ:%d) startTs:%" PRId64 - ", readyTs:%" PRId64 " total elapsed time:%.2fs", - pMeta->vgId, numOfTotal, taskId, ready, pStartInfo->startTs, pStartInfo->readyTs, - pStartInfo->elapsedTime / 1000.0); - - // print the initialization elapsed time and info - displayStatusInfo(pMeta, pStartInfo->pReadyTaskSet, true); - displayStatusInfo(pMeta, pStartInfo->pFailedTaskSet, false); - streamMetaResetStartInfo(pStartInfo); - streamMetaWUnLock(pMeta); - - pStartInfo->completeFn(pMeta); - } else { - streamMetaWUnLock(pMeta); - stDebug("vgId:%d recv check downstream results, s-task:0x%x succ:%d, received:%d, total:%d", pMeta->vgId, taskId, - ready, numOfRecv, numOfTotal); - } - - return TSDB_CODE_SUCCESS; -} diff --git a/source/libs/stream/src/streamState.c b/source/libs/stream/src/streamState.c index 776a9db522..19b7359981 100644 --- a/source/libs/stream/src/streamState.c +++ b/source/libs/stream/src/streamState.c @@ -435,7 +435,6 @@ int32_t streamStateAddIfNotExist(SStreamState* pState, const SWinKey* key, void* int32_t streamStateReleaseBuf(SStreamState* pState, void* pVal, bool used) { // todo refactor - stDebug("streamStateReleaseBuf"); if (!pVal) { return 0; } diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index c14d355dc8..2497e8dd8e 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -30,6 +30,55 @@ static int32_t addToTaskset(SArray* pArray, SStreamTask* pTask) { return 0; } +static int32_t doUpdateTaskEpset(SStreamTask* pTask, int32_t nodeId, SEpSet* pEpSet) { + char buf[512] = {0}; + + if (pTask->info.nodeId == nodeId) { // execution task should be moved away + epsetAssign(&pTask->info.epSet, pEpSet); + EPSET_TO_STR(pEpSet, buf) + stDebug("s-task:0x%x (vgId:%d) self node epset is updated %s", pTask->id.taskId, nodeId, buf); + } + + // check for the dispath info and the upstream task info + int32_t level = pTask->info.taskLevel; + if (level == TASK_LEVEL__SOURCE) { + streamTaskUpdateDownstreamInfo(pTask, nodeId, pEpSet); + } else if (level == TASK_LEVEL__AGG) { + streamTaskUpdateUpstreamInfo(pTask, nodeId, pEpSet); + streamTaskUpdateDownstreamInfo(pTask, nodeId, pEpSet); + } else { // TASK_LEVEL__SINK + streamTaskUpdateUpstreamInfo(pTask, nodeId, pEpSet); + } + + return 0; +} + +static void freeItem(void* p) { + SStreamContinueExecInfo* pInfo = p; + rpcFreeCont(pInfo->msg.pCont); +} + +static void freeUpstreamItem(void* p) { + SStreamChildEpInfo** pInfo = p; + taosMemoryFree(*pInfo); +} + +static SStreamChildEpInfo* createStreamTaskEpInfo(const SStreamTask* pTask) { + SStreamChildEpInfo* pEpInfo = taosMemoryMalloc(sizeof(SStreamChildEpInfo)); + if (pEpInfo == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return NULL; + } + + pEpInfo->childId = pTask->info.selfChildId; + pEpInfo->epSet = pTask->info.epSet; + pEpInfo->nodeId = pTask->info.nodeId; + pEpInfo->taskId = pTask->id.taskId; + pEpInfo->stage = -1; + + return pEpInfo; +} + SStreamTask* tNewStreamTask(int64_t streamId, int8_t taskLevel, bool fillHistory, int64_t triggerParam, SArray* pTaskList, bool hasFillhistory) { SStreamTask* pTask = (SStreamTask*)taosMemoryCalloc(1, sizeof(SStreamTask)); @@ -291,16 +340,6 @@ int32_t tDecodeStreamTaskId(SDecoder* pDecoder, STaskId* pTaskId) { return 0; } -static void freeItem(void* p) { - SStreamContinueExecInfo* pInfo = p; - rpcFreeCont(pInfo->msg.pCont); -} - -static void freeUpstreamItem(void* p) { - SStreamChildEpInfo** pInfo = p; - taosMemoryFree(*pInfo); -} - void tFreeStreamTask(SStreamTask* pTask) { char* p = NULL; int32_t taskId = pTask->id.taskId; @@ -475,14 +514,6 @@ int32_t streamTaskInit(SStreamTask* pTask, SStreamMeta* pMeta, SMsgCb* pMsgCb, i } taosThreadMutexInit(&pTask->lock, &attr); - // if (pTask->info.fillHistory == 1) { - // // - // } else { - - // } - // if (streamTaskSetDb(pMeta, pTask) != 0) { - // return -1; - // } streamTaskOpenAllUpstreamInput(pTask); pTask->outputInfo.pDownstreamUpdateList = taosArrayInit(4, sizeof(SDownstreamTaskEpset)); @@ -509,22 +540,6 @@ int32_t streamTaskGetNumOfDownstream(const SStreamTask* pTask) { } } -static SStreamChildEpInfo* createStreamTaskEpInfo(const SStreamTask* pTask) { - SStreamChildEpInfo* pEpInfo = taosMemoryMalloc(sizeof(SStreamChildEpInfo)); - if (pEpInfo == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return NULL; - } - - pEpInfo->childId = pTask->info.selfChildId; - pEpInfo->epSet = pTask->info.epSet; - pEpInfo->nodeId = pTask->info.nodeId; - pEpInfo->taskId = pTask->id.taskId; - pEpInfo->stage = -1; - - return pEpInfo; -} - int32_t streamTaskSetUpstreamInfo(SStreamTask* pTask, const SStreamTask* pUpstreamTask) { SStreamChildEpInfo* pEpInfo = createStreamTaskEpInfo(pUpstreamTask); if (pEpInfo == NULL) { @@ -622,29 +637,6 @@ int32_t streamTaskStop(SStreamTask* pTask) { return 0; } -int32_t doUpdateTaskEpset(SStreamTask* pTask, int32_t nodeId, SEpSet* pEpSet) { - char buf[512] = {0}; - - if (pTask->info.nodeId == nodeId) { // execution task should be moved away - epsetAssign(&pTask->info.epSet, pEpSet); - EPSET_TO_STR(pEpSet, buf) - stDebug("s-task:0x%x (vgId:%d) self node epset is updated %s", pTask->id.taskId, nodeId, buf); - } - - // check for the dispath info and the upstream task info - int32_t level = pTask->info.taskLevel; - if (level == TASK_LEVEL__SOURCE) { - streamTaskUpdateDownstreamInfo(pTask, nodeId, pEpSet); - } else if (level == TASK_LEVEL__AGG) { - streamTaskUpdateUpstreamInfo(pTask, nodeId, pEpSet); - streamTaskUpdateDownstreamInfo(pTask, nodeId, pEpSet); - } else { // TASK_LEVEL__SINK - streamTaskUpdateUpstreamInfo(pTask, nodeId, pEpSet); - } - - return 0; -} - int32_t streamTaskUpdateEpsetInfo(SStreamTask* pTask, SArray* pNodeList) { STaskExecStatisInfo* p = &pTask->execInfo; @@ -677,7 +669,29 @@ void streamTaskResetUpstreamStageInfo(SStreamTask* pTask) { stDebug("s-task:%s reset all upstream tasks stage info", pTask->id.idStr); } -bool streamTaskAllUpstreamClosed(SStreamTask* pTask) { +void streamTaskOpenAllUpstreamInput(SStreamTask* pTask) { + int32_t num = taosArrayGetSize(pTask->upstreamInfo.pList); + if (num == 0) { + return; + } + + for (int32_t i = 0; i < num; ++i) { + SStreamChildEpInfo* pInfo = taosArrayGetP(pTask->upstreamInfo.pList, i); + pInfo->dataAllowed = true; + } + + pTask->upstreamInfo.numOfClosed = 0; + stDebug("s-task:%s opening up inputQ for %d upstream tasks", pTask->id.idStr, num); +} + +void streamTaskCloseUpstreamInput(SStreamTask* pTask, int32_t taskId) { + SStreamChildEpInfo* pInfo = streamTaskGetUpstreamTaskEpInfo(pTask, taskId); + if (pInfo != NULL) { + pInfo->dataAllowed = false; + } +} + +bool streamTaskIsAllUpstreamClosed(SStreamTask* pTask) { return pTask->upstreamInfo.numOfClosed == taosArrayGetSize(pTask->upstreamInfo.pList); } @@ -760,7 +774,7 @@ int32_t streamBuildAndSendDropTaskMsg(SMsgCb* pMsgCb, int32_t vgId, SStreamTaskI return code; } -STaskId streamTaskExtractKey(const SStreamTask* pTask) { +STaskId streamTaskGetTaskId(const SStreamTask* pTask) { STaskId id = {.streamId = pTask->id.streamId, .taskId = pTask->id.taskId}; return id; } @@ -801,3 +815,40 @@ void streamTaskStatusCopy(STaskStatusEntry* pDst, const STaskStatusEntry* pSrc) pDst->chkpointTransId = pSrc->chkpointTransId; } +void streamTaskPause(SStreamMeta* pMeta, SStreamTask* pTask) { + streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_PAUSE); + + int32_t num = atomic_add_fetch_32(&pMeta->numOfPausedTasks, 1); + stInfo("vgId:%d s-task:%s pause stream task. pause task num:%d", pMeta->vgId, pTask->id.idStr, num); + + // in case of fill-history task, stop the tsdb file scan operation. + if (pTask->info.fillHistory == 1) { + void* pExecutor = pTask->exec.pExecutor; + qKillTask(pExecutor, TSDB_CODE_SUCCESS); + } + + stDebug("vgId:%d s-task:%s set pause flag and pause task", pMeta->vgId, pTask->id.idStr); +} + +void streamTaskResume(SStreamTask* pTask) { + SStreamTaskState prevState = *streamTaskGetStatus(pTask); + SStreamMeta* pMeta = pTask->pMeta; + + if (prevState.state == TASK_STATUS__PAUSE || prevState.state == TASK_STATUS__HALT) { + streamTaskRestoreStatus(pTask); + + char* pNew = streamTaskGetStatus(pTask)->name; + if (prevState.state == TASK_STATUS__PAUSE) { + int32_t num = atomic_sub_fetch_32(&pMeta->numOfPausedTasks, 1); + stInfo("s-task:%s status:%s resume from %s, paused task(s):%d", pTask->id.idStr, pNew, prevState.name, num); + } else { + stInfo("s-task:%s status:%s resume from %s", pTask->id.idStr, pNew, prevState.name); + } + } else { + stDebug("s-task:%s status:%s not in pause/halt status, no need to resume", pTask->id.idStr, prevState.name); + } +} + +bool streamTaskIsSinkTask(const SStreamTask* pTask) { + return pTask->info.taskLevel == TASK_LEVEL__SINK; +} diff --git a/source/os/src/osFile.c b/source/os/src/osFile.c index 39de117339..f4e35c5b7f 100644 --- a/source/os/src/osFile.c +++ b/source/os/src/osFile.c @@ -18,8 +18,8 @@ #include "zlib.h" #ifdef WINDOWS -#include #include +#include #include #include #define F_OK 0 @@ -50,7 +50,7 @@ typedef struct TdFile { TdThreadRwlock rwlock; int refId; HANDLE hFile; - FILE* fp; + FILE *fp; int32_t tdFileOptions; } TdFile; #else @@ -230,7 +230,7 @@ int32_t taosStatFile(const char *path, int64_t *size, int32_t *mtime, int32_t *a int32_t code = _stati64(path, &fileStat); #else struct stat fileStat; - int32_t code = stat(path, &fileStat); + int32_t code = stat(path, &fileStat); #endif if (code < 0) { return code; @@ -274,7 +274,7 @@ int32_t taosDevInoFile(TdFilePtr pFile, int64_t *stDev, int64_t *stIno) { return -1; } struct stat fileStat; - int32_t code = fstat(pFile->fd, &fileStat); + int32_t code = fstat(pFile->fd, &fileStat); if (code < 0) { printf("taosFStatFile run fstat fail."); return code; @@ -374,7 +374,7 @@ int64_t taosReadFile(TdFilePtr pFile, void *buf, int64_t count) { DWORD bytesRead; if (!ReadFile(pFile->hFile, buf, count, &bytesRead, NULL)) { bytesRead = -1; - } + } #if FILE_WITH_LOCK taosThreadRwlockUnlock(&(pFile->rwlock)); #endif @@ -389,7 +389,7 @@ int64_t taosWriteFile(TdFilePtr pFile, const void *buf, int64_t count) { taosThreadRwlockWrlock(&(pFile->rwlock)); #endif - DWORD bytesWritten; + DWORD bytesWritten; if (!WriteFile(pFile->hFile, buf, count, &bytesWritten, NULL)) { bytesWritten = -1; } @@ -666,7 +666,7 @@ int64_t taosReadFile(TdFilePtr pFile, void *buf, int64_t count) { } int64_t leftbytes = count; int64_t readbytes; - char * tbuf = (char *)buf; + char *tbuf = (char *)buf; while (leftbytes > 0) { #ifdef WINDOWS @@ -716,7 +716,7 @@ int64_t taosWriteFile(TdFilePtr pFile, const void *buf, int64_t count) { int64_t nleft = count; int64_t nwritten = 0; - char * tbuf = (char *)buf; + char *tbuf = (char *)buf; while (nleft > 0) { nwritten = write(pFile->fd, (void *)tbuf, (uint32_t)nleft); @@ -1028,7 +1028,7 @@ int64_t taosFSendFile(TdFilePtr pFileOut, TdFilePtr pFileIn, int64_t *offset, in #endif } -#endif // WINDOWS +#endif // WINDOWS TdFilePtr taosOpenFile(const char *path, int32_t tdFileOptions) { FILE *fp = NULL; @@ -1056,7 +1056,7 @@ TdFilePtr taosOpenFile(const char *path, int32_t tdFileOptions) { if (hFile != NULL) CloseHandle(hFile); #else if (fd >= 0) close(fd); -#endif +#endif if (fp != NULL) fclose(fp); return NULL; } @@ -1067,7 +1067,7 @@ TdFilePtr taosOpenFile(const char *path, int32_t tdFileOptions) { pFile->fp = fp; pFile->refId = 0; - #ifdef WINDOWS +#ifdef WINDOWS pFile->hFile = hFile; pFile->tdFileOptions = tdFileOptions; // do nothing, since the property of pmode is set with _O_TEMPORARY; the OS will recycle @@ -1137,7 +1137,7 @@ int64_t taosPReadFile(TdFilePtr pFile, void *buf, int64_t count, int64_t offset) #endif return -1; } - DWORD ret = 0; + DWORD ret = 0; OVERLAPPED ol = {0}; ol.OffsetHigh = (uint32_t)((offset & 0xFFFFFFFF00000000LL) >> 0x20); ol.Offset = (uint32_t)(offset & 0xFFFFFFFFLL); @@ -1179,7 +1179,7 @@ int32_t taosFsyncFile(TdFilePtr pFile) { if (pFile->hFile != NULL) { if (pFile->tdFileOptions & TD_FILE_WRITE_THROUGH) { return 0; - } + } return !FlushFileBuffers(pFile->hFile); #else if (pFile->fd >= 0) { @@ -1204,7 +1204,7 @@ bool taosValidFile(TdFilePtr pFile) { return pFile != NULL && pFile->hFile != NULL; #else return pFile != NULL && pFile->fd > 0; -#endif +#endif } int32_t taosUmaskFile(int32_t maskVal) { @@ -1249,7 +1249,7 @@ int64_t taosGetLineFile(TdFilePtr pFile, char **__restrict ptrBuf) { } bufferSize += 512; - void* newBuf = taosMemoryRealloc(*ptrBuf, bufferSize); + void *newBuf = taosMemoryRealloc(*ptrBuf, bufferSize); if (newBuf == NULL) { taosMemoryFreeClear(*ptrBuf); return -1; @@ -1363,7 +1363,7 @@ int32_t taosCompressFile(char *srcFileName, char *destFileName) { cmp_end: if (pFile) { - taosCloseFile(&pFile); + taosCloseFile(&pFile); } if (pSrcFile) { taosCloseFile(&pSrcFile); @@ -1386,3 +1386,15 @@ int32_t taosSetFileHandlesLimit() { #endif return 0; } + +int32_t taosLinkFile(char *src, char *dst) { +#ifndef WINDOWS + if (link(src, dst) != 0) { + if (errno == EXDEV || errno == ENOTSUP) { + return -1; + } + return errno; + } +#endif + return 0; +}