other: merge stream fix.

This commit is contained in:
Haojun Liao 2023-09-14 14:09:05 +08:00
parent c6355fcc2f
commit 1256eafddb
11 changed files with 48 additions and 44 deletions

View File

@ -718,7 +718,7 @@ int32_t streamMetaGetNumOfTasks(SStreamMeta* pMeta);
int32_t streamMetaGetNumOfStreamTasks(SStreamMeta* pMeta); int32_t streamMetaGetNumOfStreamTasks(SStreamMeta* pMeta);
SStreamTask* streamMetaAcquireTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId); SStreamTask* streamMetaAcquireTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId);
void streamMetaReleaseTask(SStreamMeta* pMeta, SStreamTask* pTask); void streamMetaReleaseTask(SStreamMeta* pMeta, SStreamTask* pTask);
int32_t streamMetaReopen(SStreamMeta* pMeta, int64_t chkpId); int32_t streamMetaReopen(SStreamMeta* pMeta);
int32_t streamMetaCommit(SStreamMeta* pMeta); int32_t streamMetaCommit(SStreamMeta* pMeta);
int32_t streamMetaLoadAllTasks(SStreamMeta* pMeta); int32_t streamMetaLoadAllTasks(SStreamMeta* pMeta);
void streamMetaNotifyClose(SStreamMeta* pMeta); void streamMetaNotifyClose(SStreamMeta* pMeta);

View File

@ -241,7 +241,7 @@ int32_t tsTtlBatchDropNum = 10000; // number of tables dropped per batch
// internal // internal
int32_t tsTransPullupInterval = 2; int32_t tsTransPullupInterval = 2;
int32_t tsMqRebalanceInterval = 2; int32_t tsMqRebalanceInterval = 2;
int32_t tsStreamCheckpointTickInterval = 600; int32_t tsStreamCheckpointTickInterval = 30;
int32_t tsStreamNodeCheckInterval = 10; int32_t tsStreamNodeCheckInterval = 10;
int32_t tsTtlUnit = 86400; int32_t tsTtlUnit = 86400;
int32_t tsTtlPushIntervalSec = 10; int32_t tsTtlPushIntervalSec = 10;

View File

@ -65,9 +65,6 @@ static int32_t mndProcessDropStreamReq(SRpcMsg *pReq);
static int32_t mndProcessStreamCheckpointTmr(SRpcMsg *pReq); static int32_t mndProcessStreamCheckpointTmr(SRpcMsg *pReq);
static int32_t mndProcessStreamDoCheckpoint(SRpcMsg *pReq); static int32_t mndProcessStreamDoCheckpoint(SRpcMsg *pReq);
static int32_t mndProcessStreamHb(SRpcMsg *pReq); static int32_t mndProcessStreamHb(SRpcMsg *pReq);
static int32_t mndProcessRecoverStreamReq(SRpcMsg *pReq);
static int32_t mndProcessStreamMetaReq(SRpcMsg *pReq);
static int32_t mndGetStreamMeta(SRpcMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta);
static int32_t mndRetrieveStream(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows); static int32_t mndRetrieveStream(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
static void mndCancelGetNextStream(SMnode *pMnode, void *pIter); static void mndCancelGetNextStream(SMnode *pMnode, void *pIter);
static int32_t mndRetrieveStreamTask(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows); static int32_t mndRetrieveStreamTask(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
@ -1063,8 +1060,7 @@ static int32_t mndBuildStreamCheckpointSourceReq2(void **pBuf, int32_t *pLen, in
// return -1; // return -1;
// } // }
static int32_t mndAddStreamCheckpointToTrans(STrans *pTrans, SStreamObj *pStream, SMnode *pMnode, static int32_t mndAddStreamCheckpointToTrans(STrans *pTrans, SStreamObj *pStream, SMnode *pMnode, int64_t chkptId) {
int64_t checkpointId) {
taosWLockLatch(&pStream->lock); taosWLockLatch(&pStream->lock);
int32_t totLevel = taosArrayGetSize(pStream->tasks); int32_t totLevel = taosArrayGetSize(pStream->tasks);
@ -1088,7 +1084,7 @@ static int32_t mndAddStreamCheckpointToTrans(STrans *pTrans, SStreamObj *pStream
void *buf; void *buf;
int32_t tlen; int32_t tlen;
if (mndBuildStreamCheckpointSourceReq2(&buf, &tlen, pTask->info.nodeId, checkpointId, pTask->id.streamId, if (mndBuildStreamCheckpointSourceReq2(&buf, &tlen, pTask->info.nodeId, chkptId, pTask->id.streamId,
pTask->id.taskId) < 0) { pTask->id.taskId) < 0) {
mndReleaseVgroup(pMnode, pVgObj); mndReleaseVgroup(pMnode, pVgObj);
taosWUnLockLatch(&pStream->lock); taosWUnLockLatch(&pStream->lock);
@ -1109,7 +1105,7 @@ static int32_t mndAddStreamCheckpointToTrans(STrans *pTrans, SStreamObj *pStream
} }
} }
pStream->checkpointId = checkpointId; pStream->checkpointId = chkptId;
pStream->checkpointFreq = taosGetTimestampMs(); pStream->checkpointFreq = taosGetTimestampMs();
atomic_store_64(&pStream->currentTick, 0); atomic_store_64(&pStream->currentTick, 0);
// 3. commit log: stream checkpoint info // 3. commit log: stream checkpoint info

View File

@ -1698,9 +1698,12 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) {
if (tDecodeStreamTaskUpdateMsg(&decoder, &req) < 0) { if (tDecodeStreamTaskUpdateMsg(&decoder, &req) < 0) {
rsp.code = TSDB_CODE_MSG_DECODE_ERROR; rsp.code = TSDB_CODE_MSG_DECODE_ERROR;
tqError("vgId:%d failed to decode task update msg, code:%s", vgId, tstrerror(rsp.code)); tqError("vgId:%d failed to decode task update msg, code:%s", vgId, tstrerror(rsp.code));
goto _end; tDecoderClear(&decoder);
return rsp.code;
} }
tDecoderClear(&decoder);
// update the nodeEpset when it exists // update the nodeEpset when it exists
taosWLockLatch(&pMeta->lock); taosWLockLatch(&pMeta->lock);
@ -1713,7 +1716,7 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) {
req.taskId); req.taskId);
rsp.code = TSDB_CODE_SUCCESS; rsp.code = TSDB_CODE_SUCCESS;
taosWUnLockLatch(&pMeta->lock); taosWUnLockLatch(&pMeta->lock);
goto _end; return rsp.code;
} }
SStreamTask* pTask = *ppTask; SStreamTask* pTask = *ppTask;
@ -1753,37 +1756,32 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) {
streamTaskStop(*ppHTask); streamTaskStop(*ppHTask);
} }
tqDebug("s-task:%s task nodeEp update completed", pTask->id.idStr);
pMeta->closedTask += 1; pMeta->closedTask += 1;
if (ppHTask != NULL) { if (ppHTask != NULL) {
tqDebug("s-task:%s task nodeEp update completed, streamTask and related fill-history task closed", pTask->id.idStr);
pMeta->closedTask += 1; pMeta->closedTask += 1;
} else {
tqDebug("s-task:%s task nodeEp update completed, streamTask closed", pTask->id.idStr);
} }
rsp.code = 0;
// possibly only handle the stream task. // possibly only handle the stream task.
int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta); int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta);
allStopped = (pMeta->closedTask == numOfTasks); if (pMeta->closedTask < numOfTasks) {
if (allStopped) { tqDebug("vgId:%d closed tasks:%d, unclosed:%d", vgId, pMeta->closedTask, (numOfTasks - pMeta->closedTask));
pMeta->closedTask = 0; taosWUnLockLatch(&pMeta->lock);
} else { } else {
tqDebug("vgId:%d closed tasks:%d, not closed:%d", vgId, pMeta->closedTask, (numOfTasks - pMeta->closedTask)); pMeta->closedTask = 0;
}
taosWUnLockLatch(&pMeta->lock);
_end:
tDecoderClear(&decoder);
if (allStopped) {
if (!pTq->pVnode->restored) { if (!pTq->pVnode->restored) {
tqDebug("vgId:%d vnode restore not completed, not restart the tasks", vgId); tqDebug("vgId:%d vnode restore not completed, not restart the tasks", vgId);
taosWUnLockLatch(&pMeta->lock);
} else { } else {
tqDebug("vgId:%d all tasks are stopped, restart them", vgId); tqDebug("vgId:%d tasks are all updated and stopped, restart them", vgId);
taosWLockLatch(&pMeta->lock);
terrno = 0; terrno = 0;
int32_t code = streamMetaReopen(pMeta, 0); int32_t code = streamMetaReopen(pMeta);
if (code != 0) { if (code != 0) {
tqError("vgId:%d failed to reopen stream meta", vgId); tqError("vgId:%d failed to reopen stream meta", vgId);
taosWUnLockLatch(&pMeta->lock); taosWUnLockLatch(&pMeta->lock);
@ -1807,4 +1805,3 @@ _end:
return rsp.code; return rsp.code;
} }

View File

@ -168,7 +168,7 @@ int32_t streamStateSnapWriterClose(SStreamStateWriter* pWriter, int8_t rollback)
} }
int32_t streamStateRebuildFromSnap(SStreamStateWriter* pWriter, int64_t chkpId) { int32_t streamStateRebuildFromSnap(SStreamStateWriter* pWriter, int64_t chkpId) {
tqDebug("vgId:%d, vnode %s start to rebuild stream-state", TD_VID(pWriter->pTq->pVnode), STREAM_STATE_TRANSFER); tqDebug("vgId:%d, vnode %s start to rebuild stream-state", TD_VID(pWriter->pTq->pVnode), STREAM_STATE_TRANSFER);
int32_t code = streamMetaReopen(pWriter->pTq->pStreamMeta, chkpId); int32_t code = streamMetaReopen(pWriter->pTq->pStreamMeta);
if (code == 0) { if (code == 0) {
code = streamStateLoadTasks(pWriter); code = streamStateLoadTasks(pWriter);
} }

View File

@ -201,8 +201,7 @@ int32_t tqStopStreamTasks(STQ* pTq) {
int32_t vgId = TD_VID(pTq->pVnode); int32_t vgId = TD_VID(pTq->pVnode);
int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList); int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList);
tqDebug("vgId:%d start to stop all %d stream task(s)", vgId, numOfTasks); tqDebug("vgId:%d stop all %d stream task(s)", vgId, numOfTasks);
if (numOfTasks == 0) { if (numOfTasks == 0) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
@ -232,7 +231,7 @@ int32_t tqStartStreamTasks(STQ* pTq) {
int32_t vgId = TD_VID(pTq->pVnode); int32_t vgId = TD_VID(pTq->pVnode);
int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList); int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList);
tqDebug("vgId:%d start to stop all %d stream task(s)", vgId, numOfTasks); tqDebug("vgId:%d start all %d stream task(s)", vgId, numOfTasks);
if (numOfTasks == 0) { if (numOfTasks == 0) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
@ -314,7 +313,7 @@ void handleFillhistoryScanComplete(SStreamTask* pTask, int64_t ver) {
double el = (taosGetTimestampMs() - pTask->tsInfo.step2Start) / 1000.0; double el = (taosGetTimestampMs() - pTask->tsInfo.step2Start) / 1000.0;
qDebug("s-task:%s scan-history from WAL stage(step 2) ended, elapsed time:%.2fs", id, el); qDebug("s-task:%s scan-history from WAL stage(step 2) ended, elapsed time:%.2fs", id, el);
/*int32_t code = */streamTaskPutTranstateIntoInputQ(pTask); /*int32_t code = */streamTaskPutTranstateIntoInputQ(pTask);
/*int32_t code = */ streamSchedExec(pTask); /*int32_t code = */streamSchedExec(pTask);
} else { } else {
qWarn("s-task:%s fill-history scan WAL, nextProcessVer:%" PRId64 " out of the maximum ver:%" PRId64 ", not scan wal", qWarn("s-task:%s fill-history scan WAL, nextProcessVer:%" PRId64 " out of the maximum ver:%" PRId64 ", not scan wal",
id, ver, maxVer); id, ver, maxVer);

View File

@ -553,13 +553,11 @@ static void vnodeRestoreFinish(const SSyncFSM *pFsm, const SyncIndex commitIdx)
pVnode->restored = true; pVnode->restored = true;
if (vnodeIsRoleLeader(pVnode)) { if (vnodeIsRoleLeader(pVnode)) {
vInfo("vgId:%d, sync restore finished, start to launch stream tasks", vgId);
// start to restore all stream tasks // start to restore all stream tasks
if (tsDisableStream) { if (tsDisableStream) {
vInfo("vgId:%d, not launch stream tasks, since stream tasks are disabled", vgId); vInfo("vgId:%d, sync restore finished, not launch stream tasks, since stream tasks are disabled", vgId);
} else { } else {
vInfo("vgId:%d start to launch stream tasks", pVnode->config.vgId); vInfo("vgId:%d sync restore finished, start to launch stream tasks", pVnode->config.vgId);
tqStartStreamTasks(pVnode->pTq); tqStartStreamTasks(pVnode->pTq);
tqCheckAndRunStreamTaskAsync(pVnode->pTq); tqCheckAndRunStreamTaskAsync(pVnode->pTq);
} }

View File

@ -109,7 +109,6 @@ int32_t streamSetupScheduleTrigger(SStreamTask* pTask) {
int32_t streamSchedExec(SStreamTask* pTask) { int32_t streamSchedExec(SStreamTask* pTask) {
int8_t schedStatus = streamTaskSetSchedStatusWait(pTask); int8_t schedStatus = streamTaskSetSchedStatusWait(pTask);
if (schedStatus == TASK_SCHED_STATUS__INACTIVE) { if (schedStatus == TASK_SCHED_STATUS__INACTIVE) {
SStreamTaskRunReq* pRunReq = rpcMallocCont(sizeof(SStreamTaskRunReq)); SStreamTaskRunReq* pRunReq = rpcMallocCont(sizeof(SStreamTaskRunReq));
if (pRunReq == NULL) { if (pRunReq == NULL) {

View File

@ -211,7 +211,7 @@ _err:
return NULL; return NULL;
} }
int32_t streamMetaReopen(SStreamMeta* pMeta, int64_t chkpId) { int32_t streamMetaReopen(SStreamMeta* pMeta) {
streamMetaClear(pMeta); streamMetaClear(pMeta);
pMeta->streamBackendRid = -1; pMeta->streamBackendRid = -1;

View File

@ -395,7 +395,7 @@ int32_t streamTaskPutDataIntoOutputQ(SStreamTask* pTask, SStreamDataBlock* pBloc
} }
int32_t streamTaskInitTokenBucket(STokenBucket* pBucket, int32_t cap, int32_t rate) { int32_t streamTaskInitTokenBucket(STokenBucket* pBucket, int32_t cap, int32_t rate) {
if (cap < 100 || rate < 50 || pBucket == NULL) { if (cap < 50 || rate < 50 || pBucket == NULL) {
qError("failed to init sink task bucket, cap:%d, rate:%d", cap, rate); qError("failed to init sink task bucket, cap:%d, rate:%d", cap, rate);
return TSDB_CODE_INVALID_PARA; return TSDB_CODE_INVALID_PARA;
} }

View File

@ -384,8 +384,22 @@ int32_t streamTaskInit(SStreamTask* pTask, SStreamMeta* pMeta, SMsgCb* pMsgCb, i
pTask->dataRange.range.minVer = ver; pTask->dataRange.range.minVer = ver;
pTask->pMsgCb = pMsgCb; pTask->pMsgCb = pMsgCb;
streamTaskInitTokenBucket(&pTask->tokenBucket, 100, 100); streamTaskInitTokenBucket(&pTask->tokenBucket, 50, 50);
taosThreadMutexInit(&pTask->lock, NULL);
TdThreadMutexAttr attr = {0};
int ret = taosThreadMutexAttrInit(&attr);
if (ret != 0) {
qError("s-task:%s init mutex attr failed, code:%s", pTask->id.idStr, tstrerror(ret));
return ret;
}
ret = taosThreadMutexAttrSetType(&attr, PTHREAD_MUTEX_RECURSIVE);
if (ret != 0) {
qError("s-task:%s set mutex attr recursive, code:%s", pTask->id.idStr, tstrerror(ret));
return ret;
}
taosThreadMutexInit(&pTask->lock, &attr);
streamTaskOpenAllUpstreamInput(pTask); streamTaskOpenAllUpstreamInput(pTask);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
@ -578,7 +592,8 @@ int8_t streamTaskSetSchedStatusActive(SStreamTask* pTask) {
int8_t streamTaskSetSchedStatusInActive(SStreamTask* pTask) { int8_t streamTaskSetSchedStatusInActive(SStreamTask* pTask) {
taosThreadMutexLock(&pTask->lock); taosThreadMutexLock(&pTask->lock);
int8_t status = pTask->status.schedStatus; int8_t status = pTask->status.schedStatus;
ASSERT(status == TASK_SCHED_STATUS__WAITING || status == TASK_SCHED_STATUS__ACTIVE); ASSERT(status == TASK_SCHED_STATUS__WAITING || status == TASK_SCHED_STATUS__ACTIVE ||
status == TASK_SCHED_STATUS__INACTIVE);
pTask->status.schedStatus = TASK_SCHED_STATUS__INACTIVE; pTask->status.schedStatus = TASK_SCHED_STATUS__INACTIVE;
taosThreadMutexUnlock(&pTask->lock); taosThreadMutexUnlock(&pTask->lock);