fix UAF
This commit is contained in:
parent
bed5765013
commit
7d237d4310
|
@ -277,8 +277,8 @@ typedef struct SStreamStatus {
|
|||
int8_t keepTaskStatus;
|
||||
bool transferState;
|
||||
bool appendTranstateBlock; // has append the transfer state data block already, todo: remove it
|
||||
int8_t timerActive; // timer is active
|
||||
int8_t pauseAllowed; // allowed task status to be set to be paused
|
||||
int8_t timerActive; // timer is active
|
||||
int8_t pauseAllowed; // allowed task status to be set to be paused
|
||||
} SStreamStatus;
|
||||
|
||||
typedef struct SHistDataRange {
|
||||
|
@ -289,7 +289,7 @@ typedef struct SHistDataRange {
|
|||
typedef struct SSTaskBasicInfo {
|
||||
int32_t nodeId; // vgroup id or snode id
|
||||
SEpSet epSet;
|
||||
SEpSet mnodeEpset; // mnode epset for send heartbeat
|
||||
SEpSet mnodeEpset; // mnode epset for send heartbeat
|
||||
int32_t selfChildId;
|
||||
int32_t totalLevel;
|
||||
int8_t taskLevel;
|
||||
|
@ -397,6 +397,7 @@ typedef struct SStreamMeta {
|
|||
SMetaHbInfo hbInfo;
|
||||
int32_t closedTask;
|
||||
int32_t chkptNotReadyTasks;
|
||||
int64_t rid;
|
||||
|
||||
int64_t chkpId;
|
||||
SArray* chkpSaved;
|
||||
|
@ -558,7 +559,7 @@ typedef struct STaskStatusEntry {
|
|||
typedef struct SStreamHbMsg {
|
||||
int32_t vgId;
|
||||
int32_t numOfTasks;
|
||||
SArray* pTaskStatus; // SArray<SStreamTaskStatusEntry>
|
||||
SArray* pTaskStatus; // SArray<SStreamTaskStatusEntry>
|
||||
} SStreamHbMsg;
|
||||
|
||||
int32_t tEncodeStreamHbMsg(SEncoder* pEncoder, const SStreamHbMsg* pRsp);
|
||||
|
@ -629,8 +630,8 @@ int32_t streamSetupScheduleTrigger(SStreamTask* pTask);
|
|||
int32_t streamProcessRunReq(SStreamTask* pTask);
|
||||
int32_t streamProcessDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pReq, SRpcMsg* pMsg, bool exec);
|
||||
int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, int32_t code);
|
||||
void streamTaskCloseUpstreamInput(SStreamTask* pTask, int32_t taskId);
|
||||
void streamTaskOpenAllUpstreamInput(SStreamTask* pTask);
|
||||
void streamTaskCloseUpstreamInput(SStreamTask* pTask, int32_t taskId);
|
||||
void streamTaskOpenAllUpstreamInput(SStreamTask* pTask);
|
||||
|
||||
int32_t streamProcessRetrieveReq(SStreamTask* pTask, SStreamRetrieveReq* pReq, SRpcMsg* pMsg);
|
||||
void streamTaskOpenAllUpstreamInput(SStreamTask* pTask);
|
||||
|
@ -727,8 +728,8 @@ int32_t streamTaskReleaseState(SStreamTask* pTask);
|
|||
int32_t streamTaskReloadState(SStreamTask* pTask);
|
||||
int32_t streamAlignTransferState(SStreamTask* pTask);
|
||||
|
||||
int32_t streamAddCheckpointSourceRspMsg(SStreamCheckpointSourceReq* pReq, SRpcHandleInfo* pRpcInfo,
|
||||
SStreamTask* pTask, int8_t isSucceed);
|
||||
int32_t streamAddCheckpointSourceRspMsg(SStreamCheckpointSourceReq* pReq, SRpcHandleInfo* pRpcInfo, SStreamTask* pTask,
|
||||
int8_t isSucceed);
|
||||
int32_t buildCheckpointSourceRsp(SStreamCheckpointSourceReq* pReq, SRpcHandleInfo* pRpcInfo, SRpcMsg* pMsg,
|
||||
int8_t isSucceed);
|
||||
#ifdef __cplusplus
|
||||
|
|
|
@ -16,9 +16,9 @@
|
|||
#include "streamInt.h"
|
||||
#include "ttimer.h"
|
||||
|
||||
#define STREAM_TASK_INPUT_QUEUE_CAPACITY 20480
|
||||
#define STREAM_TASK_INPUT_QUEUE_CAPACITY_IN_SIZE (30)
|
||||
#define QUEUE_MEM_SIZE_IN_MB(_q) (taosQueueMemorySize(_q) / ONE_MB_F)
|
||||
#define STREAM_TASK_INPUT_QUEUE_CAPACITY 20480
|
||||
#define STREAM_TASK_INPUT_QUEUE_CAPACITY_IN_SIZE (30)
|
||||
#define QUEUE_MEM_SIZE_IN_MB(_q) (taosQueueMemorySize(_q) / ONE_MB_F)
|
||||
SStreamGlobalEnv streamEnv;
|
||||
|
||||
int32_t streamInit() {
|
||||
|
@ -66,8 +66,8 @@ static void streamSchedByTimer(void* param, void* tmrId) {
|
|||
qDebug("s-task:%s in scheduler, trigger status:%d, next:%dms", pTask->id.idStr, status, (int32_t)pTask->triggerParam);
|
||||
|
||||
if (streamTaskShouldStop(&pTask->status) || streamTaskShouldPause(&pTask->status)) {
|
||||
streamMetaReleaseTask(NULL, pTask);
|
||||
qDebug("s-task:%s jump out of schedTimer", pTask->id.idStr);
|
||||
streamMetaReleaseTask(NULL, pTask);
|
||||
return;
|
||||
}
|
||||
|
||||
|
@ -141,7 +141,7 @@ int32_t streamSchedExec(SStreamTask* pTask) {
|
|||
}
|
||||
|
||||
static int32_t buildDispatchRsp(const SStreamTask* pTask, const SStreamDispatchReq* pReq, int32_t status, void** pBuf) {
|
||||
*pBuf = rpcMallocCont(sizeof(SMsgHead) + sizeof(SStreamDispatchRsp));
|
||||
*pBuf = rpcMallocCont(sizeof(SMsgHead) + sizeof(SStreamDispatchRsp));
|
||||
if (*pBuf == NULL) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
|
@ -240,46 +240,47 @@ int32_t streamTaskOutputResultBlock(SStreamTask* pTask, SStreamDataBlock* pBlock
|
|||
return 0;
|
||||
}
|
||||
|
||||
//static int32_t streamTaskAppendInputBlocks(SStreamTask* pTask, const SStreamDispatchReq* pReq) {
|
||||
// int8_t status = 0;
|
||||
// static int32_t streamTaskAppendInputBlocks(SStreamTask* pTask, const SStreamDispatchReq* pReq) {
|
||||
// int8_t status = 0;
|
||||
//
|
||||
// SStreamDataBlock* pBlock = createStreamDataFromDispatchMsg(pReq, pReq->type, pReq->srcVgId);
|
||||
// if (pBlock == NULL) {
|
||||
// streamTaskInputFail(pTask);
|
||||
// status = TASK_INPUT_STATUS__FAILED;
|
||||
// qError("vgId:%d, s-task:%s failed to receive dispatch msg, reason: out of memory", pTask->pMeta->vgId,
|
||||
// pTask->id.idStr);
|
||||
// } else {
|
||||
// if (pBlock->type == STREAM_INPUT__TRANS_STATE) {
|
||||
// pTask->status.appendTranstateBlock = true;
|
||||
// }
|
||||
// SStreamDataBlock* pBlock = createStreamDataFromDispatchMsg(pReq, pReq->type, pReq->srcVgId);
|
||||
// if (pBlock == NULL) {
|
||||
// streamTaskInputFail(pTask);
|
||||
// status = TASK_INPUT_STATUS__FAILED;
|
||||
// qError("vgId:%d, s-task:%s failed to receive dispatch msg, reason: out of memory", pTask->pMeta->vgId,
|
||||
// pTask->id.idStr);
|
||||
// } else {
|
||||
// if (pBlock->type == STREAM_INPUT__TRANS_STATE) {
|
||||
// pTask->status.appendTranstateBlock = true;
|
||||
// }
|
||||
//
|
||||
// int32_t code = tAppendDataToInputQueue(pTask, (SStreamQueueItem*)pBlock);
|
||||
// // input queue is full, upstream is blocked now
|
||||
// status = (code == TSDB_CODE_SUCCESS) ? TASK_INPUT_STATUS__NORMAL : TASK_INPUT_STATUS__BLOCKED;
|
||||
// }
|
||||
// int32_t code = tAppendDataToInputQueue(pTask, (SStreamQueueItem*)pBlock);
|
||||
// // input queue is full, upstream is blocked now
|
||||
// status = (code == TSDB_CODE_SUCCESS) ? TASK_INPUT_STATUS__NORMAL : TASK_INPUT_STATUS__BLOCKED;
|
||||
// }
|
||||
//
|
||||
// return status;
|
||||
//}
|
||||
// return status;
|
||||
// }
|
||||
|
||||
//static int32_t buildDispatchRsp(const SStreamTask* pTask, const SStreamDispatchReq* pReq, int32_t status, void** pBuf) {
|
||||
// *pBuf = rpcMallocCont(sizeof(SMsgHead) + sizeof(SStreamDispatchRsp));
|
||||
// if (*pBuf == NULL) {
|
||||
// return TSDB_CODE_OUT_OF_MEMORY;
|
||||
// }
|
||||
// static int32_t buildDispatchRsp(const SStreamTask* pTask, const SStreamDispatchReq* pReq, int32_t status, void**
|
||||
// pBuf) {
|
||||
// *pBuf = rpcMallocCont(sizeof(SMsgHead) + sizeof(SStreamDispatchRsp));
|
||||
// if (*pBuf == NULL) {
|
||||
// return TSDB_CODE_OUT_OF_MEMORY;
|
||||
// }
|
||||
//
|
||||
// ((SMsgHead*)(*pBuf))->vgId = htonl(pReq->upstreamNodeId);
|
||||
// SStreamDispatchRsp* pDispatchRsp = POINTER_SHIFT((*pBuf), sizeof(SMsgHead));
|
||||
// ((SMsgHead*)(*pBuf))->vgId = htonl(pReq->upstreamNodeId);
|
||||
// SStreamDispatchRsp* pDispatchRsp = POINTER_SHIFT((*pBuf), sizeof(SMsgHead));
|
||||
//
|
||||
// pDispatchRsp->inputStatus = status;
|
||||
// pDispatchRsp->streamId = htobe64(pReq->streamId);
|
||||
// pDispatchRsp->upstreamNodeId = htonl(pReq->upstreamNodeId);
|
||||
// pDispatchRsp->upstreamTaskId = htonl(pReq->upstreamTaskId);
|
||||
// pDispatchRsp->downstreamNodeId = htonl(pTask->info.nodeId);
|
||||
// pDispatchRsp->downstreamTaskId = htonl(pTask->id.taskId);
|
||||
// pDispatchRsp->inputStatus = status;
|
||||
// pDispatchRsp->streamId = htobe64(pReq->streamId);
|
||||
// pDispatchRsp->upstreamNodeId = htonl(pReq->upstreamNodeId);
|
||||
// pDispatchRsp->upstreamTaskId = htonl(pReq->upstreamTaskId);
|
||||
// pDispatchRsp->downstreamNodeId = htonl(pTask->info.nodeId);
|
||||
// pDispatchRsp->downstreamTaskId = htonl(pTask->id.taskId);
|
||||
//
|
||||
// return TSDB_CODE_SUCCESS;
|
||||
//}
|
||||
// return TSDB_CODE_SUCCESS;
|
||||
// }
|
||||
int32_t streamProcessDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pReq, SRpcMsg* pRsp, bool exec) {
|
||||
qDebug("s-task:%s receive dispatch msg from taskId:0x%x(vgId:%d), msgLen:%" PRId64, pTask->id.idStr,
|
||||
pReq->upstreamTaskId, pReq->upstreamNodeId, pReq->totalLen);
|
||||
|
@ -300,7 +301,8 @@ int32_t streamProcessDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pReq, S
|
|||
pReq->upstreamTaskId);
|
||||
status = TASK_INPUT_STATUS__BLOCKED;
|
||||
} else {
|
||||
// Current task has received the checkpoint req from the upstream task, from which the message should all be blocked
|
||||
// Current task has received the checkpoint req from the upstream task, from which the message should all be
|
||||
// blocked
|
||||
if (pReq->type == STREAM_INPUT__CHECKPOINT_TRIGGER) {
|
||||
streamTaskCloseUpstreamInput(pTask, pReq->upstreamTaskId);
|
||||
qDebug("s-task:%s close inputQ for upstream:0x%x", pTask->id.idStr, pReq->upstreamTaskId);
|
||||
|
@ -380,7 +382,7 @@ int32_t tAppendDataToInputQueue(SStreamTask* pTask, SStreamQueueItem* pItem) {
|
|||
msgLen, ver, total, size + SIZE_IN_MB(msgLen));
|
||||
} else if (type == STREAM_INPUT__DATA_BLOCK || type == STREAM_INPUT__DATA_RETRIEVE ||
|
||||
type == STREAM_INPUT__REF_DATA_BLOCK) {
|
||||
if (/*(pTask->info.taskLevel == TASK_LEVEL__SOURCE) && */(tInputQueueIsFull(pTask))) {
|
||||
if (/*(pTask->info.taskLevel == TASK_LEVEL__SOURCE) && */ (tInputQueueIsFull(pTask))) {
|
||||
qError("s-task:%s input queue is full, capacity:%d size:%d MiB, current(blocks:%d, size:%.2fMiB) abort",
|
||||
pTask->id.idStr, STREAM_TASK_INPUT_QUEUE_CAPACITY, STREAM_TASK_INPUT_QUEUE_CAPACITY_IN_SIZE, total, size);
|
||||
destroyStreamDataBlock((SStreamDataBlock*)pItem);
|
||||
|
@ -393,10 +395,11 @@ int32_t tAppendDataToInputQueue(SStreamTask* pTask, SStreamQueueItem* pItem) {
|
|||
destroyStreamDataBlock((SStreamDataBlock*)pItem);
|
||||
return code;
|
||||
}
|
||||
} else if (type == STREAM_INPUT__CHECKPOINT || type == STREAM_INPUT__CHECKPOINT_TRIGGER || type == STREAM_INPUT__TRANS_STATE) {
|
||||
} else if (type == STREAM_INPUT__CHECKPOINT || type == STREAM_INPUT__CHECKPOINT_TRIGGER ||
|
||||
type == STREAM_INPUT__TRANS_STATE) {
|
||||
taosWriteQitem(pTask->inputQueue->queue, pItem);
|
||||
qDebug("s-task:%s level:%d %s blockdata enqueue, total in queue:%d, size:%.2fMiB",
|
||||
pTask->id.idStr, pTask->info.taskLevel, streamGetBlockTypeStr(type), total, size);
|
||||
qDebug("s-task:%s level:%d %s blockdata enqueue, total in queue:%d, size:%.2fMiB", pTask->id.idStr,
|
||||
pTask->info.taskLevel, streamGetBlockTypeStr(type), total, size);
|
||||
} else if (type == STREAM_INPUT__GET_RES) {
|
||||
// use the default memory limit, refactor later.
|
||||
taosWriteQitem(pTask->inputQueue->queue, pItem);
|
||||
|
@ -441,7 +444,7 @@ void streamTaskOpenAllUpstreamInput(SStreamTask* pTask) {
|
|||
return;
|
||||
}
|
||||
|
||||
for(int32_t i = 0; i < num; ++i) {
|
||||
for (int32_t i = 0; i < num; ++i) {
|
||||
SStreamChildEpInfo* pInfo = taosArrayGetP(pTask->pUpstreamInfoList, i);
|
||||
pInfo->dataAllowed = true;
|
||||
}
|
||||
|
@ -456,7 +459,7 @@ void streamTaskCloseUpstreamInput(SStreamTask* pTask, int32_t taskId) {
|
|||
|
||||
SStreamChildEpInfo* streamTaskGetUpstreamTaskEpInfo(SStreamTask* pTask, int32_t taskId) {
|
||||
int32_t num = taosArrayGetSize(pTask->pUpstreamInfoList);
|
||||
for(int32_t i = 0; i < num; ++i) {
|
||||
for (int32_t i = 0; i < num; ++i) {
|
||||
SStreamChildEpInfo* pInfo = taosArrayGetP(pTask->pUpstreamInfoList, i);
|
||||
if (pInfo->taskId == taskId) {
|
||||
return pInfo;
|
||||
|
|
|
@ -25,22 +25,28 @@
|
|||
#define META_HB_SEND_IDLE_COUNTER 25 // send hb every 5 sec
|
||||
|
||||
static TdThreadOnce streamMetaModuleInit = PTHREAD_ONCE_INIT;
|
||||
int32_t streamBackendId = 0;
|
||||
int32_t streamBackendCfWrapperId = 0;
|
||||
|
||||
int32_t streamBackendId = 0;
|
||||
int32_t streamBackendCfWrapperId = 0;
|
||||
int32_t streamMetaId = 0;
|
||||
|
||||
static int64_t streamGetLatestCheckpointId(SStreamMeta* pMeta);
|
||||
static void metaHbToMnode(void* param, void* tmrId);
|
||||
static void streamMetaClear(SStreamMeta* pMeta);
|
||||
|
||||
void streamMetaCloseImpl(void* arg);
|
||||
static void streamMetaEnvInit() {
|
||||
streamBackendId = taosOpenRef(64, streamBackendCleanup);
|
||||
streamBackendCfWrapperId = taosOpenRef(64, streamBackendHandleCleanup);
|
||||
|
||||
streamMetaId = taosOpenRef(64, streamMetaCloseImpl);
|
||||
}
|
||||
|
||||
void streamMetaInit() { taosThreadOnce(&streamMetaModuleInit, streamMetaEnvInit); }
|
||||
void streamMetaCleanup() {
|
||||
taosCloseRef(streamBackendId);
|
||||
taosCloseRef(streamBackendCfWrapperId);
|
||||
taosCloseRef(streamMetaId);
|
||||
}
|
||||
|
||||
SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandFunc, int32_t vgId, int64_t stage) {
|
||||
|
@ -92,7 +98,11 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF
|
|||
pMeta->stage = stage;
|
||||
|
||||
// send heartbeat every 5sec.
|
||||
pMeta->hbInfo.hbTmr = taosTmrStart(metaHbToMnode, META_HB_CHECK_INTERVAL, pMeta, streamEnv.timer);
|
||||
pMeta->rid = taosAddRef(streamMetaId, pMeta);
|
||||
int64_t* pRid = taosMemoryMalloc(sizeof(int64_t));
|
||||
*pRid = pMeta->rid;
|
||||
|
||||
pMeta->hbInfo.hbTmr = taosTmrStart(metaHbToMnode, META_HB_CHECK_INTERVAL, pRid, streamEnv.timer);
|
||||
pMeta->hbInfo.tickCounter = 0;
|
||||
pMeta->hbInfo.stopFlag = 0;
|
||||
|
||||
|
@ -116,9 +126,6 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF
|
|||
}
|
||||
}
|
||||
|
||||
// if (pMeta->streamBackend == NULL) {
|
||||
// goto _err;
|
||||
// }
|
||||
pMeta->streamBackendRid = taosAddRef(streamBackendId, pMeta->streamBackend);
|
||||
|
||||
code = streamBackendLoadCheckpointInfo(pMeta);
|
||||
|
@ -207,11 +214,15 @@ void streamMetaClear(SStreamMeta* pMeta) {
|
|||
|
||||
void streamMetaClose(SStreamMeta* pMeta) {
|
||||
qDebug("start to close stream meta");
|
||||
taosRemoveRef(streamMetaId, pMeta->rid);
|
||||
}
|
||||
|
||||
void streamMetaCloseImpl(void* arg) {
|
||||
SStreamMeta* pMeta = arg;
|
||||
qDebug("start to do-close stream meta");
|
||||
if (pMeta == NULL) {
|
||||
return;
|
||||
}
|
||||
// TODO, add ref to resolve race: timer thread cannot stop
|
||||
taosTmrStop(pMeta->hbInfo.hbTmr);
|
||||
|
||||
streamMetaClear(pMeta);
|
||||
|
||||
|
@ -625,18 +636,26 @@ static bool readyToSendHb(SMetaHbInfo* pInfo) {
|
|||
}
|
||||
|
||||
void metaHbToMnode(void* param, void* tmrId) {
|
||||
SStreamMeta* pMeta = param;
|
||||
int64_t rid = *(int64_t*)param;
|
||||
|
||||
SStreamHbMsg hbMsg = {0};
|
||||
SStreamMeta* pMeta = taosAcquireRef(streamMetaId, rid);
|
||||
if (pMeta == NULL) {
|
||||
taosMemoryFree(param);
|
||||
return;
|
||||
}
|
||||
|
||||
// need to stop, stop now
|
||||
if (pMeta->hbInfo.stopFlag == STREAM_META_WILL_STOP) {
|
||||
pMeta->hbInfo.stopFlag = STREAM_META_OK_TO_STOP;
|
||||
qDebug("vgId:%d jump out of meta timer", pMeta->vgId);
|
||||
taosReleaseRef(streamMetaId, rid);
|
||||
return;
|
||||
}
|
||||
|
||||
if (!readyToSendHb(&pMeta->hbInfo)) {
|
||||
taosTmrReset(metaHbToMnode, META_HB_CHECK_INTERVAL, pMeta, streamEnv.timer, &pMeta->hbInfo.hbTmr);
|
||||
taosTmrReset(metaHbToMnode, META_HB_CHECK_INTERVAL, param, streamEnv.timer, &pMeta->hbInfo.hbTmr);
|
||||
taosReleaseRef(streamMetaId, rid);
|
||||
return;
|
||||
}
|
||||
|
||||
|
@ -675,6 +694,7 @@ void metaHbToMnode(void* param, void* tmrId) {
|
|||
if (code < 0) {
|
||||
qError("vgId:%d encode stream hb msg failed, code:%s", pMeta->vgId, tstrerror(code));
|
||||
taosArrayDestroy(hbMsg.pTaskStatus);
|
||||
taosReleaseRef(streamMetaId, rid);
|
||||
return;
|
||||
}
|
||||
|
||||
|
@ -682,6 +702,7 @@ void metaHbToMnode(void* param, void* tmrId) {
|
|||
if (buf == NULL) {
|
||||
qError("vgId:%d encode stream hb msg failed, code:%s", pMeta->vgId, tstrerror(TSDB_CODE_OUT_OF_MEMORY));
|
||||
taosArrayDestroy(hbMsg.pTaskStatus);
|
||||
taosReleaseRef(streamMetaId, rid);
|
||||
return;
|
||||
}
|
||||
|
||||
|
@ -691,6 +712,7 @@ void metaHbToMnode(void* param, void* tmrId) {
|
|||
rpcFreeCont(buf);
|
||||
qError("vgId:%d encode stream hb msg failed, code:%s", pMeta->vgId, tstrerror(code));
|
||||
taosArrayDestroy(hbMsg.pTaskStatus);
|
||||
taosReleaseRef(streamMetaId, rid);
|
||||
return;
|
||||
}
|
||||
tEncoderClear(&encoder);
|
||||
|
@ -704,5 +726,6 @@ void metaHbToMnode(void* param, void* tmrId) {
|
|||
qDebug("vgId:%d, build and send hb to mnode", pMeta->vgId);
|
||||
|
||||
tmsgSendReq(&epset, &msg);
|
||||
taosTmrReset(metaHbToMnode, META_HB_CHECK_INTERVAL, pMeta, streamEnv.timer, &pMeta->hbInfo.hbTmr);
|
||||
taosTmrReset(metaHbToMnode, META_HB_CHECK_INTERVAL, param, streamEnv.timer, &pMeta->hbInfo.hbTmr);
|
||||
taosReleaseRef(streamMetaId, rid);
|
||||
}
|
Loading…
Reference in New Issue