fix(stream): fix error.

This commit is contained in:
Haojun Liao 2023-08-31 20:50:12 +08:00
parent bef0d4bff3
commit 95da66e3e8
3 changed files with 52 additions and 52 deletions

View File

@ -868,6 +868,7 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) {
mndTransDrop(pTrans); mndTransDrop(pTrans);
taosThreadMutexLock(&execNodeList.lock); taosThreadMutexLock(&execNodeList.lock);
mDebug("register to stream task node list");
keepStreamTasksInBuf(&streamObj, &execNodeList); keepStreamTasksInBuf(&streamObj, &execNodeList);
taosThreadMutexUnlock(&execNodeList.lock); taosThreadMutexUnlock(&execNodeList.lock);
@ -876,13 +877,8 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) {
char detail[2000] = {0}; char detail[2000] = {0};
sprintf(detail, sprintf(detail,
"checkpointFreq:%" PRId64 ", createStb:%d, deleteMark:%" PRId64 "checkpointFreq:%" PRId64 ", createStb:%d, deleteMark:%" PRId64
", " ", fillHistory:%d, igExists:%d, igExpired:%d, igUpdate:%d, lastTs:%" PRId64 ", maxDelay:%" PRId64
"fillHistory:%d, igExists:%d, " ", numOfTags:%d, sourceDB:%s, targetStbFullName:%s, triggerType:%d, watermark:%" PRId64,
"igExpired:%d, igUpdate:%d, lastTs:%" PRId64
", "
"maxDelay:%" PRId64
", numOfTags:%d, sourceDB:%s, "
"targetStbFullName:%s, triggerType:%d, watermark:%" PRId64,
createStreamReq.checkpointFreq, createStreamReq.createStb, createStreamReq.deleteMark, createStreamReq.checkpointFreq, createStreamReq.createStb, createStreamReq.deleteMark,
createStreamReq.fillHistory, createStreamReq.igExists, createStreamReq.igExpired, createStreamReq.igUpdate, createStreamReq.fillHistory, createStreamReq.igExists, createStreamReq.igExpired, createStreamReq.igUpdate,
createStreamReq.lastTs, createStreamReq.maxDelay, createStreamReq.numOfTags, createStreamReq.sourceDB, createStreamReq.lastTs, createStreamReq.maxDelay, createStreamReq.numOfTags, createStreamReq.sourceDB,
@ -2281,7 +2277,6 @@ static void keepStreamTasksInBuf(SStreamObj *pStream, SStreamVnodeRevertIndex *p
// todo: this process should be executed by the write queue worker of the mnode // todo: this process should be executed by the write queue worker of the mnode
int32_t mndProcessStreamHb(SRpcMsg *pReq) { int32_t mndProcessStreamHb(SRpcMsg *pReq) {
SMnode *pMnode = pReq->info.node; SMnode *pMnode = pReq->info.node;
SStreamHbMsg req = {0}; SStreamHbMsg req = {0};
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
@ -2306,10 +2301,15 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) {
for (int32_t i = 0; i < req.numOfTasks; ++i) { for (int32_t i = 0; i < req.numOfTasks; ++i) {
STaskStatusEntry *p = taosArrayGet(req.pTaskStatus, i); STaskStatusEntry *p = taosArrayGet(req.pTaskStatus, i);
int64_t k[2] = {p->streamId, p->taskId};
int32_t index = *(int32_t *)taosHashGet(execNodeList.pTaskMap, &k, sizeof(k));
STaskStatusEntry *pStatusEntry = taosArrayGet(execNodeList.pTaskList, index); int64_t k[2] = {p->streamId, p->taskId};
int32_t **index = taosHashGet(execNodeList.pTaskMap, &k, sizeof(k));
if (index == NULL) {
continue;
}
STaskStatusEntry *pStatusEntry = taosArrayGet(execNodeList.pTaskList, **index);
pStatusEntry->status = p->status; pStatusEntry->status = p->status;
if (p->status != TASK_STATUS__NORMAL) { if (p->status != TASK_STATUS__NORMAL) {
mDebug("received s-task:0x%x not in ready status:%s", p->taskId, streamGetTaskStatusStr(p->status)); mDebug("received s-task:0x%x not in ready status:%s", p->taskId, streamGetTaskStatusStr(p->status));

View File

@ -757,7 +757,6 @@ void metaHbToMnode(void* param, void* tmrId) {
SStreamHbMsg hbMsg = {0}; SStreamHbMsg hbMsg = {0};
SStreamMeta* pMeta = taosAcquireRef(streamMetaId, rid); SStreamMeta* pMeta = taosAcquireRef(streamMetaId, rid);
if (pMeta == NULL) { if (pMeta == NULL) {
// taosMemoryFree(param);
return; return;
} }
@ -779,6 +778,7 @@ void metaHbToMnode(void* param, void* tmrId) {
int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta); int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta);
SEpSet epset = {0}; SEpSet epset = {0};
bool hasValEpset = false;
hbMsg.vgId = pMeta->vgId; hbMsg.vgId = pMeta->vgId;
hbMsg.pTaskStatus = taosArrayInit(numOfTasks, sizeof(STaskStatusEntry)); hbMsg.pTaskStatus = taosArrayInit(numOfTasks, sizeof(STaskStatusEntry));
@ -797,51 +797,53 @@ void metaHbToMnode(void* param, void* tmrId) {
if (i == 0) { if (i == 0) {
epsetAssign(&epset, &(*pTask)->info.mnodeEpset); epsetAssign(&epset, &(*pTask)->info.mnodeEpset);
hasValEpset = true;
} }
} }
hbMsg.numOfTasks = taosArrayGetSize(hbMsg.pTaskStatus); hbMsg.numOfTasks = taosArrayGetSize(hbMsg.pTaskStatus);
taosRUnLockLatch(&pMeta->lock); taosRUnLockLatch(&pMeta->lock);
int32_t code = 0; if (hasValEpset) {
int32_t tlen = 0; int32_t code = 0;
int32_t tlen = 0;
tEncodeSize(tEncodeStreamHbMsg, &hbMsg, tlen, code); tEncodeSize(tEncodeStreamHbMsg, &hbMsg, tlen, code);
if (code < 0) { if (code < 0) {
qError("vgId:%d encode stream hb msg failed, code:%s", pMeta->vgId, tstrerror(code)); qError("vgId:%d encode stream hb msg failed, code:%s", pMeta->vgId, tstrerror(code));
taosArrayDestroy(hbMsg.pTaskStatus); taosArrayDestroy(hbMsg.pTaskStatus);
taosReleaseRef(streamMetaId, rid); taosReleaseRef(streamMetaId, rid);
return; return;
} }
void* buf = rpcMallocCont(tlen); void* buf = rpcMallocCont(tlen);
if (buf == NULL) { if (buf == NULL) {
qError("vgId:%d encode stream hb msg failed, code:%s", pMeta->vgId, tstrerror(TSDB_CODE_OUT_OF_MEMORY)); qError("vgId:%d encode stream hb msg failed, code:%s", pMeta->vgId, tstrerror(TSDB_CODE_OUT_OF_MEMORY));
taosArrayDestroy(hbMsg.pTaskStatus); taosArrayDestroy(hbMsg.pTaskStatus);
taosReleaseRef(streamMetaId, rid); taosReleaseRef(streamMetaId, rid);
return; return;
} }
SEncoder encoder; SEncoder encoder;
tEncoderInit(&encoder, buf, tlen); tEncoderInit(&encoder, buf, tlen);
if ((code = tEncodeStreamHbMsg(&encoder, &hbMsg)) < 0) { if ((code = tEncodeStreamHbMsg(&encoder, &hbMsg)) < 0) {
rpcFreeCont(buf); rpcFreeCont(buf);
qError("vgId:%d encode stream hb msg failed, code:%s", pMeta->vgId, tstrerror(code)); qError("vgId:%d encode stream hb msg failed, code:%s", pMeta->vgId, tstrerror(code));
taosArrayDestroy(hbMsg.pTaskStatus); taosArrayDestroy(hbMsg.pTaskStatus);
taosReleaseRef(streamMetaId, rid); taosReleaseRef(streamMetaId, rid);
return; return;
}
tEncoderClear(&encoder);
SRpcMsg msg = {0};
initRpcMsg(&msg, TDMT_MND_STREAM_HEARTBEAT, buf, tlen);
msg.info.noResp = 1;
qDebug("vgId:%d, build and send hb to mnode", pMeta->vgId);
tmsgSendReq(&epset, &msg);
} }
tEncoderClear(&encoder);
taosArrayDestroy(hbMsg.pTaskStatus); taosArrayDestroy(hbMsg.pTaskStatus);
SRpcMsg msg = {0};
initRpcMsg(&msg, TDMT_MND_STREAM_HEARTBEAT, buf, tlen);
msg.info.noResp = 1;
qDebug("vgId:%d, build and send hb to mnode", pMeta->vgId);
tmsgSendReq(&epset, &msg);
taosTmrReset(metaHbToMnode, META_HB_CHECK_INTERVAL, param, streamEnv.timer, &pMeta->hbInfo.hbTmr); taosTmrReset(metaHbToMnode, META_HB_CHECK_INTERVAL, param, streamEnv.timer, &pMeta->hbInfo.hbTmr);
taosReleaseRef(streamMetaId, rid); taosReleaseRef(streamMetaId, rid);
} }

View File

@ -579,19 +579,17 @@ static void tryLaunchHistoryTask(void* param, void* tmrId) {
// todo fix the bug: 2. race condition // todo fix the bug: 2. race condition
// an fill history task needs to be started. // an fill history task needs to be started.
int32_t streamLaunchFillHistoryTask(SStreamTask* pTask) { int32_t streamLaunchFillHistoryTask(SStreamTask* pTask) {
int32_t tId = pTask->historyTaskId.taskId; SStreamMeta* pMeta = pTask->pMeta;
if (tId == 0) { int32_t hTaskId = pTask->historyTaskId.taskId;
if (hTaskId == 0) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
ASSERT(pTask->status.downstreamReady == 1); ASSERT(pTask->status.downstreamReady == 1);
qDebug("s-task:%s start to launch related fill-history task:0x%" PRIx64 "-0x%x", pTask->id.idStr, qDebug("s-task:%s start to launch related fill-history task:0x%" PRIx64 "-0x%x", pTask->id.idStr,
pTask->historyTaskId.streamId, tId); pTask->historyTaskId.streamId, hTaskId);
SStreamMeta* pMeta = pTask->pMeta; int64_t keys[2] = {pTask->historyTaskId.streamId, hTaskId};
int32_t hTaskId = pTask->historyTaskId.taskId;
int64_t keys[2] = {pTask->historyTaskId.streamId, pTask->historyTaskId.taskId};
// Set the execute conditions, including the query time window and the version range // Set the execute conditions, including the query time window and the version range
SStreamTask** pHTask = taosHashGet(pMeta->pTasks, keys, sizeof(keys)); SStreamTask** pHTask = taosHashGet(pMeta->pTasks, keys, sizeof(keys));