From 483f9ab8966aac139c8689820ff83dbb6ad8d9bf Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Thu, 6 Jul 2023 03:39:28 +0000 Subject: [PATCH] add other --- source/dnode/mnode/impl/src/mndStream.c | 8 ++++---- source/libs/stream/src/streamBackendRocksdb.c | 12 ++++++------ 2 files changed, 10 insertions(+), 10 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index b1afb6ae4d..428bd12ee8 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -1016,7 +1016,7 @@ static int32_t mndProcessStreamCheckpointTrans(SMnode *pMnode, SStreamObj *pStre void *buf; int32_t tlen; - if (mndBuildStreamCheckpointSourceReq2(&buf, &tlen, pTask->nodeId, checkpointId) < 0) { + if (mndBuildStreamCheckpointSourceReq2(&buf, &tlen, pTask->info.nodeId, checkpointId) < 0) { mndReleaseVgroup(pMnode, pVgObj); taosRUnLockLatch(&pStream->lock); mndTransDrop(pTrans); @@ -1364,7 +1364,7 @@ static int32_t mndRetrieveStreamTask(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock // task id pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); - char idstr[128] = {0}; + char idstr[128] = {0}; int32_t len = tintToHex(pTask->id.taskId, &idstr[4]); idstr[2] = '0'; idstr[3] = 'x'; @@ -1404,7 +1404,7 @@ static int32_t mndRetrieveStreamTask(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock colDataSetVal(pColInfo, numOfRows, (const char *)&level, false); // status - char status[20 + VARSTR_HEADER_SIZE] = {0}; + char status[20 + VARSTR_HEADER_SIZE] = {0}; int8_t taskStatus = atomic_load_8(&pTask->status.taskStatus); if (taskStatus == TASK_STATUS__NORMAL) { memcpy(varDataVal(status), "normal", 6); @@ -1470,7 +1470,7 @@ static int32_t mndPauseStreamTask(STrans *pTrans, SStreamTask *pTask) { return 0; } -int32_t mndPauseAllStreamTaskImpl(STrans *pTrans, SArray* tasks) { +int32_t mndPauseAllStreamTaskImpl(STrans *pTrans, SArray *tasks) { int32_t size = taosArrayGetSize(tasks); for (int32_t i = 0; i < size; i++) { SArray *pTasks = taosArrayGetP(tasks, i); diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index 41b052ec33..3e3804d370 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -401,12 +401,12 @@ int32_t delObsoleteCheckpoint(void* arg, const char* path) { return 0; } int32_t streamBackendDoCheckpoint(void* arg, const char* path) { - SStreamMeta* pMeta = arg; - int64_t backendRid = pMeta->streamBackendRid; - int64_t checkpointId = pMeta->checkpointTs; - int64_t st = taosGetTimestampMs(); - int32_t code = -1; - SBackendHandle* pHandle = taosAcquireRef(streamBackendId, backendRid); + SStreamMeta* pMeta = arg; + int64_t backendRid = pMeta->streamBackendRid; + int64_t checkpointId = pMeta->checkpointTs; + int64_t st = taosGetTimestampMs(); + int32_t code = -1; + SBackendWrapper* pHandle = taosAcquireRef(streamBackendId, backendRid); char checkpointDir[256] = {0}; sprintf(checkpointDir, "%s/checkpoint_%" PRId64 "", path, checkpointId);