From 7d237d4310f7d90811529a87cf99b304e20e8f79 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Sat, 26 Aug 2023 17:37:09 +0800 Subject: [PATCH] fix UAF --- include/libs/stream/tstream.h | 17 +++--- source/libs/stream/src/stream.c | 93 +++++++++++++++-------------- source/libs/stream/src/streamMeta.c | 45 ++++++++++---- 3 files changed, 91 insertions(+), 64 deletions(-) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index bcfa168e6e..9b0673a9f5 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -277,8 +277,8 @@ typedef struct SStreamStatus { int8_t keepTaskStatus; bool transferState; bool appendTranstateBlock; // has append the transfer state data block already, todo: remove it - int8_t timerActive; // timer is active - int8_t pauseAllowed; // allowed task status to be set to be paused + int8_t timerActive; // timer is active + int8_t pauseAllowed; // allowed task status to be set to be paused } SStreamStatus; typedef struct SHistDataRange { @@ -289,7 +289,7 @@ typedef struct SHistDataRange { typedef struct SSTaskBasicInfo { int32_t nodeId; // vgroup id or snode id SEpSet epSet; - SEpSet mnodeEpset; // mnode epset for send heartbeat + SEpSet mnodeEpset; // mnode epset for send heartbeat int32_t selfChildId; int32_t totalLevel; int8_t taskLevel; @@ -397,6 +397,7 @@ typedef struct SStreamMeta { SMetaHbInfo hbInfo; int32_t closedTask; int32_t chkptNotReadyTasks; + int64_t rid; int64_t chkpId; SArray* chkpSaved; @@ -558,7 +559,7 @@ typedef struct STaskStatusEntry { typedef struct SStreamHbMsg { int32_t vgId; int32_t numOfTasks; - SArray* pTaskStatus; // SArray + SArray* pTaskStatus; // SArray } SStreamHbMsg; int32_t tEncodeStreamHbMsg(SEncoder* pEncoder, const SStreamHbMsg* pRsp); @@ -629,8 +630,8 @@ int32_t streamSetupScheduleTrigger(SStreamTask* pTask); int32_t streamProcessRunReq(SStreamTask* pTask); int32_t streamProcessDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pReq, SRpcMsg* pMsg, bool exec); int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, int32_t code); -void streamTaskCloseUpstreamInput(SStreamTask* pTask, int32_t taskId); -void streamTaskOpenAllUpstreamInput(SStreamTask* pTask); +void streamTaskCloseUpstreamInput(SStreamTask* pTask, int32_t taskId); +void streamTaskOpenAllUpstreamInput(SStreamTask* pTask); int32_t streamProcessRetrieveReq(SStreamTask* pTask, SStreamRetrieveReq* pReq, SRpcMsg* pMsg); void streamTaskOpenAllUpstreamInput(SStreamTask* pTask); @@ -727,8 +728,8 @@ int32_t streamTaskReleaseState(SStreamTask* pTask); int32_t streamTaskReloadState(SStreamTask* pTask); int32_t streamAlignTransferState(SStreamTask* pTask); -int32_t streamAddCheckpointSourceRspMsg(SStreamCheckpointSourceReq* pReq, SRpcHandleInfo* pRpcInfo, - SStreamTask* pTask, int8_t isSucceed); +int32_t streamAddCheckpointSourceRspMsg(SStreamCheckpointSourceReq* pReq, SRpcHandleInfo* pRpcInfo, SStreamTask* pTask, + int8_t isSucceed); int32_t buildCheckpointSourceRsp(SStreamCheckpointSourceReq* pReq, SRpcHandleInfo* pRpcInfo, SRpcMsg* pMsg, int8_t isSucceed); #ifdef __cplusplus diff --git a/source/libs/stream/src/stream.c b/source/libs/stream/src/stream.c index 959cbab68e..afd51b1e8b 100644 --- a/source/libs/stream/src/stream.c +++ b/source/libs/stream/src/stream.c @@ -16,9 +16,9 @@ #include "streamInt.h" #include "ttimer.h" -#define STREAM_TASK_INPUT_QUEUE_CAPACITY 20480 -#define STREAM_TASK_INPUT_QUEUE_CAPACITY_IN_SIZE (30) -#define QUEUE_MEM_SIZE_IN_MB(_q) (taosQueueMemorySize(_q) / ONE_MB_F) +#define STREAM_TASK_INPUT_QUEUE_CAPACITY 20480 +#define STREAM_TASK_INPUT_QUEUE_CAPACITY_IN_SIZE (30) +#define QUEUE_MEM_SIZE_IN_MB(_q) (taosQueueMemorySize(_q) / ONE_MB_F) SStreamGlobalEnv streamEnv; int32_t streamInit() { @@ -66,8 +66,8 @@ static void streamSchedByTimer(void* param, void* tmrId) { qDebug("s-task:%s in scheduler, trigger status:%d, next:%dms", pTask->id.idStr, status, (int32_t)pTask->triggerParam); if (streamTaskShouldStop(&pTask->status) || streamTaskShouldPause(&pTask->status)) { - streamMetaReleaseTask(NULL, pTask); qDebug("s-task:%s jump out of schedTimer", pTask->id.idStr); + streamMetaReleaseTask(NULL, pTask); return; } @@ -141,7 +141,7 @@ int32_t streamSchedExec(SStreamTask* pTask) { } static int32_t buildDispatchRsp(const SStreamTask* pTask, const SStreamDispatchReq* pReq, int32_t status, void** pBuf) { - *pBuf = rpcMallocCont(sizeof(SMsgHead) + sizeof(SStreamDispatchRsp)); + *pBuf = rpcMallocCont(sizeof(SMsgHead) + sizeof(SStreamDispatchRsp)); if (*pBuf == NULL) { return TSDB_CODE_OUT_OF_MEMORY; } @@ -240,46 +240,47 @@ int32_t streamTaskOutputResultBlock(SStreamTask* pTask, SStreamDataBlock* pBlock return 0; } -//static int32_t streamTaskAppendInputBlocks(SStreamTask* pTask, const SStreamDispatchReq* pReq) { -// int8_t status = 0; +// static int32_t streamTaskAppendInputBlocks(SStreamTask* pTask, const SStreamDispatchReq* pReq) { +// int8_t status = 0; // -// SStreamDataBlock* pBlock = createStreamDataFromDispatchMsg(pReq, pReq->type, pReq->srcVgId); -// if (pBlock == NULL) { -// streamTaskInputFail(pTask); -// status = TASK_INPUT_STATUS__FAILED; -// qError("vgId:%d, s-task:%s failed to receive dispatch msg, reason: out of memory", pTask->pMeta->vgId, -// pTask->id.idStr); -// } else { -// if (pBlock->type == STREAM_INPUT__TRANS_STATE) { -// pTask->status.appendTranstateBlock = true; -// } +// SStreamDataBlock* pBlock = createStreamDataFromDispatchMsg(pReq, pReq->type, pReq->srcVgId); +// if (pBlock == NULL) { +// streamTaskInputFail(pTask); +// status = TASK_INPUT_STATUS__FAILED; +// qError("vgId:%d, s-task:%s failed to receive dispatch msg, reason: out of memory", pTask->pMeta->vgId, +// pTask->id.idStr); +// } else { +// if (pBlock->type == STREAM_INPUT__TRANS_STATE) { +// pTask->status.appendTranstateBlock = true; +// } // -// int32_t code = tAppendDataToInputQueue(pTask, (SStreamQueueItem*)pBlock); -// // input queue is full, upstream is blocked now -// status = (code == TSDB_CODE_SUCCESS) ? TASK_INPUT_STATUS__NORMAL : TASK_INPUT_STATUS__BLOCKED; -// } +// int32_t code = tAppendDataToInputQueue(pTask, (SStreamQueueItem*)pBlock); +// // input queue is full, upstream is blocked now +// status = (code == TSDB_CODE_SUCCESS) ? TASK_INPUT_STATUS__NORMAL : TASK_INPUT_STATUS__BLOCKED; +// } // -// return status; -//} +// return status; +// } -//static int32_t buildDispatchRsp(const SStreamTask* pTask, const SStreamDispatchReq* pReq, int32_t status, void** pBuf) { -// *pBuf = rpcMallocCont(sizeof(SMsgHead) + sizeof(SStreamDispatchRsp)); -// if (*pBuf == NULL) { -// return TSDB_CODE_OUT_OF_MEMORY; -// } +// static int32_t buildDispatchRsp(const SStreamTask* pTask, const SStreamDispatchReq* pReq, int32_t status, void** +// pBuf) { +// *pBuf = rpcMallocCont(sizeof(SMsgHead) + sizeof(SStreamDispatchRsp)); +// if (*pBuf == NULL) { +// return TSDB_CODE_OUT_OF_MEMORY; +// } // -// ((SMsgHead*)(*pBuf))->vgId = htonl(pReq->upstreamNodeId); -// SStreamDispatchRsp* pDispatchRsp = POINTER_SHIFT((*pBuf), sizeof(SMsgHead)); +// ((SMsgHead*)(*pBuf))->vgId = htonl(pReq->upstreamNodeId); +// SStreamDispatchRsp* pDispatchRsp = POINTER_SHIFT((*pBuf), sizeof(SMsgHead)); // -// pDispatchRsp->inputStatus = status; -// pDispatchRsp->streamId = htobe64(pReq->streamId); -// pDispatchRsp->upstreamNodeId = htonl(pReq->upstreamNodeId); -// pDispatchRsp->upstreamTaskId = htonl(pReq->upstreamTaskId); -// pDispatchRsp->downstreamNodeId = htonl(pTask->info.nodeId); -// pDispatchRsp->downstreamTaskId = htonl(pTask->id.taskId); +// pDispatchRsp->inputStatus = status; +// pDispatchRsp->streamId = htobe64(pReq->streamId); +// pDispatchRsp->upstreamNodeId = htonl(pReq->upstreamNodeId); +// pDispatchRsp->upstreamTaskId = htonl(pReq->upstreamTaskId); +// pDispatchRsp->downstreamNodeId = htonl(pTask->info.nodeId); +// pDispatchRsp->downstreamTaskId = htonl(pTask->id.taskId); // -// return TSDB_CODE_SUCCESS; -//} +// return TSDB_CODE_SUCCESS; +// } int32_t streamProcessDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pReq, SRpcMsg* pRsp, bool exec) { qDebug("s-task:%s receive dispatch msg from taskId:0x%x(vgId:%d), msgLen:%" PRId64, pTask->id.idStr, pReq->upstreamTaskId, pReq->upstreamNodeId, pReq->totalLen); @@ -300,7 +301,8 @@ int32_t streamProcessDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pReq, S pReq->upstreamTaskId); status = TASK_INPUT_STATUS__BLOCKED; } else { - // Current task has received the checkpoint req from the upstream task, from which the message should all be blocked + // Current task has received the checkpoint req from the upstream task, from which the message should all be + // blocked if (pReq->type == STREAM_INPUT__CHECKPOINT_TRIGGER) { streamTaskCloseUpstreamInput(pTask, pReq->upstreamTaskId); qDebug("s-task:%s close inputQ for upstream:0x%x", pTask->id.idStr, pReq->upstreamTaskId); @@ -380,7 +382,7 @@ int32_t tAppendDataToInputQueue(SStreamTask* pTask, SStreamQueueItem* pItem) { msgLen, ver, total, size + SIZE_IN_MB(msgLen)); } else if (type == STREAM_INPUT__DATA_BLOCK || type == STREAM_INPUT__DATA_RETRIEVE || type == STREAM_INPUT__REF_DATA_BLOCK) { - if (/*(pTask->info.taskLevel == TASK_LEVEL__SOURCE) && */(tInputQueueIsFull(pTask))) { + if (/*(pTask->info.taskLevel == TASK_LEVEL__SOURCE) && */ (tInputQueueIsFull(pTask))) { qError("s-task:%s input queue is full, capacity:%d size:%d MiB, current(blocks:%d, size:%.2fMiB) abort", pTask->id.idStr, STREAM_TASK_INPUT_QUEUE_CAPACITY, STREAM_TASK_INPUT_QUEUE_CAPACITY_IN_SIZE, total, size); destroyStreamDataBlock((SStreamDataBlock*)pItem); @@ -393,10 +395,11 @@ int32_t tAppendDataToInputQueue(SStreamTask* pTask, SStreamQueueItem* pItem) { destroyStreamDataBlock((SStreamDataBlock*)pItem); return code; } - } else if (type == STREAM_INPUT__CHECKPOINT || type == STREAM_INPUT__CHECKPOINT_TRIGGER || type == STREAM_INPUT__TRANS_STATE) { + } else if (type == STREAM_INPUT__CHECKPOINT || type == STREAM_INPUT__CHECKPOINT_TRIGGER || + type == STREAM_INPUT__TRANS_STATE) { taosWriteQitem(pTask->inputQueue->queue, pItem); - qDebug("s-task:%s level:%d %s blockdata enqueue, total in queue:%d, size:%.2fMiB", - pTask->id.idStr, pTask->info.taskLevel, streamGetBlockTypeStr(type), total, size); + qDebug("s-task:%s level:%d %s blockdata enqueue, total in queue:%d, size:%.2fMiB", pTask->id.idStr, + pTask->info.taskLevel, streamGetBlockTypeStr(type), total, size); } else if (type == STREAM_INPUT__GET_RES) { // use the default memory limit, refactor later. taosWriteQitem(pTask->inputQueue->queue, pItem); @@ -441,7 +444,7 @@ void streamTaskOpenAllUpstreamInput(SStreamTask* pTask) { return; } - for(int32_t i = 0; i < num; ++i) { + for (int32_t i = 0; i < num; ++i) { SStreamChildEpInfo* pInfo = taosArrayGetP(pTask->pUpstreamInfoList, i); pInfo->dataAllowed = true; } @@ -456,7 +459,7 @@ void streamTaskCloseUpstreamInput(SStreamTask* pTask, int32_t taskId) { SStreamChildEpInfo* streamTaskGetUpstreamTaskEpInfo(SStreamTask* pTask, int32_t taskId) { int32_t num = taosArrayGetSize(pTask->pUpstreamInfoList); - for(int32_t i = 0; i < num; ++i) { + for (int32_t i = 0; i < num; ++i) { SStreamChildEpInfo* pInfo = taosArrayGetP(pTask->pUpstreamInfoList, i); if (pInfo->taskId == taskId) { return pInfo; diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 5cc3310e11..fc722fa12c 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -25,22 +25,28 @@ #define META_HB_SEND_IDLE_COUNTER 25 // send hb every 5 sec static TdThreadOnce streamMetaModuleInit = PTHREAD_ONCE_INIT; -int32_t streamBackendId = 0; -int32_t streamBackendCfWrapperId = 0; + +int32_t streamBackendId = 0; +int32_t streamBackendCfWrapperId = 0; +int32_t streamMetaId = 0; static int64_t streamGetLatestCheckpointId(SStreamMeta* pMeta); static void metaHbToMnode(void* param, void* tmrId); static void streamMetaClear(SStreamMeta* pMeta); +void streamMetaCloseImpl(void* arg); static void streamMetaEnvInit() { streamBackendId = taosOpenRef(64, streamBackendCleanup); streamBackendCfWrapperId = taosOpenRef(64, streamBackendHandleCleanup); + + streamMetaId = taosOpenRef(64, streamMetaCloseImpl); } void streamMetaInit() { taosThreadOnce(&streamMetaModuleInit, streamMetaEnvInit); } void streamMetaCleanup() { taosCloseRef(streamBackendId); taosCloseRef(streamBackendCfWrapperId); + taosCloseRef(streamMetaId); } SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandFunc, int32_t vgId, int64_t stage) { @@ -92,7 +98,11 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF pMeta->stage = stage; // send heartbeat every 5sec. - pMeta->hbInfo.hbTmr = taosTmrStart(metaHbToMnode, META_HB_CHECK_INTERVAL, pMeta, streamEnv.timer); + pMeta->rid = taosAddRef(streamMetaId, pMeta); + int64_t* pRid = taosMemoryMalloc(sizeof(int64_t)); + *pRid = pMeta->rid; + + pMeta->hbInfo.hbTmr = taosTmrStart(metaHbToMnode, META_HB_CHECK_INTERVAL, pRid, streamEnv.timer); pMeta->hbInfo.tickCounter = 0; pMeta->hbInfo.stopFlag = 0; @@ -116,9 +126,6 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF } } - // if (pMeta->streamBackend == NULL) { - // goto _err; - // } pMeta->streamBackendRid = taosAddRef(streamBackendId, pMeta->streamBackend); code = streamBackendLoadCheckpointInfo(pMeta); @@ -207,11 +214,15 @@ void streamMetaClear(SStreamMeta* pMeta) { void streamMetaClose(SStreamMeta* pMeta) { qDebug("start to close stream meta"); + taosRemoveRef(streamMetaId, pMeta->rid); +} + +void streamMetaCloseImpl(void* arg) { + SStreamMeta* pMeta = arg; + qDebug("start to do-close stream meta"); if (pMeta == NULL) { return; } - // TODO, add ref to resolve race: timer thread cannot stop - taosTmrStop(pMeta->hbInfo.hbTmr); streamMetaClear(pMeta); @@ -625,18 +636,26 @@ static bool readyToSendHb(SMetaHbInfo* pInfo) { } void metaHbToMnode(void* param, void* tmrId) { - SStreamMeta* pMeta = param; + int64_t rid = *(int64_t*)param; + SStreamHbMsg hbMsg = {0}; + SStreamMeta* pMeta = taosAcquireRef(streamMetaId, rid); + if (pMeta == NULL) { + taosMemoryFree(param); + return; + } // need to stop, stop now if (pMeta->hbInfo.stopFlag == STREAM_META_WILL_STOP) { pMeta->hbInfo.stopFlag = STREAM_META_OK_TO_STOP; qDebug("vgId:%d jump out of meta timer", pMeta->vgId); + taosReleaseRef(streamMetaId, rid); return; } if (!readyToSendHb(&pMeta->hbInfo)) { - taosTmrReset(metaHbToMnode, META_HB_CHECK_INTERVAL, pMeta, streamEnv.timer, &pMeta->hbInfo.hbTmr); + taosTmrReset(metaHbToMnode, META_HB_CHECK_INTERVAL, param, streamEnv.timer, &pMeta->hbInfo.hbTmr); + taosReleaseRef(streamMetaId, rid); return; } @@ -675,6 +694,7 @@ void metaHbToMnode(void* param, void* tmrId) { if (code < 0) { qError("vgId:%d encode stream hb msg failed, code:%s", pMeta->vgId, tstrerror(code)); taosArrayDestroy(hbMsg.pTaskStatus); + taosReleaseRef(streamMetaId, rid); return; } @@ -682,6 +702,7 @@ void metaHbToMnode(void* param, void* tmrId) { if (buf == NULL) { qError("vgId:%d encode stream hb msg failed, code:%s", pMeta->vgId, tstrerror(TSDB_CODE_OUT_OF_MEMORY)); taosArrayDestroy(hbMsg.pTaskStatus); + taosReleaseRef(streamMetaId, rid); return; } @@ -691,6 +712,7 @@ void metaHbToMnode(void* param, void* tmrId) { rpcFreeCont(buf); qError("vgId:%d encode stream hb msg failed, code:%s", pMeta->vgId, tstrerror(code)); taosArrayDestroy(hbMsg.pTaskStatus); + taosReleaseRef(streamMetaId, rid); return; } tEncoderClear(&encoder); @@ -704,5 +726,6 @@ void metaHbToMnode(void* param, void* tmrId) { qDebug("vgId:%d, build and send hb to mnode", pMeta->vgId); tmsgSendReq(&epset, &msg); - taosTmrReset(metaHbToMnode, META_HB_CHECK_INTERVAL, pMeta, streamEnv.timer, &pMeta->hbInfo.hbTmr); + taosTmrReset(metaHbToMnode, META_HB_CHECK_INTERVAL, param, streamEnv.timer, &pMeta->hbInfo.hbTmr); + taosReleaseRef(streamMetaId, rid); } \ No newline at end of file