diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h index efa99db74b..1e4bae17f6 100644 --- a/source/dnode/mnode/impl/inc/mndDef.h +++ b/source/dnode/mnode/impl/inc/mndDef.h @@ -699,6 +699,11 @@ typedef struct { } SStreamObj; +typedef struct SStreamSeq { + char name[24]; + uint64_t seq; + SRWLatch lock; +} SStreamSeq; int32_t tEncodeSStreamObj(SEncoder* pEncoder, const SStreamObj* pObj); int32_t tDecodeSStreamObj(SDecoder* pDecoder, SStreamObj* pObj, int32_t sver); void tFreeStreamObj(SStreamObj* pObj); @@ -730,14 +735,13 @@ typedef struct { int8_t type; int32_t numOfCols; SSchema* pSchema; - SRWLatch lock; + SRWLatch lock; } SViewObj; int32_t tEncodeSViewObj(SEncoder* pEncoder, const SViewObj* pObj); int32_t tDecodeSViewObj(SDecoder* pDecoder, SViewObj* pObj, int32_t sver); void tFreeSViewObj(SViewObj* pObj); - #ifdef __cplusplus } #endif diff --git a/source/dnode/mnode/impl/inc/mndStream.h b/source/dnode/mnode/impl/inc/mndStream.h index 19fd2a3fd4..6813bb5497 100644 --- a/source/dnode/mnode/impl/inc/mndStream.h +++ b/source/dnode/mnode/impl/inc/mndStream.h @@ -34,6 +34,11 @@ SSdbRow *mndStreamActionDecode(SSdbRaw *pRaw); int32_t mndDropStreamByDb(SMnode *pMnode, STrans *pTrans, SDbObj *pDb); int32_t mndPersistStream(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream); +SSdbRaw * mndStreamSeqActionEncode(SStreamObj *pStream); +SSdbRow * mndStreamSeqActionDecode(SSdbRaw *pRaw); +static int32_t mndStreamSeqActionInsert(SSdb *pSdb, SStreamSeq *pStream); +static int32_t mndStreamSeqActionDelete(SSdb *pSdb, SStreamSeq *pStream); +static int32_t mndStreamSeqActionUpdate(SSdb *pSdb, SStreamSeq *pOldStream, SStreamSeq *pNewStream); // for sma // TODO refactor int32_t mndDropStreamTasks(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream); diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 7e0667ddbf..fcdf024a59 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -43,17 +43,17 @@ typedef struct SNodeEntry { } SNodeEntry; typedef struct SStreamExecInfo { - SArray *pNodeEntryList; + SArray * pNodeEntryList; int64_t ts; // snapshot ts int64_t activeCheckpoint; // active check point id - SHashObj *pTaskMap; - SArray *pTaskList; + SHashObj * pTaskMap; + SArray * pTaskList; TdThreadMutex lock; } SStreamExecInfo; typedef struct SVgroupChangeInfo { SHashObj *pDBMap; - SArray *pUpdateNodeList; // SArray + SArray * pUpdateNodeList; // SArray } SVgroupChangeInfo; static int32_t mndNodeCheckSentinel = 0; @@ -78,7 +78,7 @@ static int32_t mndBuildStreamCheckpointSourceReq2(void **pBuf, int32_t *pLen, in static int32_t mndProcessNodeCheck(SRpcMsg *pReq); static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg); static SArray *extractNodeListFromStream(SMnode *pMnode); -static SArray *mndTakeVgroupSnapshot(SMnode *pMnode, bool* allReady); +static SArray *mndTakeVgroupSnapshot(SMnode *pMnode, bool *allReady); static SVgroupChangeInfo mndFindChangedNodeInfo(SMnode *pMnode, const SArray *pPrevNodeList, const SArray *pNodeList); @@ -91,7 +91,7 @@ static void removeStreamTasksInBuf(SStreamObj *pStream, SStreamExecInfo *pExe static void keepStreamTasksInBuf(SStreamObj *pStream, SStreamExecInfo *pExecNode); static int32_t removeExpirednodeEntryAndTask(SArray *pNodeSnapshot); static int32_t doKillActiveCheckpointTrans(SMnode *pMnode); -static int32_t setNodeEpsetExpiredFlag(const SArray* pNodeList); +static int32_t setNodeEpsetExpiredFlag(const SArray *pNodeList); int32_t mndInitStream(SMnode *pMnode) { SSdbTable table = { @@ -103,6 +103,15 @@ int32_t mndInitStream(SMnode *pMnode) { .updateFp = (SdbUpdateFp)mndStreamActionUpdate, .deleteFp = (SdbDeleteFp)mndStreamActionDelete, }; + SSdbTable tableSeq = { + .sdbType = SDB_STREAM_SEQ, + .keyType = SDB_KEY_BINARY, + .encodeFp = (SdbEncodeFp)mndStreamSeqActionEncode, + .decodeFp = (SdbDecodeFp)mndStreamSeqActionDecode, + .insertFp = (SdbInsertFp)mndStreamSeqActionInsert, + .updateFp = (SdbUpdateFp)mndStreamSeqActionUpdate, + .deleteFp = (SdbDeleteFp)mndStreamSeqActionDelete, + }; mndSetMsgHandle(pMnode, TDMT_MND_CREATE_STREAM, mndProcessCreateStreamReq); mndSetMsgHandle(pMnode, TDMT_MND_DROP_STREAM, mndProcessDropStreamReq); @@ -135,7 +144,13 @@ int32_t mndInitStream(SMnode *pMnode) { execInfo.pTaskMap = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR), true, HASH_NO_LOCK); execInfo.pTaskList = taosArrayInit(4, sizeof(STaskId)); - return sdbSetTable(pMnode->pSdb, table); + if (sdbSetTable(pMnode->pSdb, table) != 0) { + return -1; + } + if (sdbSetTable(pMnode->pSdb, tableSeq) != 0) { + return -1; + } + return 0; } void mndCleanupStream(SMnode *pMnode) { @@ -193,9 +208,9 @@ STREAM_ENCODE_OVER: SSdbRow *mndStreamActionDecode(SSdbRaw *pRaw) { terrno = TSDB_CODE_OUT_OF_MEMORY; - SSdbRow *pRow = NULL; + SSdbRow * pRow = NULL; SStreamObj *pStream = NULL; - void *buf = NULL; + void * buf = NULL; int8_t sver = 0; if (sdbGetRawSoftVer(pRaw, &sver) != 0) { @@ -272,7 +287,7 @@ static int32_t mndStreamActionUpdate(SSdb *pSdb, SStreamObj *pOldStream, SStream } SStreamObj *mndAcquireStream(SMnode *pMnode, char *streamName) { - SSdb *pSdb = pMnode->pSdb; + SSdb * pSdb = pMnode->pSdb; SStreamObj *pStream = sdbAcquire(pSdb, SDB_STREAM, streamName); if (pStream == NULL && terrno == TSDB_CODE_SDB_OBJ_NOT_THERE) { terrno = TSDB_CODE_MND_STREAM_NOT_EXIST; @@ -300,6 +315,12 @@ static void mndShowStreamStatus(char *dst, SStreamObj *pStream) { } } +SSdbRaw * mndStreamSeqActionEncode(SStreamObj *pStream) { return NULL; } +SSdbRow * mndStreamSeqActionDecode(SSdbRaw *pRaw) { return NULL; } +static int32_t mndStreamSeqActionInsert(SSdb *pSdb, SStreamSeq *pStream) { return 0; } +static int32_t mndStreamSeqActionDelete(SSdb *pSdb, SStreamSeq *pStream) { return 0; } +static int32_t mndStreamSeqActionUpdate(SSdb *pSdb, SStreamSeq *pOldStream, SStreamSeq *pNewStream) { return 0; } + static void mndShowStreamTrigger(char *dst, SStreamObj *pStream) { int8_t trigger = pStream->conf.trigger; if (trigger == STREAM_TRIGGER_AT_ONCE) { @@ -325,7 +346,7 @@ static int32_t mndStreamGetPlanString(const char *ast, int8_t triggerType, int64 return TSDB_CODE_SUCCESS; } - SNode *pAst = NULL; + SNode * pAst = NULL; int32_t code = nodesStringToNode(ast, &pAst); SQueryPlan *pPlan = NULL; @@ -350,7 +371,7 @@ static int32_t mndStreamGetPlanString(const char *ast, int8_t triggerType, int64 } static int32_t mndBuildStreamObjFromCreateReq(SMnode *pMnode, SStreamObj *pObj, SCMCreateStreamReq *pCreate) { - SNode *pAst = NULL; + SNode * pAst = NULL; SQueryPlan *pPlan = NULL; mInfo("stream:%s to create", pCreate->name); @@ -589,7 +610,7 @@ int32_t mndPersistDropStreamLog(SMnode *pMnode, STrans *pTrans, SStreamObj *pStr static int32_t mndCreateStbForStream(SMnode *pMnode, STrans *pTrans, const SStreamObj *pStream, const char *user) { SStbObj *pStb = NULL; - SDbObj *pDb = NULL; + SDbObj * pDb = NULL; SMCreateStbReq createReq = {0}; tstrncpy(createReq.name, pStream->targetSTbName, TSDB_TABLE_FNAME_LEN); @@ -715,10 +736,10 @@ int32_t mndDropStreamTasks(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream) } static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) { - SMnode *pMnode = pReq->info.node; + SMnode * pMnode = pReq->info.node; int32_t code = -1; - SStreamObj *pStream = NULL; - SDbObj *pDb = NULL; + SStreamObj * pStream = NULL; + SDbObj * pDb = NULL; SCMCreateStreamReq createStreamReq = {0}; SStreamObj streamObj = {0}; @@ -761,7 +782,7 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) { int32_t numOfStream = 0; SStreamObj *pStream = NULL; - void *pIter = NULL; + void * pIter = NULL; while (1) { pIter = sdbFetch(pMnode->pSdb, SDB_STREAM, pIter, (void **)&pStream); @@ -858,12 +879,12 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) { SName name = {0}; tNameFromString(&name, createStreamReq.name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE); - //reuse this function for stream + // reuse this function for stream // TODO if (createStreamReq.sql != NULL) { - auditRecord(pReq, pMnode->clusterId, "createStream", name.dbname, name.tname, - createStreamReq.sql, strlen(createStreamReq.sql)); + auditRecord(pReq, pMnode->clusterId, "createStream", name.dbname, name.tname, createStreamReq.sql, + strlen(createStreamReq.sql)); } _OVER: if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) { @@ -879,7 +900,7 @@ _OVER: static int32_t mndProcessStreamCheckpointTmr(SRpcMsg *pReq) { SMnode *pMnode = pReq->info.node; - SSdb *pSdb = pMnode->pSdb; + SSdb * pSdb = pMnode->pSdb; if (sdbGetSize(pSdb, SDB_STREAM) <= 0) { return 0; } @@ -919,7 +940,7 @@ static int32_t mndBuildStreamCheckpointSourceReq2(void **pBuf, int32_t *pLen, in return -1; } - void *abuf = POINTER_SHIFT(buf, sizeof(SMsgHead)); + void * abuf = POINTER_SHIFT(buf, sizeof(SMsgHead)); SEncoder encoder; tEncoderInit(&encoder, abuf, tlen); tEncodeStreamCheckpointSourceReq(&encoder, &req); @@ -960,7 +981,7 @@ static int32_t mndProcessStreamCheckpointTrans(SMnode *pMnode, SStreamObj *pStre int32_t totLevel = taosArrayGetSize(pStream->tasks); for (int32_t i = 0; i < totLevel; i++) { - SArray *pLevel = taosArrayGetP(pStream->tasks, i); + SArray * pLevel = taosArrayGetP(pStream->tasks, i); SStreamTask *pTask = taosArrayGetP(pLevel, 0); if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) { int32_t sz = taosArrayGetSize(pLevel); @@ -973,7 +994,7 @@ static int32_t mndProcessStreamCheckpointTrans(SMnode *pMnode, SStreamObj *pStre goto _ERR; } - void *buf; + void * buf; int32_t tlen; if (mndBuildStreamCheckpointSourceReq2(&buf, &tlen, pTask->info.nodeId, checkpointId, pTask->id.streamId, pTask->id.taskId) < 0) { @@ -1038,7 +1059,7 @@ static int32_t mndAddStreamCheckpointToTrans(STrans *pTrans, SStreamObj *pStream int32_t totLevel = taosArrayGetSize(pStream->tasks); for (int32_t i = 0; i < totLevel; i++) { - SArray *pLevel = taosArrayGetP(pStream->tasks, i); + SArray * pLevel = taosArrayGetP(pStream->tasks, i); SStreamTask *pTask = taosArrayGetP(pLevel, 0); if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) { @@ -1055,7 +1076,7 @@ static int32_t mndAddStreamCheckpointToTrans(STrans *pTrans, SStreamObj *pStream return -1; } - void *buf; + void * buf; int32_t tlen; if (mndBuildStreamCheckpointSourceReq2(&buf, &tlen, pTask->info.nodeId, chkptId, pTask->id.streamId, pTask->id.taskId) < 0) { @@ -1066,7 +1087,8 @@ static int32_t mndAddStreamCheckpointToTrans(STrans *pTrans, SStreamObj *pStream STransAction action = {0}; SEpSet epset = mndGetVgroupEpset(pMnode, pVgObj); - initTransAction(&action, buf, tlen, TDMT_VND_STREAM_CHECK_POINT_SOURCE, &epset, TSDB_CODE_SYN_PROPOSE_NOT_READY); + initTransAction(&action, buf, tlen, TDMT_VND_STREAM_CHECK_POINT_SOURCE, &epset, + TSDB_CODE_SYN_PROPOSE_NOT_READY); mndReleaseVgroup(pMnode, pVgObj); if (mndTransAppendRedoAction(pTrans, &action) != 0) { @@ -1106,9 +1128,9 @@ static int32_t mndAddStreamCheckpointToTrans(STrans *pTrans, SStreamObj *pStream } static const char *mndGetStreamDB(SMnode *pMnode) { - SSdb *pSdb = pMnode->pSdb; + SSdb * pSdb = pMnode->pSdb; SStreamObj *pStream = NULL; - void *pIter = NULL; + void * pIter = NULL; pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream); if (pIter == NULL) { @@ -1147,11 +1169,10 @@ static int32_t mndCheckNodeStatus(SMnode *pMnode) { mDebug("stream task not ready due to node update detected, checkpoint not issued"); goto _EXIT; } - bool allReady = true; + bool allReady = true; SArray *pNodeSnapshot = mndTakeVgroupSnapshot(pMnode, &allReady); if (!allReady) { - mWarn("not all vnodes are ready, ignore the checkpoint") - taosArrayDestroy(pNodeSnapshot); + mWarn("not all vnodes are ready, ignore the checkpoint") taosArrayDestroy(pNodeSnapshot); return 0; } @@ -1170,7 +1191,7 @@ static int32_t mndCheckNodeStatus(SMnode *pMnode) { // check if all tasks are in TASK_STATUS__NORMAL status for (int32_t i = 0; i < taosArrayGetSize(execInfo.pTaskList); ++i) { - STaskId *p = taosArrayGet(execInfo.pTaskList, i); + STaskId * p = taosArrayGet(execInfo.pTaskList, i); STaskStatusEntry *pEntry = taosHashGet(execInfo.pTaskMap, p, sizeof(*p)); if (pEntry == NULL) { continue; @@ -1190,9 +1211,9 @@ _EXIT: return ready == true ? 0 : -1; } static int32_t mndProcessStreamDoCheckpoint(SRpcMsg *pReq) { - SMnode *pMnode = pReq->info.node; - SSdb *pSdb = pMnode->pSdb; - void *pIter = NULL; + SMnode * pMnode = pReq->info.node; + SSdb * pSdb = pMnode->pSdb; + void * pIter = NULL; SStreamObj *pStream = NULL; int32_t code = 0; if ((code = mndCheckNodeStatus(pMnode)) != 0) { @@ -1215,7 +1236,7 @@ static int32_t mndProcessStreamDoCheckpoint(SRpcMsg *pReq) { } static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) { - SMnode *pMnode = pReq->info.node; + SMnode * pMnode = pReq->info.node; SStreamObj *pStream = NULL; SMDropStreamReq dropReq = {0}; @@ -1343,7 +1364,7 @@ int32_t mndDropStreamByDb(SMnode *pMnode, STrans *pTrans, SDbObj *pDb) { } int32_t mndGetNumOfStreams(SMnode *pMnode, char *dbName, int32_t *pNumOfStreams) { - SSdb *pSdb = pMnode->pSdb; + SSdb * pSdb = pMnode->pSdb; SDbObj *pDb = mndAcquireDb(pMnode, dbName); if (pDb == NULL) { terrno = TSDB_CODE_MND_DB_NOT_SELECTED; @@ -1351,7 +1372,7 @@ int32_t mndGetNumOfStreams(SMnode *pMnode, char *dbName, int32_t *pNumOfStreams) } int32_t numOfStreams = 0; - void *pIter = NULL; + void * pIter = NULL; while (1) { SStreamObj *pStream = NULL; pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream); @@ -1370,8 +1391,8 @@ int32_t mndGetNumOfStreams(SMnode *pMnode, char *dbName, int32_t *pNumOfStreams) } static int32_t mndRetrieveStream(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) { - SMnode *pMnode = pReq->info.node; - SSdb *pSdb = pMnode->pSdb; + SMnode * pMnode = pReq->info.node; + SSdb * pSdb = pMnode->pSdb; int32_t numOfRows = 0; SStreamObj *pStream = NULL; @@ -1447,8 +1468,8 @@ static void mndCancelGetNextStream(SMnode *pMnode, void *pIter) { } static int32_t mndRetrieveStreamTask(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rowsCapacity) { - SMnode *pMnode = pReq->info.node; - SSdb *pSdb = pMnode->pSdb; + SMnode * pMnode = pReq->info.node; + SSdb * pSdb = pMnode->pSdb; int32_t numOfRows = 0; SStreamObj *pStream = NULL; @@ -1537,13 +1558,13 @@ static int32_t mndRetrieveStreamTask(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock // status char status[20 + VARSTR_HEADER_SIZE] = {0}; - STaskId id = {.streamId = pTask->id.streamId, .taskId = pTask->id.taskId}; - STaskStatusEntry* pe = taosHashGet(execInfo.pTaskMap, &id, sizeof(id)); + STaskId id = {.streamId = pTask->id.streamId, .taskId = pTask->id.taskId}; + STaskStatusEntry *pe = taosHashGet(execInfo.pTaskMap, &id, sizeof(id)); if (pe == NULL) { continue; } - const char* pStatus = streamTaskGetStatusStr(pe->status); + const char *pStatus = streamTaskGetStatusStr(pe->status); STR_TO_VARSTR(status, pStatus); // status @@ -1664,7 +1685,7 @@ static int32_t mndPersistStreamLog(STrans *pTrans, const SStreamObj *pStream, in } static int32_t mndProcessPauseStreamReq(SRpcMsg *pReq) { - SMnode *pMnode = pReq->info.node; + SMnode * pMnode = pReq->info.node; SStreamObj *pStream = NULL; SMPauseStreamReq pauseReq = {0}; @@ -1780,7 +1801,7 @@ int32_t mndResumeAllStreamTasks(STrans *pTrans, SStreamObj *pStream, int8_t igUn } static int32_t mndProcessResumeStreamReq(SRpcMsg *pReq) { - SMnode *pMnode = pReq->info.node; + SMnode * pMnode = pReq->info.node; SStreamObj *pStream = NULL; SMResumeStreamReq pauseReq = {0}; @@ -1865,7 +1886,7 @@ static void initNodeUpdateMsg(SStreamTaskNodeUpdateMsg *pMsg, const SVgroupChang } static int32_t doBuildStreamTaskUpdateMsg(void **pBuf, int32_t *pLen, SVgroupChangeInfo *pInfo, int32_t nodeId, - SStreamTaskId* pId, int32_t transId) { + SStreamTaskId *pId, int32_t transId) { SStreamTaskNodeUpdateMsg req = {0}; initNodeUpdateMsg(&req, pInfo, pId, transId); @@ -1888,7 +1909,7 @@ static int32_t doBuildStreamTaskUpdateMsg(void **pBuf, int32_t *pLen, SVgroupCha return -1; } - void *abuf = POINTER_SHIFT(buf, sizeof(SMsgHead)); + void * abuf = POINTER_SHIFT(buf, sizeof(SMsgHead)); SEncoder encoder; tEncoderInit(&encoder, abuf, tlen); tEncodeStreamTaskUpdateMsg(&encoder, &req); @@ -1955,7 +1976,7 @@ static int32_t createStreamUpdateTrans(SStreamObj *pStream, SVgroupChangeInfo *p for (int32_t k = 0; k < numOfTasks; ++k) { SStreamTask *pTask = taosArrayGetP(pLevel, k); - void *pBuf = NULL; + void * pBuf = NULL; int32_t len = 0; streamTaskUpdateEpsetInfo(pTask, pInfo->pUpdateNodeList); doBuildStreamTaskUpdateMsg(&pBuf, &len, pInfo, pTask->info.nodeId, &pTask->id, pTrans->id); @@ -1987,8 +2008,8 @@ static bool isNodeEpsetChanged(const SEpSet *pPrevEpset, const SEpSet *pCurrent) // 1. increase the replica does not affect the stream process. // 2. decreasing the replica may affect the stream task execution in the way that there is one or more running stream // tasks on the will be removed replica. -// 3. vgroup redistribution is an combination operation of first increase replica and then decrease replica. So we will -// handle it as mentioned in 1 & 2 items. +// 3. vgroup redistribution is an combination operation of first increase replica and then decrease replica. So we +// will handle it as mentioned in 1 & 2 items. static SVgroupChangeInfo mndFindChangedNodeInfo(SMnode *pMnode, const SArray *pPrevNodeList, const SArray *pNodeList) { SVgroupChangeInfo info = { .pUpdateNodeList = taosArrayInit(4, sizeof(SNodeUpdateInfo)), @@ -2030,9 +2051,9 @@ static SVgroupChangeInfo mndFindChangedNodeInfo(SMnode *pMnode, const SArray *pP return info; } -static SArray *mndTakeVgroupSnapshot(SMnode *pMnode, bool* allReady) { - SSdb *pSdb = pMnode->pSdb; - void *pIter = NULL; +static SArray *mndTakeVgroupSnapshot(SMnode *pMnode, bool *allReady) { + SSdb * pSdb = pMnode->pSdb; + void * pIter = NULL; SVgObj *pVgroup = NULL; *allReady = true; @@ -2079,8 +2100,8 @@ static int32_t mndProcessVgroupChange(SMnode *pMnode, SVgroupChangeInfo *pChange // check all streams that involved this vnode should update the epset info SStreamObj *pStream = NULL; - void *pIter = NULL; - STrans *pTrans = NULL; + void * pIter = NULL; + STrans * pTrans = NULL; while (1) { pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream); @@ -2141,9 +2162,9 @@ static int32_t mndProcessVgroupChange(SMnode *pMnode, SVgroupChangeInfo *pChange } static SArray *extractNodeListFromStream(SMnode *pMnode) { - SSdb *pSdb = pMnode->pSdb; + SSdb * pSdb = pMnode->pSdb; SStreamObj *pStream = NULL; - void *pIter = NULL; + void * pIter = NULL; SHashObj *pHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK); while (1) { @@ -2190,9 +2211,9 @@ static SArray *extractNodeListFromStream(SMnode *pMnode) { } static void doExtractTasksFromStream(SMnode *pMnode) { - SSdb *pSdb = pMnode->pSdb; + SSdb * pSdb = pMnode->pSdb; SStreamObj *pStream = NULL; - void *pIter = NULL; + void * pIter = NULL; while (1) { pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream); @@ -2240,12 +2261,12 @@ static bool taskNodeExists(SArray *pList, int32_t nodeId) { } int32_t removeExpirednodeEntryAndTask(SArray *pNodeSnapshot) { - SArray* pRemovedTasks = taosArrayInit(4, sizeof(STaskId)); + SArray *pRemovedTasks = taosArrayInit(4, sizeof(STaskId)); int32_t numOfTask = taosArrayGetSize(execInfo.pTaskList); - for(int32_t i = 0; i < numOfTask; ++i) { - STaskId* pId = taosArrayGet(execInfo.pTaskList, i); - STaskStatusEntry* pEntry = taosHashGet(execInfo.pTaskMap, pId, sizeof(*pId)); + for (int32_t i = 0; i < numOfTask; ++i) { + STaskId * pId = taosArrayGet(execInfo.pTaskList, i); + STaskStatusEntry *pEntry = taosHashGet(execInfo.pTaskMap, pId, sizeof(*pId)); bool existed = taskNodeExists(pNodeSnapshot, pEntry->nodeId); if (!existed) { @@ -2253,18 +2274,18 @@ int32_t removeExpirednodeEntryAndTask(SArray *pNodeSnapshot) { } } - for(int32_t i = 0; i < taosArrayGetSize(pRemovedTasks); ++i) { - STaskId* pId = taosArrayGet(pRemovedTasks, i); + for (int32_t i = 0; i < taosArrayGetSize(pRemovedTasks); ++i) { + STaskId *pId = taosArrayGet(pRemovedTasks, i); doRemoveTasks(&execInfo, pId); } mDebug("remove invalid stream tasks:%d, remain:%d", (int32_t)taosArrayGetSize(pRemovedTasks), - (int32_t) taosArrayGetSize(execInfo.pTaskList)); + (int32_t)taosArrayGetSize(execInfo.pTaskList)); int32_t size = taosArrayGetSize(pNodeSnapshot); - SArray* pValidNodeEntryList = taosArrayInit(4, sizeof(SNodeEntry)); - for(int32_t i = 0; i < taosArrayGetSize(execInfo.pNodeEntryList); ++i) { - SNodeEntry* p = taosArrayGet(execInfo.pNodeEntryList, i); + SArray *pValidNodeEntryList = taosArrayInit(4, sizeof(SNodeEntry)); + for (int32_t i = 0; i < taosArrayGetSize(execInfo.pNodeEntryList); ++i) { + SNodeEntry *p = taosArrayGet(execInfo.pNodeEntryList, i); for (int32_t j = 0; j < size; ++j) { SNodeEntry *pEntry = taosArrayGet(pNodeSnapshot, j); @@ -2278,7 +2299,7 @@ int32_t removeExpirednodeEntryAndTask(SArray *pNodeSnapshot) { execInfo.pNodeEntryList = taosArrayDestroy(execInfo.pNodeEntryList); execInfo.pNodeEntryList = pValidNodeEntryList; - mDebug("remain %d valid node entries", (int32_t) taosArrayGetSize(pValidNodeEntryList)); + mDebug("remain %d valid node entries", (int32_t)taosArrayGetSize(pValidNodeEntryList)); taosArrayDestroy(pRemovedTasks); return 0; } @@ -2310,7 +2331,7 @@ static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg) { return 0; } - bool allVnodeReady = true; + bool allVnodeReady = true; SArray *pNodeSnapshot = mndTakeVgroupSnapshot(pMnode, &allVnodeReady); if (!allVnodeReady) { taosArrayDestroy(pNodeSnapshot); @@ -2324,7 +2345,6 @@ static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg) { SVgroupChangeInfo changeInfo = mndFindChangedNodeInfo(pMnode, execInfo.pNodeEntryList, pNodeSnapshot); if (taosArrayGetSize(changeInfo.pUpdateNodeList) > 0) { - // kill current active checkpoint transaction, since the transaction is vnode wide. doKillActiveCheckpointTrans(pMnode); code = mndProcessVgroupChange(pMnode, &changeInfo); @@ -2359,7 +2379,7 @@ typedef struct SMStreamNodeCheckMsg { static int32_t mndProcessNodeCheck(SRpcMsg *pReq) { SMnode *pMnode = pReq->info.node; - SSdb *pSdb = pMnode->pSdb; + SSdb * pSdb = pMnode->pSdb; if (sdbGetSize(pSdb, SDB_STREAM) <= 0) { return 0; } @@ -2383,7 +2403,7 @@ void keepStreamTasksInBuf(SStreamObj *pStream, SStreamExecInfo *pExecNode) { SStreamTask *pTask = taosArrayGetP(pLevel, j); STaskId id = {.streamId = pTask->id.streamId, .taskId = pTask->id.taskId}; - void *p = taosHashGet(pExecNode->pTaskMap, &id, sizeof(id)); + void * p = taosHashGet(pExecNode->pTaskMap, &id, sizeof(id)); if (p == NULL) { STaskStatusEntry entry = {0}; streamTaskStatusInit(&entry, pTask); @@ -2397,7 +2417,7 @@ void keepStreamTasksInBuf(SStreamObj *pStream, SStreamExecInfo *pExecNode) { } } -void removeStreamTasksInBuf(SStreamObj* pStream, SStreamExecInfo * pExecNode) { +void removeStreamTasksInBuf(SStreamObj *pStream, SStreamExecInfo *pExecNode) { int32_t level = taosArrayGetSize(pStream->tasks); for (int32_t i = 0; i < level; i++) { SArray *pLevel = taosArrayGetP(pStream->tasks, i); @@ -2407,7 +2427,7 @@ void removeStreamTasksInBuf(SStreamObj* pStream, SStreamExecInfo * pExecNode) { SStreamTask *pTask = taosArrayGetP(pLevel, j); STaskId id = {.streamId = pTask->id.streamId, .taskId = pTask->id.taskId}; - void *p = taosHashGet(pExecNode->pTaskMap, &id, sizeof(id)); + void * p = taosHashGet(pExecNode->pTaskMap, &id, sizeof(id)); if (p != NULL) { taosHashRemove(pExecNode->pTaskMap, &id, sizeof(id)); @@ -2513,9 +2533,9 @@ int32_t createStreamResetStatusTrans(SMnode *pMnode, SStreamObj *pStream) { int32_t doKillActiveCheckpointTrans(SMnode *pMnode) { int32_t transId = 0; - SSdb *pSdb = pMnode->pSdb; + SSdb * pSdb = pMnode->pSdb; STrans *pTrans = NULL; - void *pIter = NULL; + void * pIter = NULL; while (1) { pIter = sdbFetch(pSdb, SDB_TRANS, pIter, (void **)&pTrans); @@ -2546,13 +2566,13 @@ int32_t doKillActiveCheckpointTrans(SMnode *pMnode) { return TSDB_CODE_SUCCESS; } -int32_t mndResetFromCheckpoint(SMnode* pMnode) { +int32_t mndResetFromCheckpoint(SMnode *pMnode) { doKillActiveCheckpointTrans(pMnode); // set all tasks status to be normal, refactor later to be stream level, instead of vnode level. - SSdb *pSdb = pMnode->pSdb; + SSdb * pSdb = pMnode->pSdb; SStreamObj *pStream = NULL; - void *pIter = NULL; + void * pIter = NULL; while (1) { pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream); if (pIter == NULL) { @@ -2570,15 +2590,15 @@ int32_t mndResetFromCheckpoint(SMnode* pMnode) { return 0; } -int32_t setNodeEpsetExpiredFlag(const SArray* pNodeList) { +int32_t setNodeEpsetExpiredFlag(const SArray *pNodeList) { int32_t num = taosArrayGetSize(pNodeList); for (int k = 0; k < num; ++k) { - int32_t* pVgId = taosArrayGet(pNodeList, k); + int32_t *pVgId = taosArrayGet(pNodeList, k); int32_t numOfNodes = taosArrayGetSize(execInfo.pNodeEntryList); for (int i = 0; i < numOfNodes; ++i) { - SNodeEntry* pNodeEntry = taosArrayGet(execInfo.pNodeEntryList, i); + SNodeEntry *pNodeEntry = taosArrayGet(execInfo.pNodeEntryList, i); if (pNodeEntry->nodeId == *pVgId) { mInfo("vgId:%d expired in stream task, needs update nodeEp", *pVgId); @@ -2591,12 +2611,11 @@ int32_t setNodeEpsetExpiredFlag(const SArray* pNodeList) { return TSDB_CODE_SUCCESS; } -static void updateStageInfo(STaskStatusEntry* pTaskEntry, int32_t stage) { +static void updateStageInfo(STaskStatusEntry *pTaskEntry, int32_t stage) { int32_t numOfNodes = taosArrayGetSize(execInfo.pNodeEntryList); - for(int32_t j = 0; j < numOfNodes; ++j) { - SNodeEntry* pNodeEntry = taosArrayGet(execInfo.pNodeEntryList, j); + for (int32_t j = 0; j < numOfNodes; ++j) { + SNodeEntry *pNodeEntry = taosArrayGet(execInfo.pNodeEntryList, j); if (pNodeEntry->nodeId == pTaskEntry->nodeId) { - mInfo("vgId:%d stage updated from %d to %d, nodeUpdate trigger by s-task:0x%" PRIx64, pTaskEntry->nodeId, pTaskEntry->stage, stage, pTaskEntry->id.taskId); @@ -2608,7 +2627,7 @@ static void updateStageInfo(STaskStatusEntry* pTaskEntry, int32_t stage) { } int32_t mndProcessStreamHb(SRpcMsg *pReq) { - SMnode *pMnode = pReq->info.node; + SMnode * pMnode = pReq->info.node; SStreamHbMsg req = {0}; bool checkpointFailed = false; @@ -2661,15 +2680,15 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) { pTaskEntry->status = p->status; if (p->status != TASK_STATUS__READY) { - mDebug("received s-task:0x%"PRIx64" not in ready status:%s", p->id.taskId, streamTaskGetStatusStr(p->status)); + mDebug("received s-task:0x%" PRIx64 " not in ready status:%s", p->id.taskId, streamTaskGetStatusStr(p->status)); } } // current checkpoint is failed, rollback from the checkpoint trans // kill the checkpoint trans and then set all tasks status to be normal if (checkpointFailed && activeCheckpointId != 0) { - bool allReady = true; - SArray* p = mndTakeVgroupSnapshot(pMnode, &allReady); + bool allReady = true; + SArray *p = mndTakeVgroupSnapshot(pMnode, &allReady); taosArrayDestroy(p); if (allReady) { diff --git a/source/dnode/mnode/sdb/inc/sdb.h b/source/dnode/mnode/sdb/inc/sdb.h index ddde645fae..0a20dcfd09 100644 --- a/source/dnode/mnode/sdb/inc/sdb.h +++ b/source/dnode/mnode/sdb/inc/sdb.h @@ -149,7 +149,8 @@ typedef enum { SDB_FUNC = 20, SDB_IDX = 21, SDB_VIEW = 22, - SDB_MAX = 23 + SDB_STREAM_SEQ = 23, + SDB_MAX = 24 } ESdbType; typedef struct SSdbRaw { @@ -169,11 +170,11 @@ typedef struct SSdbRow { } SSdbRow; typedef struct SSdb { - SMnode *pMnode; - SWal *pWal; + SMnode * pMnode; + SWal * pWal; int64_t sync; - char *currDir; - char *tmpDir; + char * currDir; + char * tmpDir; int64_t commitIndex; int64_t commitTerm; int64_t commitConfig; @@ -183,7 +184,7 @@ typedef struct SSdb { int64_t tableVer[SDB_MAX]; int64_t maxId[SDB_MAX]; EKeyType keyTypes[SDB_MAX]; - SHashObj *hashObjs[SDB_MAX]; + SHashObj * hashObjs[SDB_MAX]; TdThreadRwlock locks[SDB_MAX]; SdbInsertFp insertFps[SDB_MAX]; SdbUpdateFp updateFps[SDB_MAX]; @@ -198,25 +199,25 @@ typedef struct SSdb { typedef struct SSdbIter { TdFilePtr file; int64_t total; - char *name; + char * name; } SSdbIter; typedef struct { - ESdbType sdbType; - EKeyType keyType; - SdbDeployFp deployFp; - SdbEncodeFp encodeFp; - SdbDecodeFp decodeFp; - SdbInsertFp insertFp; - SdbUpdateFp updateFp; - SdbDeleteFp deleteFp; + ESdbType sdbType; + EKeyType keyType; + SdbDeployFp deployFp; + SdbEncodeFp encodeFp; + SdbDecodeFp decodeFp; + SdbInsertFp insertFp; + SdbUpdateFp updateFp; + SdbDeleteFp deleteFp; SdbValidateFp validateFp; } SSdbTable; typedef struct SSdbOpt { const char *path; - SMnode *pMnode; - SWal *pWal; + SMnode * pMnode; + SWal * pWal; int64_t sync; } SSdbOpt; @@ -393,7 +394,7 @@ int32_t sdbGetRawSoftVer(SSdbRaw *pRaw, int8_t *sver); int32_t sdbGetRawTotalSize(SSdbRaw *pRaw); SSdbRow *sdbAllocRow(int32_t objSize); -void *sdbGetRowObj(SSdbRow *pRow); +void * sdbGetRowObj(SSdbRow *pRow); void sdbFreeRow(SSdb *pSdb, SSdbRow *pRow, bool callFunc); int32_t sdbStartRead(SSdb *pSdb, SSdbIter **ppIter, int64_t *index, int64_t *term, int64_t *config);