fix(stream): fix error in check stream state.

This commit is contained in:
Haojun Liao 2023-08-21 11:48:55 +08:00
parent 6fe649c458
commit 29bb5854cc
5 changed files with 128 additions and 82 deletions

View File

@ -75,6 +75,9 @@ static int32_t mndBuildStreamCheckpointSourceReq2(void **pBuf, int32_t *pLen, in
static int32_t mndProcessNodeCheck(SRpcMsg *pReq); static int32_t mndProcessNodeCheck(SRpcMsg *pReq);
static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg); static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg);
static SArray *doExtractNodeListFromStream(SMnode *pMnode);
static SArray *mndTakeVgroupSnapshot(SMnode *pMnode);
static SVgroupChangeInfo mndFindChangedNodeInfo(SMnode *pMnode, const SArray *pPrevNodeList, const SArray *pNodeList);
static int32_t mndPersistTransLog(SStreamObj *pStream, STrans *pTrans); static int32_t mndPersistTransLog(SStreamObj *pStream, STrans *pTrans);
static void initTransAction(STransAction *pAction, void *pCont, int32_t contLen, int32_t msgType, const SEpSet *pEpset); static void initTransAction(STransAction *pAction, void *pCont, int32_t contLen, int32_t msgType, const SEpSet *pEpset);
@ -1097,6 +1100,22 @@ static int32_t mndAddStreamCheckpointToTrans(STrans *pTrans, SStreamObj *pStream
return 0; return 0;
} }
/**
 * Return a heap-allocated copy of the source DB name of the first stream
 * object yielded by the sdb iterator.
 *
 * @param pMnode  mnode handle whose sdb is scanned for SDB_STREAM entries.
 * @return        result of taosStrdup() on that stream's sourceDb, or NULL when
 *                the fetch yields no stream. The string is a duplicate, so the
 *                caller is expected to free it.
 */
static const char *mndGetStreamDB(SMnode *pMnode) {
  SSdb       *sdb = pMnode->pSdb;
  SStreamObj *pFirst = NULL;

  // Start a fresh iteration (NULL cursor); only the first entry is of interest.
  void *cursor = sdbFetch(sdb, SDB_STREAM, NULL, (void **)&pFirst);
  if (cursor == NULL) {
    return NULL;
  }

  // Copy the name before the stream object is handed back.
  const char *dbName = taosStrdup(pFirst->sourceDb);

  mndReleaseStream(pMnode, pFirst);
  // The iteration is abandoned after one element, so cancel it explicitly.
  sdbCancelFetch(sdb, cursor);

  return dbName;
}
static int32_t mndProcessStreamDoCheckpoint(SRpcMsg *pReq) { static int32_t mndProcessStreamDoCheckpoint(SRpcMsg *pReq) {
SMnode *pMnode = pReq->info.node; SMnode *pMnode = pReq->info.node;
SSdb *pSdb = pMnode->pSdb; SSdb *pSdb = pMnode->pSdb;
@ -1104,6 +1123,36 @@ static int32_t mndProcessStreamDoCheckpoint(SRpcMsg *pReq) {
SStreamObj *pStream = NULL; SStreamObj *pStream = NULL;
int32_t code = 0; int32_t code = 0;
{
  // Do not generate a checkpoint while the nodes hosting the stream tasks are
  // being updated: compare the recorded node list against a fresh vgroup
  // snapshot and bail out if anything changed.
  int64_t ts = taosGetTimestampSec();

  // (Re)build the recorded node list when it is missing or empty.
  if (execNodeList.pNodeEntryList == NULL || (taosArrayGetSize(execNodeList.pNodeEntryList) == 0)) {
    if (execNodeList.pNodeEntryList != NULL) {
      execNodeList.pNodeEntryList = taosArrayDestroy(execNodeList.pNodeEntryList);
    }

    execNodeList.pNodeEntryList = doExtractNodeListFromStream(pMnode);
  }

  // Still nothing to compare against: record the timestamp and stop here.
  if (taosArrayGetSize(execNodeList.pNodeEntryList) == 0) {
    mDebug("end to do stream task node change checking, no vgroup exists, do nothing");
    execNodeList.ts = ts;
    // NOTE(review): this resets the node-check sentinel from the checkpoint path — confirm this is intended.
    atomic_store_32(&mndNodeCheckSentinel, 0);
    return 0;
  }

  SArray *pNodeSnapshot = mndTakeVgroupSnapshot(pMnode);

  SVgroupChangeInfo changeInfo = mndFindChangedNodeInfo(pMnode, execNodeList.pNodeEntryList, pNodeSnapshot);
  bool              nodeUpdated = (taosArrayGetSize(changeInfo.pUpdateNodeList) > 0);

  taosArrayDestroy(changeInfo.pUpdateNodeList);
  taosHashCleanup(changeInfo.pDBMap);

  // The snapshot is only used for the comparison above and is not stored
  // anywhere, so it must be released here; previously it leaked on every call.
  // NOTE(review): assumes the snapshot entries own no nested allocations — confirm.
  taosArrayDestroy(pNodeSnapshot);

  if (nodeUpdated) {
    mDebug("stream task not ready due to node update, not generate checkpoint");
    return 0;
  }
}
SMStreamDoCheckpointMsg *pMsg = (SMStreamDoCheckpointMsg *)pReq->pCont; SMStreamDoCheckpointMsg *pMsg = (SMStreamDoCheckpointMsg *)pReq->pCont;
int64_t checkpointId = pMsg->checkpointId; int64_t checkpointId = pMsg->checkpointId;
@ -1114,7 +1163,10 @@ static int32_t mndProcessStreamDoCheckpoint(SRpcMsg *pReq) {
} }
mDebug("start to trigger checkpoint, checkpointId: %" PRId64 "", checkpointId); mDebug("start to trigger checkpoint, checkpointId: %" PRId64 "", checkpointId);
mndTransSetDbName(pTrans, "checkpoint", "checkpoint"); const char* pDb = mndGetStreamDB(pMnode);
mndTransSetDbName(pTrans, pDb, "checkpoint");
taosMemoryFree((void*)pDb);
if (mndTransCheckConflict(pMnode, pTrans) != 0) { if (mndTransCheckConflict(pMnode, pTrans) != 0) {
mError("failed to trigger checkpoint, checkpointId: %" PRId64 ", reason:%s", checkpointId, mError("failed to trigger checkpoint, checkpointId: %" PRId64 ", reason:%s", checkpointId,
tstrerror(TSDB_CODE_MND_TRANS_CONFLICT)); tstrerror(TSDB_CODE_MND_TRANS_CONFLICT));
@ -1132,11 +1184,13 @@ static int32_t mndProcessStreamDoCheckpoint(SRpcMsg *pReq) {
break; break;
} }
} }
if (code == 0) { if (code == 0) {
if (mndTransPrepare(pMnode, pTrans) != 0) { if (mndTransPrepare(pMnode, pTrans) != 0) {
mError("failed to prepre trans rebalance since %s", terrstr()); mError("failed to prepre trans rebalance since %s", terrstr());
} }
} }
mndTransDrop(pTrans); mndTransDrop(pTrans);
return code; return code;
} }

View File

@ -1883,6 +1883,7 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) {
tqDebug("s-task:%s receive task nodeEp update msg from mnode", pTask->id.idStr); tqDebug("s-task:%s receive task nodeEp update msg from mnode", pTask->id.idStr);
streamTaskUpdateEpsetInfo(pTask, req.pNodeList); streamTaskUpdateEpsetInfo(pTask, req.pNodeList);
streamMetaSaveTask(pMeta, pTask);
streamTaskStop(pTask); streamTaskStop(pTask);
taosWLockLatch(&pMeta->lock); taosWLockLatch(&pMeta->lock);
@ -1895,6 +1896,7 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) {
// all tasks are closed, now let's restart the stream meta // all tasks are closed, now let's restart the stream meta
if (pMeta->closedTask == numOfCount) { if (pMeta->closedTask == numOfCount) {
tqDebug("vgId:%d all tasks are updated, commit the update nodeInfo", vgId);
if (streamMetaCommit(pMeta) < 0) { if (streamMetaCommit(pMeta) < 0) {
// persist to disk // persist to disk
} }
@ -1910,18 +1912,23 @@ _end:
if (restartTasks) { if (restartTasks) {
tqDebug("vgId:%d all tasks are stopped, restart them", vgId); tqDebug("vgId:%d all tasks are stopped, restart them", vgId);
taosWLockLatch(&pMeta->lock);
streamMetaClose(pTask->pMeta); terrno = 0;
int32_t code = streamMetaReopen(pTq->pStreamMeta, 0);
pTq->pStreamMeta = streamMetaOpen(pTq->path, pTq, (FTaskExpand*)tqExpandTask, pTq->pVnode->config.vgId, -1); if (code != 0) {
if (pTq->pStreamMeta == NULL) { tqError("vgId:%d failed to reopen stream meta", vgId);
taosWUnLockLatch(&pMeta->lock);
return -1; return -1;
} }
if (streamLoadTasks(pTq->pStreamMeta) < 0) { if (streamLoadTasks(pTq->pStreamMeta) < 0) {
tqError("vgId:%d failed to load stream tasks", vgId);
taosWUnLockLatch(&pMeta->lock);
return -1; return -1;
} }
taosWUnLockLatch(&pMeta->lock);
if (vnodeIsRoleLeader(pTq->pVnode) && !tsDisableStream) { if (vnodeIsRoleLeader(pTq->pVnode) && !tsDisableStream) {
vInfo("vgId:%d, restart to all stream tasks", vgId); vInfo("vgId:%d, restart to all stream tasks", vgId);
tqCheckStreamStatus(pTq); tqCheckStreamStatus(pTq);

View File

@ -24,9 +24,9 @@ static TdThreadOnce streamMetaModuleInit = PTHREAD_ONCE_INIT;
int32_t streamBackendId = 0; int32_t streamBackendId = 0;
int32_t streamBackendCfWrapperId = 0; int32_t streamBackendCfWrapperId = 0;
int64_t streamGetLatestCheckpointId(SStreamMeta* pMeta); static int64_t streamGetLatestCheckpointId(SStreamMeta* pMeta);
static void metaHbToMnode(void* param, void* tmrId); static void metaHbToMnode(void* param, void* tmrId);
static void streamMetaClear(SStreamMeta* pMeta);
static void streamMetaEnvInit() { static void streamMetaEnvInit() {
streamBackendId = taosOpenRef(64, streamBackendCleanup); streamBackendId = taosOpenRef(64, streamBackendCleanup);
@ -130,31 +130,8 @@ _err:
} }
int32_t streamMetaReopen(SStreamMeta* pMeta, int64_t chkpId) { int32_t streamMetaReopen(SStreamMeta* pMeta, int64_t chkpId) {
// stop all running tasking and reopen later streamMetaClear(pMeta);
void* pIter = NULL;
while (1) {
pIter = taosHashIterate(pMeta->pTasks, pIter);
if (pIter == NULL) {
break;
}
SStreamTask* pTask = *(SStreamTask**)pIter;
if (pTask->schedTimer) {
taosTmrStop(pTask->schedTimer);
pTask->schedTimer = NULL;
}
if (pTask->launchTaskTimer) {
taosTmrStop(pTask->launchTaskTimer);
pTask->launchTaskTimer = NULL;
}
tFreeStreamTask(pTask);
}
// close stream backend
// streamBackendCleanup(pMeta->streamBackend);
taosRemoveRef(streamBackendId, pMeta->streamBackendRid);
pMeta->streamBackendRid = -1; pMeta->streamBackendRid = -1;
pMeta->streamBackend = NULL; pMeta->streamBackend = NULL;
@ -165,60 +142,70 @@ int32_t streamMetaReopen(SStreamMeta* pMeta, int64_t chkpId) {
char* newPath = taosMemoryCalloc(1, strlen(pMeta->path) + 64); char* newPath = taosMemoryCalloc(1, strlen(pMeta->path) + 64);
sprintf(newPath, "%s%s%s", pMeta->path, TD_DIRSEP, "received"); sprintf(newPath, "%s%s%s", pMeta->path, TD_DIRSEP, "received");
if (taosRenameFile(newPath, defaultPath) < 0) { int32_t code = taosStatFile(newPath, NULL, NULL, NULL);
if (code == 0) {
// directory exists
code = taosRenameFile(newPath, defaultPath);
if (code != 0) {
terrno = TAOS_SYSTEM_ERROR(code);
// Arguments must follow the conversion specifiers: vgId (%d) first, then the two paths (%s, %s).
qError("vgId:%d failed to rename file, from %s to %s, code:%s", pMeta->vgId, newPath, defaultPath,
       tstrerror(terrno));
taosMemoryFree(defaultPath); taosMemoryFree(defaultPath);
taosMemoryFree(newPath); taosMemoryFree(newPath);
return -1; return -1;
} }
}
pMeta->streamBackend = streamBackendInit(pMeta->path, 0); pMeta->streamBackend = streamBackendInit(pMeta->path, pMeta->chkpId);
if (pMeta->streamBackend == NULL) { if (pMeta->streamBackend == NULL) {
qError("vgId:%d failed to init stream backend", pMeta->vgId);
return -1; return -1;
} }
pMeta->streamBackendRid = taosAddRef(streamBackendId, pMeta->streamBackend); pMeta->streamBackendRid = taosAddRef(streamBackendId, pMeta->streamBackend);
taosHashClear(pMeta->pTasks);
taosArrayClear(pMeta->pTaskList);
taosHashClear(pMeta->pTaskBackendUnique);
taosArrayClear(pMeta->chkpSaved);
taosArrayClear(pMeta->chkpInUse);
return 0; return 0;
} }
/**
 * Reset the in-memory content of a stream meta without freeing its containers.
 *
 * Releases one reference on every task stored in pMeta->pTasks, removes the
 * stream backend reference, and then empties the task/backend hash tables and
 * the task/checkpoint arrays.
 *
 * @param pMeta  stream meta to clear; dereferenced unconditionally, so it must not be NULL.
 */
void streamMetaClear(SStreamMeta* pMeta) {
  // Drop the reference held on each task in the task table.
  for (void* pEntry = taosHashIterate(pMeta->pTasks, NULL); pEntry != NULL;
       pEntry = taosHashIterate(pMeta->pTasks, pEntry)) {
    SStreamTask* pTask = *(SStreamTask**)pEntry;
    streamMetaReleaseTask(pMeta, pTask);
  }

  taosRemoveRef(streamBackendId, pMeta->streamBackendRid);

  // Empty the containers; they are cleared here, not destroyed.
  taosHashClear(pMeta->pTasks);
  taosHashClear(pMeta->pTaskBackendUnique);

  taosArrayClear(pMeta->pTaskList);
  taosArrayClear(pMeta->chkpSaved);
  taosArrayClear(pMeta->chkpInUse);
}
void streamMetaClose(SStreamMeta* pMeta) { void streamMetaClose(SStreamMeta* pMeta) {
qDebug("start to close stream meta"); qDebug("start to close stream meta");
if (pMeta == NULL) { if (pMeta == NULL) {
return; return;
} }
streamMetaClear(pMeta);
tdbAbort(pMeta->db, pMeta->txn); tdbAbort(pMeta->db, pMeta->txn);
tdbTbClose(pMeta->pTaskDb); tdbTbClose(pMeta->pTaskDb);
tdbTbClose(pMeta->pCheckpointDb); tdbTbClose(pMeta->pCheckpointDb);
tdbClose(pMeta->db); tdbClose(pMeta->db);
void* pIter = NULL; taosArrayDestroy(pMeta->pTaskList);
while (1) {
pIter = taosHashIterate(pMeta->pTasks, pIter);
if (pIter == NULL) {
break;
}
tFreeStreamTask(*(SStreamTask**)pIter);
}
taosHashCleanup(pMeta->pTasks);
taosRemoveRef(streamBackendId, pMeta->streamBackendRid);
pMeta->pTaskList = taosArrayDestroy(pMeta->pTaskList);
taosMemoryFree(pMeta->path);
taosThreadMutexDestroy(&pMeta->backendMutex);
taosHashCleanup(pMeta->pTaskBackendUnique);
taosArrayDestroy(pMeta->chkpSaved); taosArrayDestroy(pMeta->chkpSaved);
taosArrayDestroy(pMeta->chkpInUse); taosArrayDestroy(pMeta->chkpInUse);
taosHashCleanup(pMeta->pTasks);
taosHashCleanup(pMeta->pTaskBackendUnique);
taosMemoryFree(pMeta->path);
taosThreadMutexDestroy(&pMeta->backendMutex);
taosMemoryFree(pMeta); taosMemoryFree(pMeta);
qDebug("end to close stream meta"); qDebug("end to close stream meta");
} }
@ -318,7 +305,7 @@ SStreamTask* streamMetaAcquireTask(SStreamMeta* pMeta, int64_t streamId, int32_t
return NULL; return NULL;
} }
void streamMetaReleaseTask(SStreamMeta* pMeta, SStreamTask* pTask) { void streamMetaReleaseTask(SStreamMeta* UNUSED_PARAM(pMeta), SStreamTask* pTask) {
int32_t ref = atomic_sub_fetch_32(&pTask->refCnt, 1); int32_t ref = atomic_sub_fetch_32(&pTask->refCnt, 1);
if (ref > 0) { if (ref > 0) {
qTrace("s-task:%s release task, ref:%d", pTask->id.idStr, ref); qTrace("s-task:%s release task, ref:%d", pTask->id.idStr, ref);
@ -458,7 +445,9 @@ int64_t streamGetLatestCheckpointId(SStreamMeta* pMeta) {
chkpId = TMAX(chkpId, info.checkpointId); chkpId = TMAX(chkpId, info.checkpointId);
} }
qDebug("get max chkp id: %" PRId64 "", chkpId); qDebug("get max chkp id: %" PRId64 "", chkpId);
tdbFree(pKey); tdbFree(pKey);
tdbFree(pVal); tdbFree(pVal);
tdbTbcClose(pCur); tdbTbcClose(pCur);
@ -466,6 +455,13 @@ int64_t streamGetLatestCheckpointId(SStreamMeta* pMeta) {
return chkpId; return chkpId;
} }
/**
 * Release the scratch resources used while scanning tasks from the tdb table:
 * the key and value buffers, the table cursor, and the recycle list.
 *
 * @param pKey          key buffer, released with tdbFree().
 * @param pVal          value buffer, released with tdbFree().
 * @param pCur          table cursor, closed with tdbTbcClose().
 * @param pRecycleList  array destroyed with taosArrayDestroy().
 */
static void doClear(void* pKey, void* pVal, TBC* pCur, SArray* pRecycleList) {
  // Buffers first, then the cursor.
  tdbFree(pKey);
  tdbFree(pVal);

  tdbTbcClose(pCur);

  taosArrayDestroy(pRecycleList);
}
int32_t streamLoadTasks(SStreamMeta* pMeta) { int32_t streamLoadTasks(SStreamMeta* pMeta) {
TBC* pCur = NULL; TBC* pCur = NULL;
@ -486,19 +482,14 @@ int32_t streamLoadTasks(SStreamMeta* pMeta) {
while (tdbTbcNext(pCur, &pKey, &kLen, &pVal, &vLen) == 0) { while (tdbTbcNext(pCur, &pKey, &kLen, &pVal, &vLen) == 0) {
SStreamTask* pTask = taosMemoryCalloc(1, sizeof(SStreamTask)); SStreamTask* pTask = taosMemoryCalloc(1, sizeof(SStreamTask));
if (pTask == NULL) { if (pTask == NULL) {
tdbFree(pKey); doClear(pKey, pVal, pCur, pRecycleList);
tdbFree(pVal);
tdbTbcClose(pCur);
taosArrayDestroy(pRecycleList);
return -1; return -1;
} }
tDecoderInit(&decoder, (uint8_t*)pVal, vLen); tDecoderInit(&decoder, (uint8_t*)pVal, vLen);
if (tDecodeStreamTask(&decoder, pTask) < 0) { if (tDecodeStreamTask(&decoder, pTask) < 0) {
tDecoderClear(&decoder); tDecoderClear(&decoder);
tdbFree(pKey); doClear(pKey, pVal, pCur, pRecycleList);
tdbFree(pVal);
tdbTbcClose(pCur);
taosArrayDestroy(pRecycleList);
tFreeStreamTask(pTask); tFreeStreamTask(pTask);
qError( qError(
"stream read incompatible data, rm %s/vnode/vnode*/tq/stream if taosd cannot start, and rebuild stream " "stream read incompatible data, rm %s/vnode/vnode*/tq/stream if taosd cannot start, and rebuild stream "
@ -513,7 +504,6 @@ int32_t streamLoadTasks(SStreamMeta* pMeta) {
tFreeStreamTask(pTask); tFreeStreamTask(pTask);
taosArrayPush(pRecycleList, &taskId); taosArrayPush(pRecycleList, &taskId);
int32_t total = taosArrayGetSize(pRecycleList); int32_t total = taosArrayGetSize(pRecycleList);
qDebug("s-task:0x%x is already dropped, add into recycle list, total:%d", taskId, total); qDebug("s-task:0x%x is already dropped, add into recycle list, total:%d", taskId, total);
continue; continue;
@ -524,11 +514,8 @@ int32_t streamLoadTasks(SStreamMeta* pMeta) {
void* p = taosHashGet(pMeta->pTasks, keys, sizeof(keys)); void* p = taosHashGet(pMeta->pTasks, keys, sizeof(keys));
if (p == NULL) { if (p == NULL) {
if (pMeta->expandFunc(pMeta->ahandle, pTask, pTask->chkInfo.checkpointVer) < 0) { if (pMeta->expandFunc(pMeta->ahandle, pTask, pTask->chkInfo.checkpointVer) < 0) {
tdbFree(pKey); doClear(pKey, pVal, pCur, pRecycleList);
tdbFree(pVal);
tdbTbcClose(pCur);
tFreeStreamTask(pTask); tFreeStreamTask(pTask);
taosArrayDestroy(pRecycleList);
return -1; return -1;
} }
@ -542,11 +529,8 @@ int32_t streamLoadTasks(SStreamMeta* pMeta) {
} }
if (taosHashPut(pMeta->pTasks, keys, sizeof(keys), &pTask, sizeof(void*)) < 0) { if (taosHashPut(pMeta->pTasks, keys, sizeof(keys), &pTask, sizeof(void*)) < 0) {
tdbFree(pKey); doClear(pKey, pVal, pCur, pRecycleList);
tdbFree(pVal);
tdbTbcClose(pCur);
tFreeStreamTask(pTask); tFreeStreamTask(pTask);
taosArrayDestroy(pRecycleList);
return -1; return -1;
} }

View File

@ -244,6 +244,7 @@ static void doProcessDownstreamReadyRsp(SStreamTask* pTask, int32_t numOfReqs) {
launchFillHistoryTask(pTask); launchFillHistoryTask(pTask);
} }
// todo handle error
int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRsp) { int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRsp) {
ASSERT(pTask->id.taskId == pRsp->upstreamTaskId); ASSERT(pTask->id.taskId == pRsp->upstreamTaskId);
const char* id = pTask->id.idStr; const char* id = pTask->id.idStr;

View File

@ -475,7 +475,7 @@ int32_t streamTaskStop(SStreamTask* pTask) {
pTask->status.taskStatus = TASK_STATUS__STOP; pTask->status.taskStatus = TASK_STATUS__STOP;
qKillTask(pTask->exec.pExecutor, TSDB_CODE_SUCCESS); qKillTask(pTask->exec.pExecutor, TSDB_CODE_SUCCESS);
while (!streamTaskIsIdle(pTask)) { while (pTask->status.schedStatus != TASK_SCHED_STATUS__INACTIVE) {
qDebug("s-task:%s level:%d wait for task to be idle, check again in 100ms", id, pTask->info.taskLevel); qDebug("s-task:%s level:%d wait for task to be idle, check again in 100ms", id, pTask->info.taskLevel);
taosMsleep(100); taosMsleep(100);
} }