fix(stream): add expired epset node list for stream tasks in hb to mnode.
This commit is contained in:
parent
0c2079a963
commit
c7285c0ede
|
@ -236,6 +236,11 @@ typedef struct {
|
|||
SUseDbRsp dbInfo;
|
||||
} STaskDispatcherShuffle;
|
||||
|
||||
typedef struct {
|
||||
int32_t nodeId;
|
||||
SEpSet epset;
|
||||
} SDownstreamTaskEpset;
|
||||
|
||||
typedef struct {
|
||||
int64_t stbUid;
|
||||
char stbFullName[TSDB_TABLE_FNAME_LEN];
|
||||
|
@ -327,15 +332,10 @@ typedef struct SDispatchMsgInfo {
|
|||
void* pTimer; // used to dispatch data after a given time duration
|
||||
} SDispatchMsgInfo;
|
||||
|
||||
typedef struct STaskOutputQueue {
|
||||
typedef struct STaskQueue {
|
||||
int8_t status;
|
||||
SStreamQueue* queue;
|
||||
} STaskOutputQueue;
|
||||
|
||||
typedef struct STaskInputInfo {
|
||||
int8_t status;
|
||||
SStreamQueue* queue;
|
||||
} STaskInputInfo;
|
||||
} STaskQueue;
|
||||
|
||||
typedef struct STaskSchedInfo {
|
||||
int8_t status;
|
||||
|
@ -384,6 +384,7 @@ typedef struct STaskOutputInfo {
|
|||
};
|
||||
int8_t type;
|
||||
STokenBucket* pTokenBucket;
|
||||
SArray* pDownstreamUpdateList;
|
||||
} STaskOutputInfo;
|
||||
|
||||
typedef struct SUpstreamInfo {
|
||||
|
@ -395,8 +396,8 @@ struct SStreamTask {
|
|||
int64_t ver;
|
||||
SStreamTaskId id;
|
||||
SSTaskBasicInfo info;
|
||||
STaskOutputQueue outputq;
|
||||
STaskInputInfo inputInfo;
|
||||
STaskQueue outputq;
|
||||
STaskQueue inputq;
|
||||
STaskSchedInfo schedInfo;
|
||||
STaskOutputInfo outputInfo;
|
||||
SDispatchMsgInfo msgInfo;
|
||||
|
@ -645,7 +646,8 @@ typedef struct STaskStatusEntry {
|
|||
typedef struct SStreamHbMsg {
|
||||
int32_t vgId;
|
||||
int32_t numOfTasks;
|
||||
SArray* pTaskStatus; // SArray<SStreamTaskStatusEntry>
|
||||
SArray* pTaskStatus; // SArray<STaskStatusEntry>
|
||||
SArray* pUpdateNodes; // SArray<int32_t>, needs update the epsets in stream tasks for those nodes.
|
||||
} SStreamHbMsg;
|
||||
|
||||
int32_t tEncodeStreamHbMsg(SEncoder* pEncoder, const SStreamHbMsg* pRsp);
|
||||
|
|
|
@ -91,6 +91,7 @@ static void removeStreamTasksInBuf(SStreamObj *pStream, SStreamExecInfo *pExe
|
|||
static void keepStreamTasksInBuf(SStreamObj *pStream, SStreamExecInfo *pExecNode);
|
||||
static int32_t removeExpirednodeEntryAndTask(SArray *pNodeSnapshot);
|
||||
static int32_t doKillActiveCheckpointTrans(SMnode *pMnode);
|
||||
static int32_t setNodeEpsetExpiredFlag(const SArray* pNodeList);
|
||||
|
||||
int32_t mndInitStream(SMnode *pMnode) {
|
||||
SSdbTable table = {
|
||||
|
@ -2119,7 +2120,9 @@ static int32_t mndProcessVgroupChange(SMnode *pMnode, SVgroupChangeInfo *pChange
|
|||
continue;
|
||||
}
|
||||
|
||||
mDebug("stream:0x%" PRIx64 " %s involved node changed, create update trans", pStream->uid, pStream->name);
|
||||
mDebug("stream:0x%" PRIx64 " %s involved node changed, create update trans, transId:%d", pStream->uid,
|
||||
pStream->name, pTrans->id);
|
||||
|
||||
int32_t code = createStreamUpdateTrans(pStream, pChangeInfo, pTrans);
|
||||
|
||||
// todo: not continue, drop all and retry again
|
||||
|
@ -2216,23 +2219,26 @@ static void doExtractTasksFromStream(SMnode *pMnode) {
|
|||
}
|
||||
}
|
||||
|
||||
static int32_t doRemoveTasks(SStreamExecInfo* pExecNode, STaskId* pRemovedId) {
|
||||
static int32_t doRemoveTasks(SStreamExecInfo *pExecNode, STaskId *pRemovedId) {
|
||||
void *p = taosHashGet(pExecNode->pTaskMap, pRemovedId, sizeof(*pRemovedId));
|
||||
if (p == NULL) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
if (p != NULL) {
|
||||
taosHashRemove(pExecNode->pTaskMap, pRemovedId, sizeof(*pRemovedId));
|
||||
taosHashRemove(pExecNode->pTaskMap, pRemovedId, sizeof(*pRemovedId));
|
||||
|
||||
for(int32_t k = 0; k < taosArrayGetSize(pExecNode->pTaskList); ++k) {
|
||||
STaskId* pId = taosArrayGet(pExecNode->pTaskList, k);
|
||||
if (pId->taskId == pRemovedId->taskId && pId->streamId == pRemovedId->streamId) {
|
||||
taosArrayRemove(pExecNode->pTaskList, k);
|
||||
mInfo("s-task:0x%x removed from buffer, remain:%d", (int32_t) pRemovedId->taskId,
|
||||
(int32_t)taosArrayGetSize(pExecNode->pTaskList));
|
||||
break;
|
||||
}
|
||||
for (int32_t k = 0; k < taosArrayGetSize(pExecNode->pTaskList); ++k) {
|
||||
STaskId *pId = taosArrayGet(pExecNode->pTaskList, k);
|
||||
if (pId->taskId == pRemovedId->taskId && pId->streamId == pRemovedId->streamId) {
|
||||
taosArrayRemove(pExecNode->pTaskList, k);
|
||||
|
||||
int32_t num = taosArrayGetSize(pExecNode->pTaskList);
|
||||
mInfo("s-task:0x%x removed from buffer, remain:%d", (int32_t)pRemovedId->taskId, num);
|
||||
break;
|
||||
}
|
||||
}
|
||||
return 0;
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static bool taskNodeExists(SArray* pList, int32_t nodeId) {
|
||||
|
@ -2322,6 +2328,7 @@ static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg) {
|
|||
SArray *pNodeSnapshot = mndTakeVgroupSnapshot(pMnode);
|
||||
|
||||
taosThreadMutexLock(&execInfo.lock);
|
||||
|
||||
removeExpirednodeEntryAndTask(pNodeSnapshot);
|
||||
|
||||
SVgroupChangeInfo changeInfo = mndFindChangedNodeInfo(pMnode, execInfo.pNodeEntryList, pNodeSnapshot);
|
||||
|
@ -2359,10 +2366,6 @@ typedef struct SMStreamNodeCheckMsg {
|
|||
int8_t placeHolder; // // to fix windows compile error, define place holder
|
||||
} SMStreamNodeCheckMsg;
|
||||
|
||||
typedef struct SMStreamTaskResetMsg {
|
||||
int8_t placeHolder;
|
||||
} SMStreamTaskResetMsg;
|
||||
|
||||
static int32_t mndProcessNodeCheck(SRpcMsg *pReq) {
|
||||
SMnode *pMnode = pReq->info.node;
|
||||
SSdb *pSdb = pMnode->pSdb;
|
||||
|
@ -2577,6 +2580,43 @@ int32_t mndResetFromCheckpoint(SMnode* pMnode) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
int32_t setNodeEpsetExpiredFlag(const SArray* pNodeList) {
|
||||
int32_t num = taosArrayGetSize(pNodeList);
|
||||
|
||||
for (int k = 0; k < num; ++k) {
|
||||
int32_t* pVgId = taosArrayGet(pNodeList, k);
|
||||
|
||||
int32_t numOfNodes = taosArrayGetSize(execInfo.pNodeEntryList);
|
||||
for (int i = 0; i < numOfNodes; ++i) {
|
||||
SNodeEntry* pNodeEntry = taosArrayGet(execInfo.pNodeEntryList, i);
|
||||
|
||||
if (pNodeEntry->nodeId == *pVgId) {
|
||||
mInfo("vgId:%d expired in stream task, needs update nodeEp", *pVgId);
|
||||
pNodeEntry->stageUpdated = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static void updateStageInfo(STaskStatusEntry* pTaskEntry, int32_t stage) {
|
||||
int32_t numOfNodes = taosArrayGetSize(execInfo.pNodeEntryList);
|
||||
for(int32_t j = 0; j < numOfNodes; ++j) {
|
||||
SNodeEntry* pNodeEntry = taosArrayGet(execInfo.pNodeEntryList, j);
|
||||
if (pNodeEntry->nodeId == pTaskEntry->nodeId) {
|
||||
|
||||
mInfo("vgId:%d stage updated from %d to %d, nodeUpdate trigger by s-task:0x%" PRIx64, pTaskEntry->nodeId,
|
||||
pTaskEntry->stage, stage, pTaskEntry->id.taskId);
|
||||
|
||||
pNodeEntry->stageUpdated = true;
|
||||
pTaskEntry->stage = stage;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
int32_t mndProcessStreamHb(SRpcMsg *pReq) {
|
||||
SMnode *pMnode = pReq->info.node;
|
||||
SStreamHbMsg req = {0};
|
||||
|
@ -2602,29 +2642,20 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) {
|
|||
doExtractTasksFromStream(pMnode);
|
||||
}
|
||||
|
||||
setNodeEpsetExpiredFlag(req.pUpdateNodes);
|
||||
|
||||
for (int32_t i = 0; i < req.numOfTasks; ++i) {
|
||||
STaskStatusEntry *p = taosArrayGet(req.pTaskStatus, i);
|
||||
STaskStatusEntry *pEntry = taosHashGet(execInfo.pTaskMap, &p->id, sizeof(p->id));
|
||||
if (pEntry == NULL) {
|
||||
STaskStatusEntry *pTaskEntry = taosHashGet(execInfo.pTaskMap, &p->id, sizeof(p->id));
|
||||
if (pTaskEntry == NULL) {
|
||||
mError("s-task:0x%" PRIx64 " not found in mnode task list", p->id.taskId);
|
||||
continue;
|
||||
}
|
||||
|
||||
if (p->stage != pEntry->stage && pEntry->stage != -1) {
|
||||
int32_t numOfNodes = taosArrayGetSize(execInfo.pNodeEntryList);
|
||||
for(int32_t j = 0; j < numOfNodes; ++j) {
|
||||
SNodeEntry* pNodeEntry = taosArrayGet(execInfo.pNodeEntryList, j);
|
||||
if (pNodeEntry->nodeId == pEntry->nodeId) {
|
||||
mInfo("vgId:%d stage updated, from %d to %d, nodeUpdate trigger by s-task:0x%" PRIx64,
|
||||
pEntry->nodeId, pEntry->stage, p->stage, pEntry->id.taskId);
|
||||
|
||||
pNodeEntry->stageUpdated = true;
|
||||
pEntry->stage = p->stage;
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (pTaskEntry->stage != p->stage && pTaskEntry->stage != -1) {
|
||||
updateStageInfo(pTaskEntry, p->stage);
|
||||
} else {
|
||||
streamTaskStatusCopy(pEntry, p);
|
||||
streamTaskStatusCopy(pTaskEntry, p);
|
||||
if (p->activeCheckpointId != 0) {
|
||||
if (activeCheckpointId != 0) {
|
||||
ASSERT(activeCheckpointId == p->activeCheckpointId);
|
||||
|
@ -2638,7 +2669,7 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) {
|
|||
}
|
||||
}
|
||||
|
||||
pEntry->status = p->status;
|
||||
pTaskEntry->status = p->status;
|
||||
if (p->status != TASK_STATUS__READY) {
|
||||
mDebug("received s-task:0x%"PRIx64" not in ready status:%s", p->id.taskId, streamTaskGetStatusStr(p->status));
|
||||
}
|
||||
|
@ -2655,5 +2686,6 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) {
|
|||
taosThreadMutexUnlock(&execInfo.lock);
|
||||
|
||||
taosArrayDestroy(req.pTaskStatus);
|
||||
taosArrayDestroy(req.pUpdateNodes);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
|
|
@ -1553,7 +1553,7 @@ int32_t tqProcessTaskResumeImpl(STQ* pTq, SStreamTask* pTask, int64_t sversion,
|
|||
|
||||
if (level == TASK_LEVEL__SOURCE && pTask->info.fillHistory && status == TASK_STATUS__SCAN_HISTORY) {
|
||||
streamStartScanHistoryAsync(pTask, igUntreated);
|
||||
} else if (level == TASK_LEVEL__SOURCE && (streamQueueGetNumOfItems(pTask->inputInfo.queue) == 0)) {
|
||||
} else if (level == TASK_LEVEL__SOURCE && (streamQueueGetNumOfItems(pTask->inputq.queue) == 0)) {
|
||||
tqScanWalAsync(pTq, false);
|
||||
} else {
|
||||
streamSchedExec(pTask);
|
||||
|
|
|
@ -39,7 +39,7 @@ int32_t tqPushMsg(STQ* pTq, tmsg_t msgType) {
|
|||
int32_t numOfTasks = streamMetaGetNumOfTasks(pTq->pStreamMeta);
|
||||
taosRUnLockLatch(&pTq->pStreamMeta->lock);
|
||||
|
||||
tqDebug("handle submit, restore:%d, numOfTasks:%d", pTq->pVnode->restored, numOfTasks);
|
||||
// tqTrace("vgId:%d handle submit, restore:%d, numOfTasks:%d", TD_VID(pTq->pVnode), pTq->pVnode->restored, numOfTasks);
|
||||
|
||||
// push data for stream processing:
|
||||
// 1. the vnode has already been restored.
|
||||
|
|
|
@ -344,13 +344,13 @@ static bool taskReadyForDataFromWal(SStreamTask* pTask) {
|
|||
}
|
||||
|
||||
// check if input queue is full or not
|
||||
if (streamQueueIsFull(pTask->inputInfo.queue)) {
|
||||
if (streamQueueIsFull(pTask->inputq.queue)) {
|
||||
tqTrace("s-task:%s input queue is full, do nothing", pTask->id.idStr);
|
||||
return false;
|
||||
}
|
||||
|
||||
// the input queue of downstream task is full, so the output is blocked, stopped for a while
|
||||
if (pTask->inputInfo.status == TASK_INPUT_STATUS__BLOCKED) {
|
||||
if (pTask->inputq.status == TASK_INPUT_STATUS__BLOCKED) {
|
||||
tqDebug("s-task:%s inputQ is blocked, do nothing", pTask->id.idStr);
|
||||
return false;
|
||||
}
|
||||
|
@ -444,7 +444,7 @@ int32_t doScanWalForAllTasks(SStreamMeta* pStreamMeta, bool* pScanIdle) {
|
|||
continue;
|
||||
}
|
||||
|
||||
int32_t numOfItems = streamQueueGetNumOfItems(pTask->inputInfo.queue);
|
||||
int32_t numOfItems = streamQueueGetNumOfItems(pTask->inputq.queue);
|
||||
int64_t maxVer = (pTask->info.fillHistory == 1) ? pTask->dataRange.range.maxVer : INT64_MAX;
|
||||
|
||||
taosThreadMutexLock(&pTask->lock);
|
||||
|
|
|
@ -296,7 +296,7 @@ int32_t streamProcessRetrieveReq(SStreamTask* pTask, SStreamRetrieveReq* pReq, S
|
|||
return 0;
|
||||
}
|
||||
|
||||
void streamTaskInputFail(SStreamTask* pTask) { atomic_store_8(&pTask->inputInfo.status, TASK_INPUT_STATUS__FAILED); }
|
||||
void streamTaskInputFail(SStreamTask* pTask) { atomic_store_8(&pTask->inputq.status, TASK_INPUT_STATUS__FAILED); }
|
||||
|
||||
void streamTaskOpenAllUpstreamInput(SStreamTask* pTask) {
|
||||
int32_t num = taosArrayGetSize(pTask->upstreamInfo.pList);
|
||||
|
|
|
@ -1043,8 +1043,8 @@ static int32_t handleDispatchSuccessRsp(SStreamTask* pTask, int32_t downstreamId
|
|||
int64_t el = taosGetTimestampMs() - pTask->msgInfo.startTs;
|
||||
|
||||
// put data into inputQ of current task is also allowed
|
||||
if (pTask->inputInfo.status == TASK_INPUT_STATUS__BLOCKED) {
|
||||
pTask->inputInfo.status = TASK_INPUT_STATUS__NORMAL;
|
||||
if (pTask->inputq.status == TASK_INPUT_STATUS__BLOCKED) {
|
||||
pTask->inputq.status = TASK_INPUT_STATUS__NORMAL;
|
||||
stDebug("s-task:%s downstream task:0x%x resume to normal from inputQ blocking, blocking time:%" PRId64 "ms",
|
||||
pTask->id.idStr, downstreamId, el);
|
||||
} else {
|
||||
|
@ -1096,7 +1096,7 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i
|
|||
|
||||
} else { // code == 0
|
||||
if (pRsp->inputStatus == TASK_INPUT_STATUS__BLOCKED) {
|
||||
pTask->inputInfo.status = TASK_INPUT_STATUS__BLOCKED;
|
||||
pTask->inputq.status = TASK_INPUT_STATUS__BLOCKED;
|
||||
// block the input of current task, to push pressure to upstream
|
||||
taosThreadMutexLock(&pTask->lock);
|
||||
taosArrayPush(pTask->msgInfo.pRetryList, &pRsp->downstreamNodeId);
|
||||
|
|
|
@ -106,7 +106,7 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, i
|
|||
return 0;
|
||||
}
|
||||
|
||||
if (pTask->inputInfo.status == TASK_INPUT_STATUS__BLOCKED) {
|
||||
if (pTask->inputq.status == TASK_INPUT_STATUS__BLOCKED) {
|
||||
stWarn("s-task:%s downstream task inputQ blocked, idle for 1sec and retry exec task", pTask->id.idStr);
|
||||
taosMsleep(1000);
|
||||
continue;
|
||||
|
@ -217,7 +217,7 @@ int32_t streamScanHistoryData(SStreamTask* pTask) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
if (pTask->inputInfo.status == TASK_INPUT_STATUS__BLOCKED) {
|
||||
if (pTask->inputq.status == TASK_INPUT_STATUS__BLOCKED) {
|
||||
stDebug("s-task:%s inputQ is blocked, wait for 10sec and retry", pTask->id.idStr);
|
||||
taosMsleep(10000);
|
||||
continue;
|
||||
|
@ -374,7 +374,7 @@ int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask) {
|
|||
|
||||
// 7. pause allowed.
|
||||
streamTaskEnablePause(pStreamTask);
|
||||
if ((pStreamTask->info.taskLevel == TASK_LEVEL__SOURCE) && taosQueueEmpty(pStreamTask->inputInfo.queue->pQueue)) {
|
||||
if ((pStreamTask->info.taskLevel == TASK_LEVEL__SOURCE) && taosQueueEmpty(pStreamTask->inputq.queue->pQueue)) {
|
||||
SStreamRefDataBlock* pItem = taosAllocateQitem(sizeof(SStreamRefDataBlock), DEF_QITEM, 0);
|
||||
|
||||
SSDataBlock* pDelBlock = createSpecialDataBlock(STREAM_DELETE_DATA);
|
||||
|
@ -630,7 +630,7 @@ int32_t streamExecTask(SStreamTask* pTask) {
|
|||
}
|
||||
|
||||
taosThreadMutexLock(&pTask->lock);
|
||||
if ((streamQueueGetNumOfItems(pTask->inputInfo.queue) == 0) || streamTaskShouldStop(pTask) ||
|
||||
if ((streamQueueGetNumOfItems(pTask->inputq.queue) == 0) || streamTaskShouldStop(pTask) ||
|
||||
streamTaskShouldPause(pTask)) {
|
||||
atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE);
|
||||
taosThreadMutexUnlock(&pTask->lock);
|
||||
|
|
|
@ -795,6 +795,15 @@ int32_t tEncodeStreamHbMsg(SEncoder* pEncoder, const SStreamHbMsg* pReq) {
|
|||
if (tEncodeI64(pEncoder, ps->activeCheckpointId) < 0) return -1;
|
||||
if (tEncodeI8(pEncoder, ps->checkpointFailed) < 0) return -1;
|
||||
}
|
||||
|
||||
int32_t numOfVgs = taosArrayGetSize(pReq->pUpdateNodes);
|
||||
if (tEncodeI32(pEncoder, numOfVgs) < 0) return -1;
|
||||
|
||||
for (int j = 0; j < numOfVgs; ++j) {
|
||||
int32_t* pVgId = taosArrayGet(pReq->pUpdateNodes, j);
|
||||
if (tEncodeI32(pEncoder, *pVgId) < 0) return -1;
|
||||
}
|
||||
|
||||
tEndEncode(pEncoder);
|
||||
return pEncoder->pos;
|
||||
}
|
||||
|
@ -828,6 +837,17 @@ int32_t tDecodeStreamHbMsg(SDecoder* pDecoder, SStreamHbMsg* pReq) {
|
|||
taosArrayPush(pReq->pTaskStatus, &entry);
|
||||
}
|
||||
|
||||
int32_t numOfVgs = 0;
|
||||
if (tDecodeI32(pDecoder, &numOfVgs) < 0) return -1;
|
||||
|
||||
pReq->pUpdateNodes = taosArrayInit(numOfVgs, sizeof(int32_t));
|
||||
|
||||
for (int j = 0; j < numOfVgs; ++j) {
|
||||
int32_t vgId = 0;
|
||||
if (tDecodeI32(pDecoder, &vgId) < 0) return -1;
|
||||
taosArrayPush(pReq->pUpdateNodes, &vgId);
|
||||
}
|
||||
|
||||
tEndDecode(pDecoder);
|
||||
return 0;
|
||||
}
|
||||
|
@ -882,13 +902,14 @@ void metaHbToMnode(void* param, void* tmrId) {
|
|||
int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta);
|
||||
|
||||
SEpSet epset = {0};
|
||||
bool hasValEpset = false;
|
||||
bool hasMnodeEpset = false;
|
||||
|
||||
hbMsg.vgId = pMeta->vgId;
|
||||
hbMsg.pTaskStatus = taosArrayInit(numOfTasks, sizeof(STaskStatusEntry));
|
||||
hbMsg.pUpdateNodes = taosArrayInit(numOfTasks, sizeof(int32_t));
|
||||
|
||||
for (int32_t i = 0; i < numOfTasks; ++i) {
|
||||
STaskId* pId = taosArrayGet(pMeta->pTaskList, i);
|
||||
|
||||
STaskId* pId = taosArrayGet(pMeta->pTaskList, i);
|
||||
SStreamTask** pTask = taosHashGet(pMeta->pTasksMap, pId, sizeof(*pId));
|
||||
|
||||
// not report the status of fill-history task
|
||||
|
@ -901,7 +922,7 @@ void metaHbToMnode(void* param, void* tmrId) {
|
|||
.status = streamTaskGetStatus(*pTask, NULL),
|
||||
.nodeId = pMeta->vgId,
|
||||
.stage = pMeta->stage,
|
||||
.inputQUsed = SIZE_IN_MiB(streamQueueGetItemSize((*pTask)->inputInfo.queue)),
|
||||
.inputQUsed = SIZE_IN_MiB(streamQueueGetItemSize((*pTask)->inputq.queue)),
|
||||
};
|
||||
|
||||
entry.inputRate = entry.inputQUsed*100.0/STREAM_TASK_QUEUE_CAPACITY_IN_SIZE;
|
||||
|
@ -920,18 +941,39 @@ void metaHbToMnode(void* param, void* tmrId) {
|
|||
walReaderValidVersionRange((*pTask)->exec.pWalReader, &entry.verStart, &entry.verEnd);
|
||||
}
|
||||
|
||||
taosArrayPush(hbMsg.pTaskStatus, &entry);
|
||||
taosThreadMutexLock(&(*pTask)->lock);
|
||||
int32_t num = taosArrayGetSize((*pTask)->outputInfo.pDownstreamUpdateList);
|
||||
for (int j = 0; j < num; ++j) {
|
||||
int32_t *pNodeId = taosArrayGet((*pTask)->outputInfo.pDownstreamUpdateList, j);
|
||||
|
||||
if (!hasValEpset) {
|
||||
bool exist = false;
|
||||
int32_t numOfExisted = taosArrayGetSize(hbMsg.pUpdateNodes);
|
||||
for (int k = 0; k < numOfExisted; ++k) {
|
||||
if (*pNodeId == *(int32_t*)taosArrayGet(hbMsg.pUpdateNodes, k)) {
|
||||
exist = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if (!exist) {
|
||||
taosArrayPush(hbMsg.pUpdateNodes, pNodeId);
|
||||
}
|
||||
}
|
||||
|
||||
taosArrayClear((*pTask)->outputInfo.pDownstreamUpdateList);
|
||||
taosThreadMutexUnlock(&(*pTask)->lock);
|
||||
|
||||
taosArrayPush(hbMsg.pTaskStatus, &entry);
|
||||
if (!hasMnodeEpset) {
|
||||
epsetAssign(&epset, &(*pTask)->info.mnodeEpset);
|
||||
hasValEpset = true;
|
||||
hasMnodeEpset = true;
|
||||
}
|
||||
}
|
||||
|
||||
hbMsg.numOfTasks = taosArrayGetSize(hbMsg.pTaskStatus);
|
||||
taosRUnLockLatch(&pMeta->lock);
|
||||
|
||||
if (hasValEpset) {
|
||||
if (hasMnodeEpset) {
|
||||
int32_t code = 0;
|
||||
int32_t tlen = 0;
|
||||
|
||||
|
@ -976,6 +1018,8 @@ void metaHbToMnode(void* param, void* tmrId) {
|
|||
}
|
||||
|
||||
taosArrayDestroy(hbMsg.pTaskStatus);
|
||||
taosArrayDestroy(hbMsg.pUpdateNodes);
|
||||
|
||||
taosTmrReset(metaHbToMnode, META_HB_CHECK_INTERVAL, param, streamEnv.timer, &pMeta->pHbInfo->hbTmr);
|
||||
taosReleaseRef(streamMetaId, rid);
|
||||
}
|
||||
|
|
|
@ -169,7 +169,7 @@ int32_t streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInpu
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
SStreamQueueItem* qItem = streamQueueNextItem(pTask->inputInfo.queue);
|
||||
SStreamQueueItem* qItem = streamQueueNextItem(pTask->inputq.queue);
|
||||
if (qItem == NULL) {
|
||||
if ((taskLevel == TASK_LEVEL__SOURCE || taskLevel == TASK_LEVEL__SINK) && (++retryTimes) < MAX_RETRY_TIMES) {
|
||||
taosMsleep(WAIT_FOR_DURATION);
|
||||
|
@ -211,7 +211,7 @@ int32_t streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInpu
|
|||
streamTaskConsumeQuota(pTask->outputInfo.pTokenBucket, *blockSize);
|
||||
}
|
||||
|
||||
streamQueueProcessFail(pTask->inputInfo.queue);
|
||||
streamQueueProcessFail(pTask->inputq.queue);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
} else {
|
||||
|
@ -232,7 +232,7 @@ int32_t streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInpu
|
|||
streamTaskConsumeQuota(pTask->outputInfo.pTokenBucket, *blockSize);
|
||||
}
|
||||
|
||||
streamQueueProcessFail(pTask->inputInfo.queue);
|
||||
streamQueueProcessFail(pTask->inputq.queue);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
@ -240,7 +240,7 @@ int32_t streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInpu
|
|||
}
|
||||
|
||||
*numOfBlocks += 1;
|
||||
streamQueueProcessSuccess(pTask->inputInfo.queue);
|
||||
streamQueueProcessSuccess(pTask->inputq.queue);
|
||||
|
||||
if (*numOfBlocks >= MAX_STREAM_EXEC_BATCH_NUM) {
|
||||
stDebug("s-task:%s batch size limit:%d reached, start to process blocks", id, MAX_STREAM_EXEC_BATCH_NUM);
|
||||
|
@ -258,12 +258,12 @@ int32_t streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInpu
|
|||
|
||||
int32_t streamTaskPutDataIntoInputQ(SStreamTask* pTask, SStreamQueueItem* pItem) {
|
||||
int8_t type = pItem->type;
|
||||
STaosQueue* pQueue = pTask->inputInfo.queue->pQueue;
|
||||
int32_t total = streamQueueGetNumOfItems(pTask->inputInfo.queue) + 1;
|
||||
STaosQueue* pQueue = pTask->inputq.queue->pQueue;
|
||||
int32_t total = streamQueueGetNumOfItems(pTask->inputq.queue) + 1;
|
||||
|
||||
if (type == STREAM_INPUT__DATA_SUBMIT) {
|
||||
SStreamDataSubmit* px = (SStreamDataSubmit*)pItem;
|
||||
if ((pTask->info.taskLevel == TASK_LEVEL__SOURCE) && streamQueueIsFull(pTask->inputInfo.queue)) {
|
||||
if ((pTask->info.taskLevel == TASK_LEVEL__SOURCE) && streamQueueIsFull(pTask->inputq.queue)) {
|
||||
double size = SIZE_IN_MiB(taosQueueMemorySize(pQueue));
|
||||
stTrace(
|
||||
"s-task:%s inputQ is full, capacity(size:%d num:%dMiB), current(blocks:%d, size:%.2fMiB) stop to push data",
|
||||
|
@ -290,7 +290,7 @@ int32_t streamTaskPutDataIntoInputQ(SStreamTask* pTask, SStreamQueueItem* pItem)
|
|||
msgLen, ver, total, size + SIZE_IN_MiB(msgLen));
|
||||
} else if (type == STREAM_INPUT__DATA_BLOCK || type == STREAM_INPUT__DATA_RETRIEVE ||
|
||||
type == STREAM_INPUT__REF_DATA_BLOCK) {
|
||||
if (streamQueueIsFull(pTask->inputInfo.queue)) {
|
||||
if (streamQueueIsFull(pTask->inputq.queue)) {
|
||||
double size = SIZE_IN_MiB(taosQueueMemorySize(pQueue));
|
||||
|
||||
stTrace("s-task:%s input queue is full, capacity:%d size:%d MiB, current(blocks:%d, size:%.2fMiB) abort",
|
||||
|
|
|
@ -346,6 +346,7 @@ static void doProcessDownstreamReadyRsp(SStreamTask* pTask) {
|
|||
int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRsp) {
|
||||
ASSERT(pTask->id.taskId == pRsp->upstreamTaskId);
|
||||
const char* id = pTask->id.idStr;
|
||||
int32_t vgId = pTask->pMeta->vgId;
|
||||
|
||||
if (streamTaskShouldStop(pTask)) {
|
||||
stDebug("s-task:%s should stop, do not do check downstream again", id);
|
||||
|
@ -354,8 +355,8 @@ int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRs
|
|||
|
||||
if (pRsp->status == TASK_DOWNSTREAM_READY) {
|
||||
if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
|
||||
bool found = false;
|
||||
|
||||
bool found = false;
|
||||
int32_t numOfReqs = taosArrayGetSize(pTask->checkReqIds);
|
||||
for (int32_t i = 0; i < numOfReqs; i++) {
|
||||
int64_t reqId = *(int64_t*)taosArrayGet(pTask->checkReqIds, i);
|
||||
|
@ -402,6 +403,26 @@ int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRs
|
|||
"s-task:%s downstream taskId:0x%x (vgId:%d) not leader, self dispatch epset needs to be updated, not check "
|
||||
"downstream again, nodeUpdate needed",
|
||||
id, pRsp->downstreamTaskId, pRsp->downstreamNodeId);
|
||||
|
||||
taosThreadMutexLock(&pTask->lock);
|
||||
int32_t num = taosArrayGetSize(pTask->outputInfo.pDownstreamUpdateList);
|
||||
bool existed = false;
|
||||
for (int i = 0; i < num; ++i) {
|
||||
SDownstreamTaskEpset* p = taosArrayGet(pTask->outputInfo.pDownstreamUpdateList, i);
|
||||
if (p->nodeId == pRsp->downstreamNodeId) {
|
||||
existed = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if (!existed) {
|
||||
SDownstreamTaskEpset t = {.nodeId = pRsp->downstreamNodeId};
|
||||
taosArrayPush(pTask->outputInfo.pDownstreamUpdateList, &t);
|
||||
stInfo("s-task:%s vgId:%d downstream nodeId:%d needs to be updated, total needs updated:%d", id, vgId,
|
||||
t.nodeId, (int32_t)taosArrayGetSize(pTask->outputInfo.pDownstreamUpdateList));
|
||||
}
|
||||
|
||||
taosThreadMutexUnlock(&pTask->lock);
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
|
|
@ -59,7 +59,7 @@ SStreamTask* tNewStreamTask(int64_t streamId, int8_t taskLevel, bool fillHistory
|
|||
pTask->id.idStr = taosStrdup(buf);
|
||||
pTask->status.schedStatus = TASK_SCHED_STATUS__INACTIVE;
|
||||
pTask->status.taskStatus = (fillHistory || hasFillhistory) ? TASK_STATUS__SCAN_HISTORY : TASK_STATUS__READY;
|
||||
pTask->inputInfo.status = TASK_INPUT_STATUS__NORMAL;
|
||||
pTask->inputq.status = TASK_INPUT_STATUS__NORMAL;
|
||||
pTask->outputq.status = TASK_OUTPUT_STATUS__NORMAL;
|
||||
|
||||
if (fillHistory) {
|
||||
|
@ -337,8 +337,8 @@ void tFreeStreamTask(SStreamTask* pTask) {
|
|||
}
|
||||
|
||||
int32_t status = atomic_load_8((int8_t*)&(pTask->status.taskStatus));
|
||||
if (pTask->inputInfo.queue) {
|
||||
streamQueueClose(pTask->inputInfo.queue, pTask->id.taskId);
|
||||
if (pTask->inputq.queue) {
|
||||
streamQueueClose(pTask->inputq.queue, pTask->id.taskId);
|
||||
}
|
||||
|
||||
if (pTask->outputq.queue) {
|
||||
|
@ -399,8 +399,11 @@ void tFreeStreamTask(SStreamTask* pTask) {
|
|||
pTask->msgInfo.pRetryList = taosArrayDestroy(pTask->msgInfo.pRetryList);
|
||||
taosMemoryFree(pTask->outputInfo.pTokenBucket);
|
||||
taosThreadMutexDestroy(&pTask->lock);
|
||||
taosMemoryFree(pTask);
|
||||
|
||||
taosArrayDestroy(pTask->outputInfo.pDownstreamUpdateList);
|
||||
pTask->outputInfo.pDownstreamUpdateList = NULL;
|
||||
|
||||
taosMemoryFree(pTask);
|
||||
stDebug("s-task:0x%x free task completed", taskId);
|
||||
}
|
||||
|
||||
|
@ -409,10 +412,10 @@ int32_t streamTaskInit(SStreamTask* pTask, SStreamMeta* pMeta, SMsgCb* pMsgCb, i
|
|||
pTask->refCnt = 1;
|
||||
pTask->status.schedStatus = TASK_SCHED_STATUS__INACTIVE;
|
||||
pTask->status.timerActive = 0;
|
||||
pTask->inputInfo.queue = streamQueueOpen(512 << 10);
|
||||
pTask->inputq.queue = streamQueueOpen(512 << 10);
|
||||
pTask->outputq.queue = streamQueueOpen(512 << 10);
|
||||
|
||||
if (pTask->inputInfo.queue == NULL || pTask->outputq.queue == NULL) {
|
||||
if (pTask->inputq.queue == NULL || pTask->outputq.queue == NULL) {
|
||||
stError("s-task:%s failed to prepare the input/output queue, initialize task failed", pTask->id.idStr);
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
|
@ -425,7 +428,7 @@ int32_t streamTaskInit(SStreamTask* pTask, SStreamMeta* pMeta, SMsgCb* pMsgCb, i
|
|||
}
|
||||
|
||||
pTask->execInfo.created = taosGetTimestampMs();
|
||||
pTask->inputInfo.status = TASK_INPUT_STATUS__NORMAL;
|
||||
pTask->inputq.status = TASK_INPUT_STATUS__NORMAL;
|
||||
pTask->outputq.status = TASK_OUTPUT_STATUS__NORMAL;
|
||||
pTask->pMeta = pMeta;
|
||||
|
||||
|
@ -462,6 +465,11 @@ int32_t streamTaskInit(SStreamTask* pTask, SStreamMeta* pMeta, SMsgCb* pMsgCb, i
|
|||
taosThreadMutexInit(&pTask->lock, &attr);
|
||||
streamTaskOpenAllUpstreamInput(pTask);
|
||||
|
||||
pTask->outputInfo.pDownstreamUpdateList = taosArrayInit(4, sizeof(SDownstreamTaskEpset));
|
||||
if (pTask->outputInfo.pDownstreamUpdateList == NULL) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
|
|
@ -582,6 +582,7 @@ sql create table t1 using st tags(1,1,1);
|
|||
sql create table t2 using st tags(2,2,2);
|
||||
|
||||
sql create stream streams23 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt23 as select _wstart, count(*) c1, sum(a) c3 , max(b) c4, min(c) c5 from st interval(20s) sliding(10s);
|
||||
sleep 1000
|
||||
|
||||
sql insert into t1 values(1648791213000,1,1,1,1.0);
|
||||
sql insert into t1 values(1648791223001,2,2,2,1.1);
|
||||
|
|
Loading…
Reference in New Issue