diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 29dc054d78..e09ffb416c 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -31,7 +31,7 @@ extern "C" { typedef struct SStreamTask SStreamTask; -#define SSTREAM_TASK_VER 3 +#define SSTREAM_TASK_VER 3 #define SSTREAM_TASK_INCOMPATIBLE_VER 1 #define SSTREAM_TASK_NEED_CONVERT_VER 2 @@ -382,6 +382,7 @@ struct SStreamTask { int32_t transferStateAlignCnt; struct SStreamMeta* pMeta; SSHashObj* pNameMap; + void* pBackend; char reserve[256]; }; @@ -422,6 +423,8 @@ typedef struct SStreamMeta { int32_t chkpCap; SRWLatch chkpDirLock; int32_t pauseTaskNum; + + // SHashObj* pTaskBackend; } SStreamMeta; int32_t tEncodeStreamEpInfo(SEncoder* pEncoder, const SStreamChildEpInfo* pInfo); @@ -722,6 +725,7 @@ int32_t streamMetaReopen(SStreamMeta* pMeta, int64_t chkpId); int32_t streamMetaCommit(SStreamMeta* pMeta); int32_t streamMetaLoadAllTasks(SStreamMeta* pMeta); void streamMetaNotifyClose(SStreamMeta* pMeta); +void* streamMetaGetBackendByTaskKey(SStreamMeta* pMeta, char* key); // checkpoint int32_t streamProcessCheckpointSourceReq(SStreamTask* pTask, SStreamCheckpointSourceReq* pReq); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 3aeb679eb7..e136b9b265 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -582,9 +582,9 @@ int32_t tqProcessDeleteSubReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg taosWLockLatch(&pTq->lock); bool exec = tqIsHandleExec(pHandle); - if(exec){ + if (exec) { tqInfo("vgId:%d, topic:%s, subscription is executing, delete wait for 10ms and retry, pHandle:%p", vgId, - pHandle->subKey, pHandle); + pHandle->subKey, pHandle); taosWUnLockLatch(&pTq->lock); taosMsleep(10); continue; @@ -699,12 +699,12 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg } ret = tqMetaSaveHandle(pTq, req.subKey, &handle); } else { - while(1){ + while (1) { taosWLockLatch(&pTq->lock); bool exec = tqIsHandleExec(pHandle); - if(exec){ - tqInfo("vgId:%d, topic:%s, subscription is executing, sub wait for 10ms and retry, pHandle:%p", pTq->pVnode->config.vgId, - pHandle->subKey, pHandle); + if (exec) { + tqInfo("vgId:%d, topic:%s, subscription is executing, sub wait for 10ms and retry, pHandle:%p", + pTq->pVnode->config.vgId, pHandle->subKey, pHandle); taosWUnLockLatch(&pTq->lock); taosMsleep(10); continue; @@ -713,7 +713,7 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg tqInfo("vgId:%d no switch consumer:0x%" PRIx64 " remains, because redo wal log", req.vgId, req.newConsumerId); } else { tqInfo("vgId:%d switch consumer from Id:0x%" PRIx64 " to Id:0x%" PRIx64, req.vgId, pHandle->consumerId, - req.newConsumerId); + req.newConsumerId); atomic_store_64(&pHandle->consumerId, req.newConsumerId); atomic_store_32(&pHandle->epoch, 0); tqUnregisterPushHandle(pTq, pHandle); @@ -736,6 +736,9 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) { tqDebug("s-task:0x%x start to expand task", pTask->id.taskId); int32_t code = streamTaskInit(pTask, pTq->pStreamMeta, &pTq->pVnode->msgCb, ver); + + pTask->pBackend = streamMetaGetBackendByTaskKey(pTq->pStreamMeta, (char*)pTask->id.idStr); + if (code != TSDB_CODE_SUCCESS) { return code; } @@ -859,8 +862,8 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) { pChkInfo->checkpointId, pChkInfo->checkpointVer, pChkInfo->nextProcessVer); } - tqInfo("vgId:%d expand stream task, s-task:%s, checkpointId:%" PRId64 " checkpointVer:%" PRId64 " nextProcessVer:%" PRId64 - " child id:%d, level:%d, status:%s fill-history:%d, trigger:%" PRId64 " ms", + tqInfo("vgId:%d expand stream task, s-task:%s, checkpointId:%" PRId64 " checkpointVer:%" PRId64 + " nextProcessVer:%" PRId64 " child id:%d, level:%d, status:%s fill-history:%d, trigger:%" PRId64 " ms", vgId, pTask->id.idStr, pChkInfo->checkpointId, pChkInfo->checkpointVer, pChkInfo->nextProcessVer, pTask->info.selfChildId, pTask->info.taskLevel, streamGetTaskStatusStr(pTask->status.taskStatus), pTask->info.fillHistory, pTask->info.triggerParam); @@ -1004,7 +1007,7 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t sversion, char* msg, int32_t ms bool restored = pTq->pVnode->restored; if (p != NULL && restored) { p->tsInfo.init = taosGetTimestampMs(); - tqDebug("s-task:%s set the init ts:%"PRId64, p->id.idStr, p->tsInfo.init); + tqDebug("s-task:%s set the init ts:%" PRId64, p->id.idStr, p->tsInfo.init); streamTaskCheckDownstream(p); } else if (!restored) { @@ -1443,7 +1446,7 @@ int32_t tqProcessTaskResumeImpl(STQ* pTq, SStreamTask* pTask, int64_t sversion, return 0; } - int8_t status = pTask->status.taskStatus; + int8_t status = pTask->status.taskStatus; if (status == TASK_STATUS__NORMAL || status == TASK_STATUS__SCAN_HISTORY || status == TASK_STATUS__CK) { // no lock needs to secure the access of the version if (igUntreated && level == TASK_LEVEL__SOURCE && !pTask->info.fillHistory) { @@ -1622,12 +1625,13 @@ int32_t tqProcessStreamCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg) { // downstream not ready, current the stream tasks are not all ready. Ignore this checkpoint req. if (pTask->status.downstreamReady != 1) { qError("s-task:%s not ready for checkpoint, since downstream not ready, ignore this checkpoint:%" PRId64 - ", set it failure", pTask->id.idStr, req.checkpointId); + ", set it failure", + pTask->id.idStr, req.checkpointId); streamMetaReleaseTask(pMeta, pTask); SRpcMsg rsp = {0}; buildCheckpointSourceRsp(&req, &pMsg->info, &rsp, 0); - tmsgSendRsp(&rsp); // error occurs + tmsgSendRsp(&rsp); // error occurs return TSDB_CODE_SUCCESS; } @@ -1784,7 +1788,6 @@ _end: tDecoderClear(&decoder); if (allStopped) { - if (!pTq->pVnode->restored) { tqDebug("vgId:%d vnode restore not completed, not restart the tasks", vgId); } else { @@ -1816,4 +1819,3 @@ _end: return rsp.code; } - diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index aa29ea8159..e516678941 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -220,6 +220,17 @@ int32_t streamMetaMayDoStateBackendConvert(SStreamMeta* pMeta) { return 0; } +void* streamMetaGetBackendByTaskKey(SStreamMeta* pMeta, char* key) { + void** ppBackend = taosHashGet(pMeta->pTaskBackendUnique, key, strlen(key)); + if (ppBackend != NULL && *ppBackend != NULL) { + // add ref later + return *ppBackend; + } + void* pBackend = streamStateOpenTaskBackend(pMeta->path, key); + taosHashPut(pMeta->pTaskBackendUnique, key, strlen(key), &pBackend, sizeof(void*)); + + return pBackend; +} SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandFunc, int32_t vgId, int64_t stage) { int32_t code = -1; SStreamMeta* pMeta = taosMemoryCalloc(1, sizeof(SStreamMeta)); @@ -277,11 +288,12 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF pMeta->hbInfo.tickCounter = 0; pMeta->hbInfo.stopFlag = 0; + pMeta->pTaskBackendUnique = + taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK); + // start backend // taosInitRWLatch(&pMeta->chkpDirLock); - // pMeta->pTaskBackendUnique = - // taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK); // pMeta->chkpSaved = taosArrayInit(4, sizeof(int64_t)); // pMeta->chkpInUse = taosArrayInit(4, sizeof(int64_t)); // pMeta->chkpCap = 8; diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index 837f787dd2..9a76f661f8 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -388,6 +388,8 @@ int32_t streamTaskInit(SStreamTask* pTask, SStreamMeta* pMeta, SMsgCb* pMsgCb, i taosThreadMutexInit(&pTask->lock, NULL); streamTaskOpenAllUpstreamInput(pTask); + //pTask->pBackend = streamStateOpenTaskBackend(pMeta->path, (char*)pTask->id.idStr); + return TSDB_CODE_SUCCESS; }