fix(stream): add expired epset node list for stream tasks in hb to mnode.

This commit is contained in:
Haojun Liao 2023-10-27 23:05:41 +08:00
parent 62cc33715c
commit 4c267b538f
13 changed files with 189 additions and 81 deletions

View File

@ -236,6 +236,11 @@ typedef struct {
SUseDbRsp dbInfo;
} STaskDispatcherShuffle;
typedef struct {
int32_t nodeId;
SEpSet epset;
} SDownstreamTaskEpset;
typedef struct {
int64_t stbUid;
char stbFullName[TSDB_TABLE_FNAME_LEN];
@ -327,15 +332,10 @@ typedef struct SDispatchMsgInfo {
void* pTimer; // used to dispatch data after a given time duration
} SDispatchMsgInfo;
typedef struct STaskOutputQueue {
typedef struct STaskQueue {
int8_t status;
SStreamQueue* queue;
} STaskOutputQueue;
typedef struct STaskInputInfo {
int8_t status;
SStreamQueue* queue;
} STaskInputInfo;
} STaskQueue;
typedef struct STaskSchedInfo {
int8_t status;
@ -384,6 +384,7 @@ typedef struct STaskOutputInfo {
};
int8_t type;
STokenBucket* pTokenBucket;
SArray* pDownstreamUpdateList;
} STaskOutputInfo;
typedef struct SUpstreamInfo {
@ -395,8 +396,8 @@ struct SStreamTask {
int64_t ver;
SStreamTaskId id;
SSTaskBasicInfo info;
STaskOutputQueue outputq;
STaskInputInfo inputInfo;
STaskQueue outputq;
STaskQueue inputq;
STaskSchedInfo schedInfo;
STaskOutputInfo outputInfo;
SDispatchMsgInfo msgInfo;
@ -645,7 +646,8 @@ typedef struct STaskStatusEntry {
typedef struct SStreamHbMsg {
int32_t vgId;
int32_t numOfTasks;
SArray* pTaskStatus; // SArray<SStreamTaskStatusEntry>
SArray* pTaskStatus; // SArray<STaskStatusEntry>
SArray* pUpdateNodes; // SArray<int32_t>, needs update the epsets in stream tasks for those nodes.
} SStreamHbMsg;
int32_t tEncodeStreamHbMsg(SEncoder* pEncoder, const SStreamHbMsg* pRsp);

View File

@ -91,6 +91,7 @@ static void removeStreamTasksInBuf(SStreamObj *pStream, SStreamExecInfo *pExe
static void keepStreamTasksInBuf(SStreamObj *pStream, SStreamExecInfo *pExecNode);
static int32_t removeExpirednodeEntryAndTask(SArray *pNodeSnapshot);
static int32_t doKillActiveCheckpointTrans(SMnode *pMnode);
static int32_t setNodeEpsetExpiredFlag(const SArray* pNodeList);
int32_t mndInitStream(SMnode *pMnode) {
SSdbTable table = {
@ -2119,7 +2120,9 @@ static int32_t mndProcessVgroupChange(SMnode *pMnode, SVgroupChangeInfo *pChange
continue;
}
mDebug("stream:0x%" PRIx64 " %s involved node changed, create update trans", pStream->uid, pStream->name);
mDebug("stream:0x%" PRIx64 " %s involved node changed, create update trans, transId:%d", pStream->uid,
pStream->name, pTrans->id);
int32_t code = createStreamUpdateTrans(pStream, pChangeInfo, pTrans);
// todo: not continue, drop all and retry again
@ -2216,23 +2219,26 @@ static void doExtractTasksFromStream(SMnode *pMnode) {
}
}
static int32_t doRemoveTasks(SStreamExecInfo* pExecNode, STaskId* pRemovedId) {
static int32_t doRemoveTasks(SStreamExecInfo *pExecNode, STaskId *pRemovedId) {
void *p = taosHashGet(pExecNode->pTaskMap, pRemovedId, sizeof(*pRemovedId));
if (p == NULL) {
return TSDB_CODE_SUCCESS;
}
if (p != NULL) {
taosHashRemove(pExecNode->pTaskMap, pRemovedId, sizeof(*pRemovedId));
taosHashRemove(pExecNode->pTaskMap, pRemovedId, sizeof(*pRemovedId));
for(int32_t k = 0; k < taosArrayGetSize(pExecNode->pTaskList); ++k) {
STaskId* pId = taosArrayGet(pExecNode->pTaskList, k);
if (pId->taskId == pRemovedId->taskId && pId->streamId == pRemovedId->streamId) {
taosArrayRemove(pExecNode->pTaskList, k);
mInfo("s-task:0x%x removed from buffer, remain:%d", (int32_t) pRemovedId->taskId,
(int32_t)taosArrayGetSize(pExecNode->pTaskList));
break;
}
for (int32_t k = 0; k < taosArrayGetSize(pExecNode->pTaskList); ++k) {
STaskId *pId = taosArrayGet(pExecNode->pTaskList, k);
if (pId->taskId == pRemovedId->taskId && pId->streamId == pRemovedId->streamId) {
taosArrayRemove(pExecNode->pTaskList, k);
int32_t num = taosArrayGetSize(pExecNode->pTaskList);
mInfo("s-task:0x%x removed from buffer, remain:%d", (int32_t)pRemovedId->taskId, num);
break;
}
}
return 0;
return TSDB_CODE_SUCCESS;
}
static bool taskNodeExists(SArray* pList, int32_t nodeId) {
@ -2322,6 +2328,7 @@ static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg) {
SArray *pNodeSnapshot = mndTakeVgroupSnapshot(pMnode);
taosThreadMutexLock(&execInfo.lock);
removeExpirednodeEntryAndTask(pNodeSnapshot);
SVgroupChangeInfo changeInfo = mndFindChangedNodeInfo(pMnode, execInfo.pNodeEntryList, pNodeSnapshot);
@ -2359,10 +2366,6 @@ typedef struct SMStreamNodeCheckMsg {
int8_t placeHolder; // // to fix windows compile error, define place holder
} SMStreamNodeCheckMsg;
typedef struct SMStreamTaskResetMsg {
int8_t placeHolder;
} SMStreamTaskResetMsg;
static int32_t mndProcessNodeCheck(SRpcMsg *pReq) {
SMnode *pMnode = pReq->info.node;
SSdb *pSdb = pMnode->pSdb;
@ -2577,6 +2580,43 @@ int32_t mndResetFromCheckpoint(SMnode* pMnode) {
return 0;
}
int32_t setNodeEpsetExpiredFlag(const SArray* pNodeList) {
int32_t num = taosArrayGetSize(pNodeList);
for (int k = 0; k < num; ++k) {
int32_t* pVgId = taosArrayGet(pNodeList, k);
int32_t numOfNodes = taosArrayGetSize(execInfo.pNodeEntryList);
for (int i = 0; i < numOfNodes; ++i) {
SNodeEntry* pNodeEntry = taosArrayGet(execInfo.pNodeEntryList, i);
if (pNodeEntry->nodeId == *pVgId) {
mInfo("vgId:%d expired in stream task, needs update nodeEp", *pVgId);
pNodeEntry->stageUpdated = true;
break;
}
}
}
return TSDB_CODE_SUCCESS;
}
static void updateStageInfo(STaskStatusEntry* pTaskEntry, int32_t stage) {
int32_t numOfNodes = taosArrayGetSize(execInfo.pNodeEntryList);
for(int32_t j = 0; j < numOfNodes; ++j) {
SNodeEntry* pNodeEntry = taosArrayGet(execInfo.pNodeEntryList, j);
if (pNodeEntry->nodeId == pTaskEntry->nodeId) {
mInfo("vgId:%d stage updated from %d to %d, nodeUpdate trigger by s-task:0x%" PRIx64, pTaskEntry->nodeId,
pTaskEntry->stage, stage, pTaskEntry->id.taskId);
pNodeEntry->stageUpdated = true;
pTaskEntry->stage = stage;
break;
}
}
}
int32_t mndProcessStreamHb(SRpcMsg *pReq) {
SMnode *pMnode = pReq->info.node;
SStreamHbMsg req = {0};
@ -2602,29 +2642,20 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) {
doExtractTasksFromStream(pMnode);
}
setNodeEpsetExpiredFlag(req.pUpdateNodes);
for (int32_t i = 0; i < req.numOfTasks; ++i) {
STaskStatusEntry *p = taosArrayGet(req.pTaskStatus, i);
STaskStatusEntry *pEntry = taosHashGet(execInfo.pTaskMap, &p->id, sizeof(p->id));
if (pEntry == NULL) {
STaskStatusEntry *pTaskEntry = taosHashGet(execInfo.pTaskMap, &p->id, sizeof(p->id));
if (pTaskEntry == NULL) {
mError("s-task:0x%" PRIx64 " not found in mnode task list", p->id.taskId);
continue;
}
if (p->stage != pEntry->stage && pEntry->stage != -1) {
int32_t numOfNodes = taosArrayGetSize(execInfo.pNodeEntryList);
for(int32_t j = 0; j < numOfNodes; ++j) {
SNodeEntry* pNodeEntry = taosArrayGet(execInfo.pNodeEntryList, j);
if (pNodeEntry->nodeId == pEntry->nodeId) {
mInfo("vgId:%d stage updated, from %d to %d, nodeUpdate trigger by s-task:0x%" PRIx64,
pEntry->nodeId, pEntry->stage, p->stage, pEntry->id.taskId);
pNodeEntry->stageUpdated = true;
pEntry->stage = p->stage;
break;
}
}
if (pTaskEntry->stage != p->stage && pTaskEntry->stage != -1) {
updateStageInfo(pTaskEntry, p->stage);
} else {
streamTaskStatusCopy(pEntry, p);
streamTaskStatusCopy(pTaskEntry, p);
if (p->activeCheckpointId != 0) {
if (activeCheckpointId != 0) {
ASSERT(activeCheckpointId == p->activeCheckpointId);
@ -2638,7 +2669,7 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) {
}
}
pEntry->status = p->status;
pTaskEntry->status = p->status;
if (p->status != TASK_STATUS__READY) {
mDebug("received s-task:0x%"PRIx64" not in ready status:%s", p->id.taskId, streamTaskGetStatusStr(p->status));
}
@ -2655,5 +2686,6 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) {
taosThreadMutexUnlock(&execInfo.lock);
taosArrayDestroy(req.pTaskStatus);
taosArrayDestroy(req.pUpdateNodes);
return TSDB_CODE_SUCCESS;
}

View File

@ -1550,7 +1550,7 @@ int32_t tqProcessTaskResumeImpl(STQ* pTq, SStreamTask* pTask, int64_t sversion,
if (level == TASK_LEVEL__SOURCE && pTask->info.fillHistory && status == TASK_STATUS__SCAN_HISTORY) {
streamStartScanHistoryAsync(pTask, igUntreated);
} else if (level == TASK_LEVEL__SOURCE && (streamQueueGetNumOfItems(pTask->inputInfo.queue) == 0)) {
} else if (level == TASK_LEVEL__SOURCE && (streamQueueGetNumOfItems(pTask->inputq.queue) == 0)) {
tqScanWalAsync(pTq, false);
} else {
streamSchedExec(pTask);

View File

@ -39,7 +39,7 @@ int32_t tqPushMsg(STQ* pTq, tmsg_t msgType) {
int32_t numOfTasks = streamMetaGetNumOfTasks(pTq->pStreamMeta);
taosRUnLockLatch(&pTq->pStreamMeta->lock);
tqDebug("handle submit, restore:%d, numOfTasks:%d", pTq->pVnode->restored, numOfTasks);
// tqTrace("vgId:%d handle submit, restore:%d, numOfTasks:%d", TD_VID(pTq->pVnode), pTq->pVnode->restored, numOfTasks);
// push data for stream processing:
// 1. the vnode has already been restored.

View File

@ -344,13 +344,13 @@ static bool taskReadyForDataFromWal(SStreamTask* pTask) {
}
// check if input queue is full or not
if (streamQueueIsFull(pTask->inputInfo.queue)) {
if (streamQueueIsFull(pTask->inputq.queue)) {
tqTrace("s-task:%s input queue is full, do nothing", pTask->id.idStr);
return false;
}
// the input queue of downstream task is full, so the output is blocked, stopped for a while
if (pTask->inputInfo.status == TASK_INPUT_STATUS__BLOCKED) {
if (pTask->inputq.status == TASK_INPUT_STATUS__BLOCKED) {
tqDebug("s-task:%s inputQ is blocked, do nothing", pTask->id.idStr);
return false;
}
@ -444,7 +444,7 @@ int32_t doScanWalForAllTasks(SStreamMeta* pStreamMeta, bool* pScanIdle) {
continue;
}
int32_t numOfItems = streamQueueGetNumOfItems(pTask->inputInfo.queue);
int32_t numOfItems = streamQueueGetNumOfItems(pTask->inputq.queue);
int64_t maxVer = (pTask->info.fillHistory == 1) ? pTask->dataRange.range.maxVer : INT64_MAX;
taosThreadMutexLock(&pTask->lock);

View File

@ -296,7 +296,7 @@ int32_t streamProcessRetrieveReq(SStreamTask* pTask, SStreamRetrieveReq* pReq, S
return 0;
}
void streamTaskInputFail(SStreamTask* pTask) { atomic_store_8(&pTask->inputInfo.status, TASK_INPUT_STATUS__FAILED); }
void streamTaskInputFail(SStreamTask* pTask) { atomic_store_8(&pTask->inputq.status, TASK_INPUT_STATUS__FAILED); }
void streamTaskOpenAllUpstreamInput(SStreamTask* pTask) {
int32_t num = taosArrayGetSize(pTask->upstreamInfo.pList);

View File

@ -1043,8 +1043,8 @@ static int32_t handleDispatchSuccessRsp(SStreamTask* pTask, int32_t downstreamId
int64_t el = taosGetTimestampMs() - pTask->msgInfo.startTs;
// put data into inputQ of current task is also allowed
if (pTask->inputInfo.status == TASK_INPUT_STATUS__BLOCKED) {
pTask->inputInfo.status = TASK_INPUT_STATUS__NORMAL;
if (pTask->inputq.status == TASK_INPUT_STATUS__BLOCKED) {
pTask->inputq.status = TASK_INPUT_STATUS__NORMAL;
stDebug("s-task:%s downstream task:0x%x resume to normal from inputQ blocking, blocking time:%" PRId64 "ms",
pTask->id.idStr, downstreamId, el);
} else {
@ -1096,7 +1096,7 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i
} else { // code == 0
if (pRsp->inputStatus == TASK_INPUT_STATUS__BLOCKED) {
pTask->inputInfo.status = TASK_INPUT_STATUS__BLOCKED;
pTask->inputq.status = TASK_INPUT_STATUS__BLOCKED;
// block the input of current task, to push pressure to upstream
taosThreadMutexLock(&pTask->lock);
taosArrayPush(pTask->msgInfo.pRetryList, &pRsp->downstreamNodeId);

View File

@ -106,7 +106,7 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, i
return 0;
}
if (pTask->inputInfo.status == TASK_INPUT_STATUS__BLOCKED) {
if (pTask->inputq.status == TASK_INPUT_STATUS__BLOCKED) {
stWarn("s-task:%s downstream task inputQ blocked, idle for 1sec and retry exec task", pTask->id.idStr);
taosMsleep(1000);
continue;
@ -217,7 +217,7 @@ int32_t streamScanHistoryData(SStreamTask* pTask) {
return 0;
}
if (pTask->inputInfo.status == TASK_INPUT_STATUS__BLOCKED) {
if (pTask->inputq.status == TASK_INPUT_STATUS__BLOCKED) {
stDebug("s-task:%s inputQ is blocked, wait for 10sec and retry", pTask->id.idStr);
taosMsleep(10000);
continue;
@ -374,7 +374,7 @@ int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask) {
// 7. pause allowed.
streamTaskEnablePause(pStreamTask);
if ((pStreamTask->info.taskLevel == TASK_LEVEL__SOURCE) && taosQueueEmpty(pStreamTask->inputInfo.queue->pQueue)) {
if ((pStreamTask->info.taskLevel == TASK_LEVEL__SOURCE) && taosQueueEmpty(pStreamTask->inputq.queue->pQueue)) {
SStreamRefDataBlock* pItem = taosAllocateQitem(sizeof(SStreamRefDataBlock), DEF_QITEM, 0);
SSDataBlock* pDelBlock = createSpecialDataBlock(STREAM_DELETE_DATA);
@ -630,7 +630,7 @@ int32_t streamExecTask(SStreamTask* pTask) {
}
taosThreadMutexLock(&pTask->lock);
if ((streamQueueGetNumOfItems(pTask->inputInfo.queue) == 0) || streamTaskShouldStop(pTask) ||
if ((streamQueueGetNumOfItems(pTask->inputq.queue) == 0) || streamTaskShouldStop(pTask) ||
streamTaskShouldPause(pTask)) {
atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE);
taosThreadMutexUnlock(&pTask->lock);

View File

@ -799,6 +799,15 @@ int32_t tEncodeStreamHbMsg(SEncoder* pEncoder, const SStreamHbMsg* pReq) {
if (tEncodeI64(pEncoder, ps->activeCheckpointId) < 0) return -1;
if (tEncodeI8(pEncoder, ps->checkpointFailed) < 0) return -1;
}
int32_t numOfVgs = taosArrayGetSize(pReq->pUpdateNodes);
if (tEncodeI32(pEncoder, numOfVgs) < 0) return -1;
for (int j = 0; j < numOfVgs; ++j) {
int32_t* pVgId = taosArrayGet(pReq->pUpdateNodes, j);
if (tEncodeI32(pEncoder, *pVgId) < 0) return -1;
}
tEndEncode(pEncoder);
return pEncoder->pos;
}
@ -832,6 +841,17 @@ int32_t tDecodeStreamHbMsg(SDecoder* pDecoder, SStreamHbMsg* pReq) {
taosArrayPush(pReq->pTaskStatus, &entry);
}
int32_t numOfVgs = 0;
if (tDecodeI32(pDecoder, &numOfVgs) < 0) return -1;
pReq->pUpdateNodes = taosArrayInit(numOfVgs, sizeof(int32_t));
for (int j = 0; j < numOfVgs; ++j) {
int32_t vgId = 0;
if (tDecodeI32(pDecoder, &vgId) < 0) return -1;
taosArrayPush(pReq->pUpdateNodes, &vgId);
}
tEndDecode(pDecoder);
return 0;
}
@ -886,13 +906,14 @@ void metaHbToMnode(void* param, void* tmrId) {
int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta);
SEpSet epset = {0};
bool hasValEpset = false;
bool hasMnodeEpset = false;
hbMsg.vgId = pMeta->vgId;
hbMsg.pTaskStatus = taosArrayInit(numOfTasks, sizeof(STaskStatusEntry));
hbMsg.pUpdateNodes = taosArrayInit(numOfTasks, sizeof(int32_t));
for (int32_t i = 0; i < numOfTasks; ++i) {
STaskId* pId = taosArrayGet(pMeta->pTaskList, i);
STaskId* pId = taosArrayGet(pMeta->pTaskList, i);
SStreamTask** pTask = taosHashGet(pMeta->pTasksMap, pId, sizeof(*pId));
// not report the status of fill-history task
@ -905,7 +926,7 @@ void metaHbToMnode(void* param, void* tmrId) {
.status = streamTaskGetStatus(*pTask, NULL),
.nodeId = pMeta->vgId,
.stage = pMeta->stage,
.inputQUsed = SIZE_IN_MiB(streamQueueGetItemSize((*pTask)->inputInfo.queue)),
.inputQUsed = SIZE_IN_MiB(streamQueueGetItemSize((*pTask)->inputq.queue)),
};
entry.inputRate = entry.inputQUsed*100.0/STREAM_TASK_QUEUE_CAPACITY_IN_SIZE;
@ -924,18 +945,39 @@ void metaHbToMnode(void* param, void* tmrId) {
walReaderValidVersionRange((*pTask)->exec.pWalReader, &entry.verStart, &entry.verEnd);
}
taosArrayPush(hbMsg.pTaskStatus, &entry);
taosThreadMutexLock(&(*pTask)->lock);
int32_t num = taosArrayGetSize((*pTask)->outputInfo.pDownstreamUpdateList);
for (int j = 0; j < num; ++j) {
int32_t *pNodeId = taosArrayGet((*pTask)->outputInfo.pDownstreamUpdateList, j);
if (!hasValEpset) {
bool exist = false;
int32_t numOfExisted = taosArrayGetSize(hbMsg.pUpdateNodes);
for (int k = 0; k < numOfExisted; ++k) {
if (*pNodeId == *(int32_t*)taosArrayGet(hbMsg.pUpdateNodes, k)) {
exist = true;
break;
}
}
if (!exist) {
taosArrayPush(hbMsg.pUpdateNodes, pNodeId);
}
}
taosArrayClear((*pTask)->outputInfo.pDownstreamUpdateList);
taosThreadMutexUnlock(&(*pTask)->lock);
taosArrayPush(hbMsg.pTaskStatus, &entry);
if (!hasMnodeEpset) {
epsetAssign(&epset, &(*pTask)->info.mnodeEpset);
hasValEpset = true;
hasMnodeEpset = true;
}
}
hbMsg.numOfTasks = taosArrayGetSize(hbMsg.pTaskStatus);
taosRUnLockLatch(&pMeta->lock);
if (hasValEpset) {
if (hasMnodeEpset) {
int32_t code = 0;
int32_t tlen = 0;
@ -980,6 +1022,8 @@ void metaHbToMnode(void* param, void* tmrId) {
}
taosArrayDestroy(hbMsg.pTaskStatus);
taosArrayDestroy(hbMsg.pUpdateNodes);
taosTmrReset(metaHbToMnode, META_HB_CHECK_INTERVAL, param, streamEnv.timer, &pMeta->pHbInfo->hbTmr);
taosReleaseRef(streamMetaId, rid);
}

View File

@ -169,7 +169,7 @@ int32_t streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInpu
return TSDB_CODE_SUCCESS;
}
SStreamQueueItem* qItem = streamQueueNextItem(pTask->inputInfo.queue);
SStreamQueueItem* qItem = streamQueueNextItem(pTask->inputq.queue);
if (qItem == NULL) {
if ((taskLevel == TASK_LEVEL__SOURCE || taskLevel == TASK_LEVEL__SINK) && (++retryTimes) < MAX_RETRY_TIMES) {
taosMsleep(WAIT_FOR_DURATION);
@ -211,7 +211,7 @@ int32_t streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInpu
streamTaskConsumeQuota(pTask->outputInfo.pTokenBucket, *blockSize);
}
streamQueueProcessFail(pTask->inputInfo.queue);
streamQueueProcessFail(pTask->inputq.queue);
return TSDB_CODE_SUCCESS;
}
} else {
@ -232,7 +232,7 @@ int32_t streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInpu
streamTaskConsumeQuota(pTask->outputInfo.pTokenBucket, *blockSize);
}
streamQueueProcessFail(pTask->inputInfo.queue);
streamQueueProcessFail(pTask->inputq.queue);
return TSDB_CODE_SUCCESS;
}
@ -240,7 +240,7 @@ int32_t streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInpu
}
*numOfBlocks += 1;
streamQueueProcessSuccess(pTask->inputInfo.queue);
streamQueueProcessSuccess(pTask->inputq.queue);
if (*numOfBlocks >= MAX_STREAM_EXEC_BATCH_NUM) {
stDebug("s-task:%s batch size limit:%d reached, start to process blocks", id, MAX_STREAM_EXEC_BATCH_NUM);
@ -258,12 +258,12 @@ int32_t streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInpu
int32_t streamTaskPutDataIntoInputQ(SStreamTask* pTask, SStreamQueueItem* pItem) {
int8_t type = pItem->type;
STaosQueue* pQueue = pTask->inputInfo.queue->pQueue;
int32_t total = streamQueueGetNumOfItems(pTask->inputInfo.queue) + 1;
STaosQueue* pQueue = pTask->inputq.queue->pQueue;
int32_t total = streamQueueGetNumOfItems(pTask->inputq.queue) + 1;
if (type == STREAM_INPUT__DATA_SUBMIT) {
SStreamDataSubmit* px = (SStreamDataSubmit*)pItem;
if ((pTask->info.taskLevel == TASK_LEVEL__SOURCE) && streamQueueIsFull(pTask->inputInfo.queue)) {
if ((pTask->info.taskLevel == TASK_LEVEL__SOURCE) && streamQueueIsFull(pTask->inputq.queue)) {
double size = SIZE_IN_MiB(taosQueueMemorySize(pQueue));
stTrace(
"s-task:%s inputQ is full, capacity(size:%d num:%dMiB), current(blocks:%d, size:%.2fMiB) stop to push data",
@ -290,7 +290,7 @@ int32_t streamTaskPutDataIntoInputQ(SStreamTask* pTask, SStreamQueueItem* pItem)
msgLen, ver, total, size + SIZE_IN_MiB(msgLen));
} else if (type == STREAM_INPUT__DATA_BLOCK || type == STREAM_INPUT__DATA_RETRIEVE ||
type == STREAM_INPUT__REF_DATA_BLOCK) {
if (streamQueueIsFull(pTask->inputInfo.queue)) {
if (streamQueueIsFull(pTask->inputq.queue)) {
double size = SIZE_IN_MiB(taosQueueMemorySize(pQueue));
stTrace("s-task:%s input queue is full, capacity:%d size:%d MiB, current(blocks:%d, size:%.2fMiB) abort",

View File

@ -346,6 +346,7 @@ static void doProcessDownstreamReadyRsp(SStreamTask* pTask) {
int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRsp) {
ASSERT(pTask->id.taskId == pRsp->upstreamTaskId);
const char* id = pTask->id.idStr;
int32_t vgId = pTask->pMeta->vgId;
if (streamTaskShouldStop(pTask)) {
stDebug("s-task:%s should stop, do not do check downstream again", id);
@ -354,8 +355,8 @@ int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRs
if (pRsp->status == TASK_DOWNSTREAM_READY) {
if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
bool found = false;
bool found = false;
int32_t numOfReqs = taosArrayGetSize(pTask->checkReqIds);
for (int32_t i = 0; i < numOfReqs; i++) {
int64_t reqId = *(int64_t*)taosArrayGet(pTask->checkReqIds, i);
@ -402,6 +403,26 @@ int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRs
"s-task:%s downstream taskId:0x%x (vgId:%d) not leader, self dispatch epset needs to be updated, not check "
"downstream again, nodeUpdate needed",
id, pRsp->downstreamTaskId, pRsp->downstreamNodeId);
taosThreadMutexLock(&pTask->lock);
int32_t num = taosArrayGetSize(pTask->outputInfo.pDownstreamUpdateList);
bool existed = false;
for (int i = 0; i < num; ++i) {
SDownstreamTaskEpset* p = taosArrayGet(pTask->outputInfo.pDownstreamUpdateList, i);
if (p->nodeId == pRsp->downstreamNodeId) {
existed = true;
break;
}
}
if (!existed) {
SDownstreamTaskEpset t = {.nodeId = pRsp->downstreamNodeId};
taosArrayPush(pTask->outputInfo.pDownstreamUpdateList, &t);
stInfo("s-task:%s vgId:%d downstream nodeId:%d needs to be updated, total needs updated:%d", id, vgId,
t.nodeId, (int32_t)taosArrayGetSize(pTask->outputInfo.pDownstreamUpdateList));
}
taosThreadMutexUnlock(&pTask->lock);
return 0;
}

View File

@ -59,7 +59,7 @@ SStreamTask* tNewStreamTask(int64_t streamId, int8_t taskLevel, bool fillHistory
pTask->id.idStr = taosStrdup(buf);
pTask->status.schedStatus = TASK_SCHED_STATUS__INACTIVE;
pTask->status.taskStatus = (fillHistory || hasFillhistory) ? TASK_STATUS__SCAN_HISTORY : TASK_STATUS__READY;
pTask->inputInfo.status = TASK_INPUT_STATUS__NORMAL;
pTask->inputq.status = TASK_INPUT_STATUS__NORMAL;
pTask->outputq.status = TASK_OUTPUT_STATUS__NORMAL;
if (fillHistory) {
@ -337,8 +337,8 @@ void tFreeStreamTask(SStreamTask* pTask) {
}
int32_t status = atomic_load_8((int8_t*)&(pTask->status.taskStatus));
if (pTask->inputInfo.queue) {
streamQueueClose(pTask->inputInfo.queue, pTask->id.taskId);
if (pTask->inputq.queue) {
streamQueueClose(pTask->inputq.queue, pTask->id.taskId);
}
if (pTask->outputq.queue) {
@ -399,8 +399,11 @@ void tFreeStreamTask(SStreamTask* pTask) {
pTask->msgInfo.pRetryList = taosArrayDestroy(pTask->msgInfo.pRetryList);
taosMemoryFree(pTask->outputInfo.pTokenBucket);
taosThreadMutexDestroy(&pTask->lock);
taosMemoryFree(pTask);
taosArrayDestroy(pTask->outputInfo.pDownstreamUpdateList);
pTask->outputInfo.pDownstreamUpdateList = NULL;
taosMemoryFree(pTask);
stDebug("s-task:0x%x free task completed", taskId);
}
@ -409,10 +412,10 @@ int32_t streamTaskInit(SStreamTask* pTask, SStreamMeta* pMeta, SMsgCb* pMsgCb, i
pTask->refCnt = 1;
pTask->status.schedStatus = TASK_SCHED_STATUS__INACTIVE;
pTask->status.timerActive = 0;
pTask->inputInfo.queue = streamQueueOpen(512 << 10);
pTask->inputq.queue = streamQueueOpen(512 << 10);
pTask->outputq.queue = streamQueueOpen(512 << 10);
if (pTask->inputInfo.queue == NULL || pTask->outputq.queue == NULL) {
if (pTask->inputq.queue == NULL || pTask->outputq.queue == NULL) {
stError("s-task:%s failed to prepare the input/output queue, initialize task failed", pTask->id.idStr);
return TSDB_CODE_OUT_OF_MEMORY;
}
@ -425,7 +428,7 @@ int32_t streamTaskInit(SStreamTask* pTask, SStreamMeta* pMeta, SMsgCb* pMsgCb, i
}
pTask->execInfo.created = taosGetTimestampMs();
pTask->inputInfo.status = TASK_INPUT_STATUS__NORMAL;
pTask->inputq.status = TASK_INPUT_STATUS__NORMAL;
pTask->outputq.status = TASK_OUTPUT_STATUS__NORMAL;
pTask->pMeta = pMeta;
@ -462,6 +465,11 @@ int32_t streamTaskInit(SStreamTask* pTask, SStreamMeta* pMeta, SMsgCb* pMsgCb, i
taosThreadMutexInit(&pTask->lock, &attr);
streamTaskOpenAllUpstreamInput(pTask);
pTask->outputInfo.pDownstreamUpdateList = taosArrayInit(4, sizeof(SDownstreamTaskEpset));
if (pTask->outputInfo.pDownstreamUpdateList == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
return TSDB_CODE_SUCCESS;
}

View File

@ -582,6 +582,7 @@ sql create table t1 using st tags(1,1,1);
sql create table t2 using st tags(2,2,2);
sql create stream streams23 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt23 as select _wstart, count(*) c1, sum(a) c3 , max(b) c4, min(c) c5 from st interval(20s) sliding(10s);
sleep 1000
sql insert into t1 values(1648791213000,1,1,1,1.0);
sql insert into t1 values(1648791223001,2,2,2,1.1);