From d4f87378fd491895cfe2ede6bb01dc853e51f7d4 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Wed, 27 Sep 2023 14:44:09 +0800 Subject: [PATCH] avoid first tag index conflict --- source/dnode/snode/src/snode.c | 52 ++++++++++++---------- source/libs/stream/src/streamMeta.c | 4 +- source/libs/stream/src/streamState.c | 64 ++++++++++++++-------------- 3 files changed, 64 insertions(+), 56 deletions(-) diff --git a/source/dnode/snode/src/snode.c b/source/dnode/snode/src/snode.c index 2b1885fb0e..2dba51c150 100644 --- a/source/dnode/snode/src/snode.c +++ b/source/dnode/snode/src/snode.c @@ -50,7 +50,7 @@ void sndEnqueueStreamDispatch(SSnode *pSnode, SRpcMsg *pMsg) { FAIL: if (pMsg->info.handle == NULL) return; - SRpcMsg rsp = { .code = code, .info = pMsg->info}; + SRpcMsg rsp = {.code = code, .info = pMsg->info}; tmsgSendRsp(&rsp); rpcFreeCont(pMsg->pCont); taosFreeQitem(pMsg); @@ -62,6 +62,8 @@ int32_t sndExpandTask(SSnode *pSnode, SStreamTask *pTask, int64_t ver) { if (code != TSDB_CODE_SUCCESS) { return code; } + pTask->pBackend = NULL; + ASSERT(0); streamTaskOpenAllUpstreamInput(pTask); @@ -73,8 +75,11 @@ int32_t sndExpandTask(SSnode *pSnode, SStreamTask *pTask, int64_t ver) { qDebug("s-task:%s state:%p", pTask->id.idStr, pTask->pState); } - int32_t numOfChildEp = taosArrayGetSize(pTask->pUpstreamInfoList); - SReadHandle handle = { .vnode = NULL, .numOfVgroups = numOfChildEp, .pStateBackend = pTask->pState, .fillHistory = pTask->info.fillHistory }; + int32_t numOfChildEp = taosArrayGetSize(pTask->pUpstreamInfoList); + SReadHandle handle = {.vnode = NULL, + .numOfVgroups = numOfChildEp, + .pStateBackend = pTask->pState, + .fillHistory = pTask->info.fillHistory}; initStreamStateAPI(&handle.api); pTask->exec.pExecutor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &handle, 0, pTask->id.taskId); @@ -83,23 +88,23 @@ int32_t sndExpandTask(SSnode *pSnode, SStreamTask *pTask, int64_t ver) { streamTaskResetUpstreamStageInfo(pTask); streamSetupScheduleTrigger(pTask); - SCheckpointInfo* pChkInfo = &pTask->chkInfo; + SCheckpointInfo *pChkInfo = &pTask->chkInfo; // checkpoint ver is the kept version, handled data should be the next version. if (pTask->chkInfo.checkpointId != 0) { pTask->chkInfo.nextProcessVer = pTask->chkInfo.checkpointVer + 1; - qInfo("s-task:%s restore from the checkpointId:%" PRId64 " ver:%" PRId64 " nextProcessVer:%" PRId64, pTask->id.idStr, - pChkInfo->checkpointId, pChkInfo->checkpointVer, pChkInfo->nextProcessVer); + qInfo("s-task:%s restore from the checkpointId:%" PRId64 " ver:%" PRId64 " nextProcessVer:%" PRId64, + pTask->id.idStr, pChkInfo->checkpointId, pChkInfo->checkpointVer, pChkInfo->nextProcessVer); } else { if (pTask->chkInfo.nextProcessVer == -1) { pTask->chkInfo.nextProcessVer = 0; } } - qInfo("snode:%d expand stream task, s-task:%s, checkpointId:%" PRId64 " checkpointVer:%" PRId64 " nextProcessVer:%" PRId64 - " child id:%d, level:%d, status:%s fill-history:%d, trigger:%" PRId64 " ms", + qInfo("snode:%d expand stream task, s-task:%s, checkpointId:%" PRId64 " checkpointVer:%" PRId64 + " nextProcessVer:%" PRId64 " child id:%d, level:%d, status:%s fill-history:%d, trigger:%" PRId64 " ms", SNODE_HANDLE, pTask->id.idStr, pChkInfo->checkpointId, pChkInfo->checkpointVer, pChkInfo->nextProcessVer, - pTask->info.selfChildId, pTask->info.taskLevel, streamGetTaskStatusStr(pTask->status.taskStatus), - pTask->info.fillHistory, pTask->info.triggerParam); + pTask->info.selfChildId, pTask->info.taskLevel, streamGetTaskStatusStr(pTask->status.taskStatus), + pTask->info.fillHistory, pTask->info.triggerParam); return 0; } @@ -179,8 +184,8 @@ int32_t sndProcessTaskDeployReq(SSnode *pSnode, char *msg, int32_t msgLen) { int32_t numOfTasks = streamMetaGetNumOfTasks(pSnode->pMeta); taosWUnLockLatch(&pSnode->pMeta->lock); - qDebug("snode:%d s-task:%s is deployed on snode and add into meta, status:%s, numOfTasks:%d", SNODE_HANDLE, pTask->id.idStr, - streamGetTaskStatusStr(pTask->status.taskStatus), numOfTasks); + qDebug("snode:%d s-task:%s is deployed on snode and add into meta, status:%s, numOfTasks:%d", SNODE_HANDLE, + pTask->id.idStr, streamGetTaskStatusStr(pTask->status.taskStatus), numOfTasks); streamTaskCheckDownstream(pTask); return 0; @@ -190,7 +195,7 @@ int32_t sndProcessTaskDropReq(SSnode *pSnode, char *msg, int32_t msgLen) { SVDropStreamTaskReq *pReq = (SVDropStreamTaskReq *)msg; qDebug("snode:%d receive msg to drop stream task:0x%x", pSnode->pMeta->vgId, pReq->taskId); - SStreamTask* pTask = streamMetaAcquireTask(pSnode->pMeta, pReq->streamId, pReq->taskId); + SStreamTask *pTask = streamMetaAcquireTask(pSnode->pMeta, pReq->streamId, pReq->taskId); if (pTask == NULL) { qError("vgId:%d failed to acquire s-task:0x%x when dropping it", pSnode->pMeta->vgId, pReq->taskId); return 0; @@ -355,12 +360,13 @@ int32_t sndProcessStreamTaskCheckReq(SSnode *pSnode, SRpcMsg *pMsg) { rsp.status = streamTaskCheckStatus(pTask, req.upstreamTaskId, req.upstreamNodeId, req.stage); streamMetaReleaseTask(pSnode->pMeta, pTask); - const char* pStatus = streamGetTaskStatusStr(pTask->status.taskStatus); + const char *pStatus = streamGetTaskStatusStr(pTask->status.taskStatus); qDebug("s-task:%s status:%s, recv task check req(reqId:0x%" PRIx64 ") task:0x%x (vgId:%d), ready:%d", - pTask->id.idStr, pStatus, rsp.reqId, rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.status); + pTask->id.idStr, pStatus, rsp.reqId, rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.status); } else { rsp.status = 0; - qDebug("recv task check(taskId:0x%x not built yet) req(reqId:0x%" PRIx64 ") from task:0x%x (vgId:%d), rsp status %d", + qDebug("recv task check(taskId:0x%x not built yet) req(reqId:0x%" PRIx64 + ") from task:0x%x (vgId:%d), rsp status %d", taskId, rsp.reqId, rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.status); } @@ -388,15 +394,15 @@ int32_t sndProcessStreamTaskCheckReq(SSnode *pSnode, SRpcMsg *pMsg) { return 0; } -int32_t sndProcessStreamTaskCheckRsp(SSnode* pSnode, SRpcMsg* pMsg) { - char* pReq = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); +int32_t sndProcessStreamTaskCheckRsp(SSnode *pSnode, SRpcMsg *pMsg) { + char *pReq = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); int32_t len = pMsg->contLen - sizeof(SMsgHead); int32_t code; SStreamTaskCheckRsp rsp; SDecoder decoder; - tDecoderInit(&decoder, (uint8_t*)pReq, len); + tDecoderInit(&decoder, (uint8_t *)pReq, len); code = tDecodeStreamTaskCheckRsp(&decoder, &rsp); if (code < 0) { @@ -405,13 +411,13 @@ int32_t sndProcessStreamTaskCheckRsp(SSnode* pSnode, SRpcMsg* pMsg) { } tDecoderClear(&decoder); - qDebug("tq task:0x%x (vgId:%d) recv check rsp(reqId:0x%" PRIx64 ") from 0x%x (vgId:%d) status %d", - rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.reqId, rsp.downstreamTaskId, rsp.downstreamNodeId, rsp.status); + qDebug("tq task:0x%x (vgId:%d) recv check rsp(reqId:0x%" PRIx64 ") from 0x%x (vgId:%d) status %d", rsp.upstreamTaskId, + rsp.upstreamNodeId, rsp.reqId, rsp.downstreamTaskId, rsp.downstreamNodeId, rsp.status); - SStreamTask* pTask = streamMetaAcquireTask(pSnode->pMeta, rsp.streamId, rsp.upstreamTaskId); + SStreamTask *pTask = streamMetaAcquireTask(pSnode->pMeta, rsp.streamId, rsp.upstreamTaskId); if (pTask == NULL) { qError("tq failed to locate the stream task:0x%x (vgId:%d), it may have been destroyed", rsp.upstreamTaskId, - pSnode->pMeta->vgId); + pSnode->pMeta->vgId); return -1; } diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 2a76f178a7..99b3b9cca1 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -221,14 +221,16 @@ int32_t streamMetaMayConvertBackendFormat(SStreamMeta* pMeta) { } void* streamMetaGetBackendByTaskKey(SStreamMeta* pMeta, char* key) { + taosThreadMutexLock(&pMeta->backendMutex); void** ppBackend = taosHashGet(pMeta->pTaskBackendUnique, key, strlen(key)); if (ppBackend != NULL && *ppBackend != NULL) { // add ref later + taosThreadMutexUnlock(&pMeta->backendMutex); return *ppBackend; } void* pBackend = streamStateOpenTaskBackend(pMeta->path, key); taosHashPut(pMeta->pTaskBackendUnique, key, strlen(key), &pBackend, sizeof(void*)); - + taosThreadMutexLock(&pMeta->backendMutex); return pBackend; } SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandFunc, int32_t vgId, int64_t stage) { diff --git a/source/libs/stream/src/streamState.c b/source/libs/stream/src/streamState.c index 44c7b4f2e0..f51181b0b4 100644 --- a/source/libs/stream/src/streamState.c +++ b/source/libs/stream/src/streamState.c @@ -106,43 +106,43 @@ SStreamState* streamStateOpen(char* path, void* pTask, bool specPath, int32_t sz } SStreamTask* pStreamTask = pTask; - char statePath[1024]; - if (!specPath) { - sprintf(statePath, "%s%s%d", path, TD_DIRSEP, pStreamTask->id.taskId); - } else { - memset(statePath, 0, 1024); - tstrncpy(statePath, path, 1024); - } + // char statePath[1024]; + // if (!specPath) { + // sprintf(statePath, "%s%s%d", path, TD_DIRSEP, pStreamTask->id.taskId); + // } else { + // memset(statePath, 0, 1024); + // tstrncpy(statePath, path, 1024); + // } pState->taskId = pStreamTask->id.taskId; pState->streamId = pStreamTask->id.streamId; sprintf(pState->pTdbState->idstr, "0x%" PRIx64 "-%d", pState->streamId, pState->taskId); #ifdef USE_ROCKSDB - SStreamMeta* pMeta = pStreamTask->pMeta; - pState->streamBackendRid = pMeta->streamBackendRid; - // taosWLockLatch(&pMeta->lock); - taosThreadMutexLock(&pMeta->backendMutex); - void* uniqueId = - taosHashGet(pMeta->pTaskBackendUnique, pState->pTdbState->idstr, strlen(pState->pTdbState->idstr) + 1); - if (uniqueId == NULL) { - int code = streamStateOpenBackend(pMeta->streamBackend, pState); - if (code == -1) { - taosThreadMutexUnlock(&pMeta->backendMutex); - taosMemoryFree(pState); - return NULL; - } - taosHashPut(pMeta->pTaskBackendUnique, pState->pTdbState->idstr, strlen(pState->pTdbState->idstr) + 1, - &pState->pTdbState->backendCfWrapperId, sizeof(pState->pTdbState->backendCfWrapperId)); - } else { - int64_t id = *(int64_t*)uniqueId; - pState->pTdbState->backendCfWrapperId = id; - pState->pTdbState->pBackendCfWrapper = taosAcquireRef(streamBackendCfWrapperId, id); - // already exist stream task for - qInfo("already exist stream-state for %s", pState->pTdbState->idstr); - // taosAcquireRef(streamBackendId, pState->streamBackendRid); - } - taosThreadMutexUnlock(&pMeta->backendMutex); + // SStreamMeta* pMeta = pStreamTask->pMeta; + // pState->streamBackendRid = pMeta->streamBackendRid; + // taosWLockLatch(&pMeta->lock); + // taosThreadMutexLock(&pMeta->backendMutex); + // void* uniqueId = + // taosHashGet(pMeta->pTaskBackendUnique, pState->pTdbState->idstr, strlen(pState->pTdbState->idstr) + 1); + // if (uniqueId == NULL) { + // int code = streamStateOpenBackend(pMeta->streamBackend, pState); + // if (code == -1) { + // taosThreadMutexUnlock(&pMeta->backendMutex); + // taosMemoryFree(pState); + // return NULL; + // } + // taosHashPut(pMeta->pTaskBackendUnique, pState->pTdbState->idstr, strlen(pState->pTdbState->idstr) + 1, + // &pState->pTdbState->backendCfWrapperId, sizeof(pState->pTdbState->backendCfWrapperId)); + // } else { + // int64_t id = *(int64_t*)uniqueId; + // pState->pTdbState->backendCfWrapperId = id; + // pState->pTdbState->pBackendCfWrapper = taosAcquireRef(streamBackendCfWrapperId, id); + // // already exist stream task for + // qInfo("already exist stream-state for %s", pState->pTdbState->idstr); + // // taosAcquireRef(streamBackendId, pState->streamBackendRid); + // } + // taosThreadMutexUnlock(&pMeta->backendMutex); pState->pTdbState->pOwner = pTask; pState->pFileState = NULL; @@ -1125,7 +1125,7 @@ int32_t streamStateGetParName(SStreamState* pState, int64_t groupId, void** pVal void streamStateDestroy(SStreamState* pState, bool remove) { #ifdef USE_ROCKSDB streamFileStateDestroy(pState->pFileState); - streamStateDestroy_rocksdb(pState, remove); + // streamStateDestroy_rocksdb(pState, remove); tSimpleHashCleanup(pState->parNameMap); // do nothong #endif