fix(stream): fix error found by ci.

This commit is contained in:
Haojun Liao 2023-08-30 10:14:01 +08:00
parent 8dadae5d3a
commit 3a7a220d43
7 changed files with 69 additions and 56 deletions

View File

@ -400,8 +400,9 @@ typedef struct SStreamMeta {
TdThreadMutex backendMutex;
SMetaHbInfo hbInfo;
int32_t closedTask;
int32_t totalTasks; // this value should be increased when a new task is added into the meta
int32_t chkptNotReadyTasks;
int64_t rid;
int64_t rid;
int64_t chkpId;
SArray* chkpSaved;
@ -711,6 +712,7 @@ int32_t streamMetaRemoveTask(SStreamMeta* pMeta, int64_t* pKey);
int32_t streamMetaRegisterTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTask, bool* pAdded);
int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId);
int32_t streamMetaGetNumOfTasks(SStreamMeta* pMeta);
int32_t streamMetaGetNumOfStreamTasks(SStreamMeta* pMeta);
SStreamTask* streamMetaAcquireTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId);
void streamMetaReleaseTask(SStreamMeta* pMeta, SStreamTask* pTask);
int32_t streamMetaReopen(SStreamMeta* pMeta, int64_t chkpId);

View File

@ -1687,7 +1687,8 @@ int32_t tqProcessStreamCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg) {
// set the initial value for generating check point
// set the mgmt epset info according to the checkout source msg from mnode, todo update mgmt epset if needed
if (pMeta->chkptNotReadyTasks == 0) {
pMeta->chkptNotReadyTasks = taosArrayGetSize(pMeta->pTaskList);
pMeta->chkptNotReadyTasks = streamMetaGetNumOfStreamTasks(pMeta);
pMeta->totalTasks = pMeta->chkptNotReadyTasks;
}
total = taosArrayGetSize(pMeta->pTaskList);
@ -1798,19 +1799,6 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) {
} else {
tqDebug("vgId:%d closed tasks:%d, not closed:%d", vgId, pMeta->closedTask, (numOfTasks - pMeta->closedTask));
}
// bool allStopped = true;
// int32_t numOfCount = streamMetaGetNumOfTasks(pMeta);
// for(int32_t i = 0; i < numOfCount; ++i) {
// SStreamTaskId* pId = taosArrayGet(pMeta->pTaskList, i);
//
// int64_t keys1[2] = {pId->streamId, pId->taskId};
// SStreamTask** p = taosHashGet(pMeta->pTasks, keys1, sizeof(keys1));
// if ((*p)->status.taskStatus != TASK_STATUS__STOP) {
// allStopped = false;
// tqDebug("vgId:%d, s-task:0x%"PRIx64"-0x%x not updated yet", vgId, keys1[0], pId->taskId);
// break;
// }
// }
taosWUnLockLatch(&pMeta->lock);

View File

@ -39,6 +39,7 @@ int32_t tqInitDataRsp(SMqDataRsp* pRsp, STqOffsetVal pOffset) {
void tqUpdateNodeStage(STQ* pTq) {
SSyncState state = syncGetState(pTq->pVnode->sync);
pTq->pStreamMeta->stage = state.term;
tqDebug("vgId:%d update the meta stage to be:%"PRId64, pTq->pStreamMeta->vgId, pTq->pStreamMeta->stage);
}
static int32_t tqInitTaosxRsp(STaosxRsp* pRsp, STqOffsetVal pOffset) {

View File

@ -423,12 +423,6 @@ SVnode *vnodeOpen(const char *path, int32_t diskPrimary, STfs *pTfs, SMsgCb msgC
sprintf(tdir, "%s%s%s", dir, TD_DIRSEP, VNODE_TQ_DIR);
taosRealPath(tdir, NULL, sizeof(tdir));
// open sma
if (smaOpen(pVnode, rollback)) {
vError("vgId:%d, failed to open vnode sma since %s", TD_VID(pVnode), tstrerror(terrno));
goto _err;
}
// open query
if (vnodeQueryOpen(pVnode)) {
vError("vgId:%d, failed to open vnode query since %s", TD_VID(pVnode), tstrerror(terrno));
@ -436,6 +430,19 @@ SVnode *vnodeOpen(const char *path, int32_t diskPrimary, STfs *pTfs, SMsgCb msgC
goto _err;
}
// sma required the tq is initialized before the vnode open
pVnode->pTq = tqOpen(tdir, pVnode);
if (pVnode->pTq == NULL) {
vError("vgId:%d, failed to open vnode tq since %s", TD_VID(pVnode), tstrerror(terrno));
goto _err;
}
// open sma
if (smaOpen(pVnode, rollback)) {
vError("vgId:%d, failed to open vnode sma since %s", TD_VID(pVnode), tstrerror(terrno));
goto _err;
}
// vnode begin
if (vnodeBegin(pVnode) < 0) {
vError("vgId:%d, failed to begin since %s", TD_VID(pVnode), tstrerror(terrno));
@ -450,12 +457,6 @@ SVnode *vnodeOpen(const char *path, int32_t diskPrimary, STfs *pTfs, SMsgCb msgC
goto _err;
}
pVnode->pTq = tqOpen(tdir, pVnode);
if (pVnode->pTq == NULL) {
vError("vgId:%d, failed to open vnode tq since %s", TD_VID(pVnode), tstrerror(terrno));
goto _err;
}
if (rollback) {
vnodeRollback(pVnode);
}

View File

@ -67,7 +67,6 @@ static void streamSchedByTimer(void* param, void* tmrId) {
if (streamTaskShouldStop(&pTask->status) || streamTaskShouldPause(&pTask->status)) {
qDebug("s-task:%s jump out of schedTimer", pTask->id.idStr);
streamMetaReleaseTask(NULL, pTask);
return;
}
@ -410,7 +409,7 @@ int32_t tAppendDataToInputQueue(SStreamTask* pTask, SStreamQueueItem* pItem) {
if (type != STREAM_INPUT__GET_RES && type != STREAM_INPUT__CHECKPOINT && pTask->triggerParam != 0) {
atomic_val_compare_exchange_8(&pTask->triggerStatus, TASK_TRIGGER_STATUS__INACTIVE, TASK_TRIGGER_STATUS__ACTIVE);
qDebug("s-task:%s new data arrived, active the trigger, trigerStatus:%d", pTask->id.idStr, pTask->triggerStatus);
qDebug("s-task:%s new data arrived, active the trigger, triggerStatus:%d", pTask->id.idStr, pTask->triggerStatus);
}
return 0;

View File

@ -181,8 +181,10 @@ int32_t streamProcessCheckpointBlock(SStreamTask* pTask, SStreamDataBlock* pBloc
{ // todo: remove this when the pipeline checkpoint generating is used.
SStreamMeta* pMeta = pTask->pMeta;
taosWLockLatch(&pMeta->lock);
if (pMeta->chkptNotReadyTasks == 0) {
pMeta->chkptNotReadyTasks = taosArrayGetSize(pMeta->pTaskList);
pMeta->chkptNotReadyTasks = streamMetaGetNumOfStreamTasks(pMeta);
pMeta->totalTasks = pMeta->chkptNotReadyTasks;
}
taosWUnLockLatch(&pMeta->lock);
@ -272,6 +274,9 @@ int32_t streamSaveAllTaskStatus(SStreamMeta* pMeta, int64_t checkpointId) {
keys[1] = pId->taskId;
SStreamTask* p = *(SStreamTask**)taosHashGet(pMeta->pTasks, keys, sizeof(keys));
if (p->info.fillHistory == 1) {
continue;
}
int8_t prev = p->status.taskStatus;
ASSERT(p->chkInfo.checkpointId < p->checkpointingId && p->checkpointingId == checkpointId);
@ -304,36 +309,36 @@ int32_t streamSaveAllTaskStatus(SStreamMeta* pMeta, int64_t checkpointId) {
int32_t streamTaskBuildCheckpoint(SStreamTask* pTask) {
int32_t code = 0;
// if (pTask->status.taskStatus == TASK_STATUS__CK_READY) {
// check for all tasks, and do generate the vnode-wide checkpoint data.
SStreamMeta* pMeta = pTask->pMeta;
int32_t remain = atomic_sub_fetch_32(&pMeta->chkptNotReadyTasks, 1);
ASSERT(remain >= 0);
// check for all tasks, and do generate the vnode-wide checkpoint data.
SStreamMeta* pMeta = pTask->pMeta;
int32_t remain = atomic_sub_fetch_32(&pMeta->chkptNotReadyTasks, 1);
ASSERT(remain >= 0);
if (remain == 0) { // all tasks are in TASK_STATUS__CK_READY state
qDebug("s-task:%s is ready for checkpoint", pTask->id.idStr);
if (remain == 0) { // all tasks are in TASK_STATUS__CK_READY state
qDebug("s-task:%s is ready for checkpoint", pTask->id.idStr);
pMeta->totalTasks = 0;
streamBackendDoCheckpoint(pMeta, pTask->checkpointingId);
streamSaveAllTaskStatus(pMeta, pTask->checkpointingId);
qDebug("vgId:%d vnode wide checkpoint completed, save all tasks status, checkpointId:%" PRId64, pMeta->vgId,
pTask->checkpointingId);
} else {
qDebug("vgId:%d vnode wide tasks not reach checkpoint ready status, ready s-task:%s, not ready:%d/%d",
pMeta->vgId, pTask->id.idStr, remain, (int32_t)taosArrayGetSize(pMeta->pTaskList));
}
streamBackendDoCheckpoint(pMeta, pTask->checkpointingId);
streamSaveAllTaskStatus(pMeta, pTask->checkpointingId);
qDebug("vgId:%d vnode wide checkpoint completed, save all tasks status, checkpointId:%" PRId64, pMeta->vgId,
pTask->checkpointingId);
} else {
qDebug("vgId:%d vnode wide tasks not reach checkpoint ready status, ready s-task:%s, not ready:%d/%d", pMeta->vgId,
pTask->id.idStr, remain, pMeta->totalTasks);
}
// send check point response to upstream task
if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
code = streamTaskSendCheckpointSourceRsp(pTask);
} else {
code = streamTaskSendCheckpointReadyMsg(pTask);
}
// send check point response to upstream task
if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
code = streamTaskSendCheckpointSourceRsp(pTask);
} else {
code = streamTaskSendCheckpointReadyMsg(pTask);
}
if (code != TSDB_CODE_SUCCESS) {
// todo: let's retry send rsp to upstream/mnode
qError("s-task:%s failed to send checkpoint rsp to upstream, checkpointId:%" PRId64 ", code:%s",
pTask->id.idStr, pTask->checkpointingId, tstrerror(code));
}
if (code != TSDB_CODE_SUCCESS) {
// todo: let's retry send rsp to upstream/mnode
qError("s-task:%s failed to send checkpoint rsp to upstream, checkpointId:%" PRId64 ", code:%s", pTask->id.idStr,
pTask->checkpointingId, tstrerror(code));
}
return code;
}

View File

@ -258,6 +258,7 @@ void streamMetaClear(SStreamMeta* pMeta) {
// release the ref by timer
if (p->triggerParam != 0 && p->info.fillHistory == 0) { // one more ref in timer
qDebug("s-task:%s stop schedTimer, and (before) desc ref:%d", p->id.idStr, p->refCnt);
taosTmrStop(p->schedTimer);
p->triggerParam = 0;
streamMetaReleaseTask(pMeta, p);
@ -401,6 +402,22 @@ int32_t streamMetaGetNumOfTasks(SStreamMeta* pMeta) {
return (int32_t)size;
}
int32_t streamMetaGetNumOfStreamTasks(SStreamMeta* pMeta) {
int32_t num = 0;
size_t size = taosArrayGetSize(pMeta->pTaskList);
for (int32_t i = 0; i < size; ++i) {
SStreamTaskId* pId = taosArrayGet(pMeta->pTaskList, i);
int64_t keys[2] = {pId->streamId, pId->taskId};
SStreamTask** p = taosHashGet(pMeta->pTasks, keys, sizeof(keys));
if ((*p)->info.fillHistory == 0) {
num += 1;
}
}
return num;
}
SStreamTask* streamMetaAcquireTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId) {
taosRLockLatch(&pMeta->lock);