From db0f6258fe8ba2fdbad27a15861fcd7c69221f5e Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Wed, 25 Oct 2023 19:38:22 +0800 Subject: [PATCH] fix stream case error --- include/libs/stream/tstream.h | 4 ++-- source/dnode/vnode/src/sma/smaRollup.c | 10 ++++++---- source/dnode/vnode/src/tq/tq.c | 20 ++++++++----------- source/libs/stream/src/streamMeta.c | 4 ++-- source/libs/stream/src/streamState.c | 27 +++----------------------- source/libs/stream/src/streamTask.c | 11 ++++++++--- 6 files changed, 29 insertions(+), 47 deletions(-) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 713a457d52..dc24106575 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -774,7 +774,7 @@ int32_t streamMetaCommit(SStreamMeta* pMeta); int32_t streamMetaLoadAllTasks(SStreamMeta* pMeta); int32_t streamMetaReloadAllTasks(SStreamMeta* pMeta); void streamMetaNotifyClose(SStreamMeta* pMeta); -int32_t streamTaskSetDb(SStreamMeta* pMeta, void* pTask); +int32_t streamTaskSetDb(SStreamMeta* pMeta, void* pTask, char *key); void streamMetaStartHb(SStreamMeta* pMeta); void streamMetaInitForSnode(SStreamMeta* pMeta); @@ -794,4 +794,4 @@ int32_t buildCheckpointSourceRsp(SStreamCheckpointSourceReq* pReq, SRpcHandleInf } #endif -#endif /* ifndef _STREAM_H_ */ +#endif /* ifndef _STREAM_H_ */ \ No newline at end of file diff --git a/source/dnode/vnode/src/sma/smaRollup.c b/source/dnode/vnode/src/sma/smaRollup.c index 8da2fff5a6..fe458fdba3 100644 --- a/source/dnode/vnode/src/sma/smaRollup.c +++ b/source/dnode/vnode/src/sma/smaRollup.c @@ -256,9 +256,11 @@ static int32_t tdSetRSmaInfoItemParams(SSma *pSma, SRSmaParam *param, SRSmaStat taosMemoryFree(s); } - SStreamTask task = {.id.taskId = 0, .id.streamId = 0}; // TODO: assign value - task.pMeta = pVnode->pTq->pStreamMeta; - pStreamState = streamStateOpen(taskInfDir, &task, true, -1, -1); + //SStreamTask task = {.id.taskId = 0, .id.streamId = 0}; // TODO: assign value + SStreamTask *pTask = taosMemoryCalloc(1, sizeof(SStreamTask)); + pTask->pMeta = pVnode->pTq->pStreamMeta; + + pStreamState = streamStateOpen(taskInfDir, pTask, true, -1, -1); if (!pStreamState) { terrno = TSDB_CODE_RSMA_STREAM_STATE_OPEN; return TSDB_CODE_FAILED; @@ -1429,4 +1431,4 @@ _exit: smaError("vgId:%d, %s failed at line %d since %s", TD_VID(pVnode), __func__, lino, tstrerror(code)); } return code; -} +} \ No newline at end of file diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index c18a58716c..d028989865 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -749,14 +749,12 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t nextProcessVer) { if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) { SStreamTask* pStateTask = pTask; - // SStreamTask task = {0}; - // if (pTask->info.fillHistory) { - // task.id.streamId = pTask->streamTaskId.streamId; - // task.id.taskId = pTask->streamTaskId.taskId; - // task.pMeta = pTask->pMeta; - // pStateTask = &task; - // } + // if (pTask->info.fillHistory) { + // pTask->id.streamId = pTask->streamTaskId.streamId; + // pTask->id.taskId = pTask->streamTaskId.taskId; + // } + pTask->pState = streamStateOpen(pTq->pStreamMeta->path, pTask, false, -1, -1); if (pTask->pState == NULL) { tqError("s-task:%s (vgId:%d) failed to open state for task", pTask->id.idStr, vgId); @@ -786,10 +784,8 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t nextProcessVer) { SStreamTask* pSateTask = pTask; // SStreamTask task = {0}; // if (pTask->info.fillHistory) { - // task.id.streamId = pTask->streamTaskId.streamId; - // task.id.taskId = pTask->streamTaskId.taskId; - // task.pMeta = pTask->pMeta; - // pSateTask = &task; + // pTask->id.streamId = pTask->streamTaskId.streamId; + // pTask->id.taskId = pTask->streamTaskId.taskId; // } pTask->pState = streamStateOpen(pTq->pStreamMeta->path, pTask, false, -1, -1); @@ -1984,4 +1980,4 @@ int32_t tqProcessTaskResetReq(STQ* pTq, SRpcMsg* pMsg) { streamMetaReleaseTask(pMeta, pTask); return TSDB_CODE_SUCCESS; -} +} \ No newline at end of file diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 9ae4c2e043..7c616cba5b 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -226,10 +226,10 @@ int32_t streamMetaMayCvtDbFormat(SStreamMeta* pMeta) { return 0; } -int32_t streamTaskSetDb(SStreamMeta* pMeta, void* arg) { +int32_t streamTaskSetDb(SStreamMeta* pMeta, void* arg, char *key) { SStreamTask* pTask = arg; - char* key = (char*)pTask->id.idStr; + //char* key = (char*)pTask->id.idStr; int64_t chkpId = pTask->checkpointingId; taosThreadMutexLock(&pMeta->backendMutex); diff --git a/source/libs/stream/src/streamState.c b/source/libs/stream/src/streamState.c index 32672d7b22..a6809b11dd 100644 --- a/source/libs/stream/src/streamState.c +++ b/source/libs/stream/src/streamState.c @@ -110,31 +110,10 @@ SStreamState* streamStateOpen(char* path, void* pTask, bool specPath, int32_t sz pState->streamId = pStreamTask->id.streamId; sprintf(pState->pTdbState->idstr, "0x%" PRIx64 "-%d", pState->streamId, pState->taskId); + streamTaskSetDb(pStreamTask->pMeta, pTask, pState->pTdbState->idstr); + #ifdef USE_ROCKSDB SStreamMeta* pMeta = pStreamTask->pMeta; - // pState->streamBackendRid = pMeta->streamBackendRid; - // taosWLockLatch(&pMeta->lock); - // taosThreadMutexLock(&pMeta->backendMutex); - // void* uniqueId = - // taosHashGet(pMeta->pTaskDbUnique, pState->pTdbState->idstr, strlen(pState->pTdbState->idstr) + 1); - // if (uniqueId == NULL) { - // int code = streamStateOpenBackend(pMeta->streamBackend, pState); - // if (code == -1) { - // taosThreadMutexUnlock(&pMeta->backendMutex); - // taosMemoryFree(pState); - // return NULL; - // } - // taosHashPut(pMeta->pTaskDbUnique, pState->pTdbState->idstr, strlen(pState->pTdbState->idstr) + 1, - // &pState->pTdbState->backendCfWrapperId, sizeof(pState->pTdbState->backendCfWrapperId)); - // } else { - // int64_t id = *(int64_t*)uniqueId; - // pState->pTdbState->backendCfWrapperId = id; - // pState->pTdbState->pBackendCfWrapper = taosAcquireRef(streamBackendCfWrapperId, id); - // // already exist stream task for - // qInfo("already exist stream-state for %s", pState->pTdbState->idstr); - // // taosAcquireRef(streamBackendId, pState->streamBackendRid); - // } - // taosThreadMutexUnlock(&pMeta->backendMutex); pState->pTdbState->pOwner = pTask; pState->pFileState = NULL; @@ -1219,4 +1198,4 @@ char* streamStateIntervalDump(SStreamState* pState) { streamStateFreeCur(pCur); return dumpBuf; } -#endif +#endif \ No newline at end of file diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index 15f3ba4a94..6d7a03bc6c 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -443,9 +443,14 @@ int32_t streamTaskInit(SStreamTask* pTask, SStreamMeta* pMeta, SMsgCb* pMsgCb, i } taosThreadMutexInit(&pTask->lock, &attr); - if (streamTaskSetDb(pMeta, pTask) != 0) { - return -1; - } + // if (pTask->info.fillHistory == 1) { + // // + // } else { + + // } + // if (streamTaskSetDb(pMeta, pTask) != 0) { + // return -1; + // } streamTaskOpenAllUpstreamInput(pTask); return TSDB_CODE_SUCCESS;