This commit is contained in:
yihaoDeng 2023-11-08 19:54:57 +08:00
parent 3abf0ae4b2
commit 08e3448498
4 changed files with 144 additions and 115 deletions

View File

@ -699,6 +699,11 @@ typedef struct {
} SStreamObj;
typedef struct SStreamSeq {
char name[24];
uint64_t seq;
SRWLatch lock;
} SStreamSeq;
int32_t tEncodeSStreamObj(SEncoder* pEncoder, const SStreamObj* pObj);
int32_t tDecodeSStreamObj(SDecoder* pDecoder, SStreamObj* pObj, int32_t sver);
void tFreeStreamObj(SStreamObj* pObj);
@ -730,14 +735,13 @@ typedef struct {
int8_t type;
int32_t numOfCols;
SSchema* pSchema;
SRWLatch lock;
SRWLatch lock;
} SViewObj;
int32_t tEncodeSViewObj(SEncoder* pEncoder, const SViewObj* pObj);
int32_t tDecodeSViewObj(SDecoder* pDecoder, SViewObj* pObj, int32_t sver);
void tFreeSViewObj(SViewObj* pObj);
#ifdef __cplusplus
}
#endif

View File

@ -34,6 +34,11 @@ SSdbRow *mndStreamActionDecode(SSdbRaw *pRaw);
int32_t mndDropStreamByDb(SMnode *pMnode, STrans *pTrans, SDbObj *pDb);
int32_t mndPersistStream(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream);
SSdbRaw * mndStreamSeqActionEncode(SStreamObj *pStream);
SSdbRow * mndStreamSeqActionDecode(SSdbRaw *pRaw);
static int32_t mndStreamSeqActionInsert(SSdb *pSdb, SStreamSeq *pStream);
static int32_t mndStreamSeqActionDelete(SSdb *pSdb, SStreamSeq *pStream);
static int32_t mndStreamSeqActionUpdate(SSdb *pSdb, SStreamSeq *pOldStream, SStreamSeq *pNewStream);
// for sma
// TODO refactor
int32_t mndDropStreamTasks(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream);

View File

@ -43,17 +43,17 @@ typedef struct SNodeEntry {
} SNodeEntry;
typedef struct SStreamExecInfo {
SArray *pNodeEntryList;
SArray * pNodeEntryList;
int64_t ts; // snapshot ts
int64_t activeCheckpoint; // active check point id
SHashObj *pTaskMap;
SArray *pTaskList;
SHashObj * pTaskMap;
SArray * pTaskList;
TdThreadMutex lock;
} SStreamExecInfo;
typedef struct SVgroupChangeInfo {
SHashObj *pDBMap;
SArray *pUpdateNodeList; // SArray<SNodeUpdateInfo>
SArray * pUpdateNodeList; // SArray<SNodeUpdateInfo>
} SVgroupChangeInfo;
static int32_t mndNodeCheckSentinel = 0;
@ -78,7 +78,7 @@ static int32_t mndBuildStreamCheckpointSourceReq2(void **pBuf, int32_t *pLen, in
static int32_t mndProcessNodeCheck(SRpcMsg *pReq);
static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg);
static SArray *extractNodeListFromStream(SMnode *pMnode);
static SArray *mndTakeVgroupSnapshot(SMnode *pMnode, bool* allReady);
static SArray *mndTakeVgroupSnapshot(SMnode *pMnode, bool *allReady);
static SVgroupChangeInfo mndFindChangedNodeInfo(SMnode *pMnode, const SArray *pPrevNodeList, const SArray *pNodeList);
@ -91,7 +91,7 @@ static void removeStreamTasksInBuf(SStreamObj *pStream, SStreamExecInfo *pExe
static void keepStreamTasksInBuf(SStreamObj *pStream, SStreamExecInfo *pExecNode);
static int32_t removeExpirednodeEntryAndTask(SArray *pNodeSnapshot);
static int32_t doKillActiveCheckpointTrans(SMnode *pMnode);
static int32_t setNodeEpsetExpiredFlag(const SArray* pNodeList);
static int32_t setNodeEpsetExpiredFlag(const SArray *pNodeList);
int32_t mndInitStream(SMnode *pMnode) {
SSdbTable table = {
@ -103,6 +103,15 @@ int32_t mndInitStream(SMnode *pMnode) {
.updateFp = (SdbUpdateFp)mndStreamActionUpdate,
.deleteFp = (SdbDeleteFp)mndStreamActionDelete,
};
SSdbTable tableSeq = {
.sdbType = SDB_STREAM_SEQ,
.keyType = SDB_KEY_BINARY,
.encodeFp = (SdbEncodeFp)mndStreamSeqActionEncode,
.decodeFp = (SdbDecodeFp)mndStreamSeqActionDecode,
.insertFp = (SdbInsertFp)mndStreamSeqActionInsert,
.updateFp = (SdbUpdateFp)mndStreamSeqActionUpdate,
.deleteFp = (SdbDeleteFp)mndStreamSeqActionDelete,
};
mndSetMsgHandle(pMnode, TDMT_MND_CREATE_STREAM, mndProcessCreateStreamReq);
mndSetMsgHandle(pMnode, TDMT_MND_DROP_STREAM, mndProcessDropStreamReq);
@ -135,7 +144,13 @@ int32_t mndInitStream(SMnode *pMnode) {
execInfo.pTaskMap = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR), true, HASH_NO_LOCK);
execInfo.pTaskList = taosArrayInit(4, sizeof(STaskId));
return sdbSetTable(pMnode->pSdb, table);
if (sdbSetTable(pMnode->pSdb, table) != 0) {
return -1;
}
if (sdbSetTable(pMnode->pSdb, tableSeq) != 0) {
return -1;
}
return 0;
}
void mndCleanupStream(SMnode *pMnode) {
@ -193,9 +208,9 @@ STREAM_ENCODE_OVER:
SSdbRow *mndStreamActionDecode(SSdbRaw *pRaw) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
SSdbRow *pRow = NULL;
SSdbRow * pRow = NULL;
SStreamObj *pStream = NULL;
void *buf = NULL;
void * buf = NULL;
int8_t sver = 0;
if (sdbGetRawSoftVer(pRaw, &sver) != 0) {
@ -272,7 +287,7 @@ static int32_t mndStreamActionUpdate(SSdb *pSdb, SStreamObj *pOldStream, SStream
}
SStreamObj *mndAcquireStream(SMnode *pMnode, char *streamName) {
SSdb *pSdb = pMnode->pSdb;
SSdb * pSdb = pMnode->pSdb;
SStreamObj *pStream = sdbAcquire(pSdb, SDB_STREAM, streamName);
if (pStream == NULL && terrno == TSDB_CODE_SDB_OBJ_NOT_THERE) {
terrno = TSDB_CODE_MND_STREAM_NOT_EXIST;
@ -300,6 +315,12 @@ static void mndShowStreamStatus(char *dst, SStreamObj *pStream) {
}
}
SSdbRaw * mndStreamSeqActionEncode(SStreamObj *pStream) { return NULL; }
SSdbRow * mndStreamSeqActionDecode(SSdbRaw *pRaw) { return NULL; }
static int32_t mndStreamSeqActionInsert(SSdb *pSdb, SStreamSeq *pStream) { return 0; }
static int32_t mndStreamSeqActionDelete(SSdb *pSdb, SStreamSeq *pStream) { return 0; }
static int32_t mndStreamSeqActionUpdate(SSdb *pSdb, SStreamSeq *pOldStream, SStreamSeq *pNewStream) { return 0; }
static void mndShowStreamTrigger(char *dst, SStreamObj *pStream) {
int8_t trigger = pStream->conf.trigger;
if (trigger == STREAM_TRIGGER_AT_ONCE) {
@ -325,7 +346,7 @@ static int32_t mndStreamGetPlanString(const char *ast, int8_t triggerType, int64
return TSDB_CODE_SUCCESS;
}
SNode *pAst = NULL;
SNode * pAst = NULL;
int32_t code = nodesStringToNode(ast, &pAst);
SQueryPlan *pPlan = NULL;
@ -350,7 +371,7 @@ static int32_t mndStreamGetPlanString(const char *ast, int8_t triggerType, int64
}
static int32_t mndBuildStreamObjFromCreateReq(SMnode *pMnode, SStreamObj *pObj, SCMCreateStreamReq *pCreate) {
SNode *pAst = NULL;
SNode * pAst = NULL;
SQueryPlan *pPlan = NULL;
mInfo("stream:%s to create", pCreate->name);
@ -589,7 +610,7 @@ int32_t mndPersistDropStreamLog(SMnode *pMnode, STrans *pTrans, SStreamObj *pStr
static int32_t mndCreateStbForStream(SMnode *pMnode, STrans *pTrans, const SStreamObj *pStream, const char *user) {
SStbObj *pStb = NULL;
SDbObj *pDb = NULL;
SDbObj * pDb = NULL;
SMCreateStbReq createReq = {0};
tstrncpy(createReq.name, pStream->targetSTbName, TSDB_TABLE_FNAME_LEN);
@ -715,10 +736,10 @@ int32_t mndDropStreamTasks(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream)
}
static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) {
SMnode *pMnode = pReq->info.node;
SMnode * pMnode = pReq->info.node;
int32_t code = -1;
SStreamObj *pStream = NULL;
SDbObj *pDb = NULL;
SStreamObj * pStream = NULL;
SDbObj * pDb = NULL;
SCMCreateStreamReq createStreamReq = {0};
SStreamObj streamObj = {0};
@ -761,7 +782,7 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) {
int32_t numOfStream = 0;
SStreamObj *pStream = NULL;
void *pIter = NULL;
void * pIter = NULL;
while (1) {
pIter = sdbFetch(pMnode->pSdb, SDB_STREAM, pIter, (void **)&pStream);
@ -858,12 +879,12 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) {
SName name = {0};
tNameFromString(&name, createStreamReq.name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
//reuse this function for stream
// reuse this function for stream
// TODO
if (createStreamReq.sql != NULL) {
auditRecord(pReq, pMnode->clusterId, "createStream", name.dbname, name.tname,
createStreamReq.sql, strlen(createStreamReq.sql));
auditRecord(pReq, pMnode->clusterId, "createStream", name.dbname, name.tname, createStreamReq.sql,
strlen(createStreamReq.sql));
}
_OVER:
if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
@ -879,7 +900,7 @@ _OVER:
static int32_t mndProcessStreamCheckpointTmr(SRpcMsg *pReq) {
SMnode *pMnode = pReq->info.node;
SSdb *pSdb = pMnode->pSdb;
SSdb * pSdb = pMnode->pSdb;
if (sdbGetSize(pSdb, SDB_STREAM) <= 0) {
return 0;
}
@ -919,7 +940,7 @@ static int32_t mndBuildStreamCheckpointSourceReq2(void **pBuf, int32_t *pLen, in
return -1;
}
void *abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
void * abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
SEncoder encoder;
tEncoderInit(&encoder, abuf, tlen);
tEncodeStreamCheckpointSourceReq(&encoder, &req);
@ -960,7 +981,7 @@ static int32_t mndProcessStreamCheckpointTrans(SMnode *pMnode, SStreamObj *pStre
int32_t totLevel = taosArrayGetSize(pStream->tasks);
for (int32_t i = 0; i < totLevel; i++) {
SArray *pLevel = taosArrayGetP(pStream->tasks, i);
SArray * pLevel = taosArrayGetP(pStream->tasks, i);
SStreamTask *pTask = taosArrayGetP(pLevel, 0);
if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
int32_t sz = taosArrayGetSize(pLevel);
@ -973,7 +994,7 @@ static int32_t mndProcessStreamCheckpointTrans(SMnode *pMnode, SStreamObj *pStre
goto _ERR;
}
void *buf;
void * buf;
int32_t tlen;
if (mndBuildStreamCheckpointSourceReq2(&buf, &tlen, pTask->info.nodeId, checkpointId, pTask->id.streamId,
pTask->id.taskId) < 0) {
@ -1038,7 +1059,7 @@ static int32_t mndAddStreamCheckpointToTrans(STrans *pTrans, SStreamObj *pStream
int32_t totLevel = taosArrayGetSize(pStream->tasks);
for (int32_t i = 0; i < totLevel; i++) {
SArray *pLevel = taosArrayGetP(pStream->tasks, i);
SArray * pLevel = taosArrayGetP(pStream->tasks, i);
SStreamTask *pTask = taosArrayGetP(pLevel, 0);
if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
@ -1055,7 +1076,7 @@ static int32_t mndAddStreamCheckpointToTrans(STrans *pTrans, SStreamObj *pStream
return -1;
}
void *buf;
void * buf;
int32_t tlen;
if (mndBuildStreamCheckpointSourceReq2(&buf, &tlen, pTask->info.nodeId, chkptId, pTask->id.streamId,
pTask->id.taskId) < 0) {
@ -1066,7 +1087,8 @@ static int32_t mndAddStreamCheckpointToTrans(STrans *pTrans, SStreamObj *pStream
STransAction action = {0};
SEpSet epset = mndGetVgroupEpset(pMnode, pVgObj);
initTransAction(&action, buf, tlen, TDMT_VND_STREAM_CHECK_POINT_SOURCE, &epset, TSDB_CODE_SYN_PROPOSE_NOT_READY);
initTransAction(&action, buf, tlen, TDMT_VND_STREAM_CHECK_POINT_SOURCE, &epset,
TSDB_CODE_SYN_PROPOSE_NOT_READY);
mndReleaseVgroup(pMnode, pVgObj);
if (mndTransAppendRedoAction(pTrans, &action) != 0) {
@ -1106,9 +1128,9 @@ static int32_t mndAddStreamCheckpointToTrans(STrans *pTrans, SStreamObj *pStream
}
static const char *mndGetStreamDB(SMnode *pMnode) {
SSdb *pSdb = pMnode->pSdb;
SSdb * pSdb = pMnode->pSdb;
SStreamObj *pStream = NULL;
void *pIter = NULL;
void * pIter = NULL;
pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream);
if (pIter == NULL) {
@ -1147,11 +1169,10 @@ static int32_t mndCheckNodeStatus(SMnode *pMnode) {
mDebug("stream task not ready due to node update detected, checkpoint not issued");
goto _EXIT;
}
bool allReady = true;
bool allReady = true;
SArray *pNodeSnapshot = mndTakeVgroupSnapshot(pMnode, &allReady);
if (!allReady) {
mWarn("not all vnodes are ready, ignore the checkpoint")
taosArrayDestroy(pNodeSnapshot);
mWarn("not all vnodes are ready, ignore the checkpoint") taosArrayDestroy(pNodeSnapshot);
return 0;
}
@ -1170,7 +1191,7 @@ static int32_t mndCheckNodeStatus(SMnode *pMnode) {
// check if all tasks are in TASK_STATUS__NORMAL status
for (int32_t i = 0; i < taosArrayGetSize(execInfo.pTaskList); ++i) {
STaskId *p = taosArrayGet(execInfo.pTaskList, i);
STaskId * p = taosArrayGet(execInfo.pTaskList, i);
STaskStatusEntry *pEntry = taosHashGet(execInfo.pTaskMap, p, sizeof(*p));
if (pEntry == NULL) {
continue;
@ -1190,9 +1211,9 @@ _EXIT:
return ready == true ? 0 : -1;
}
static int32_t mndProcessStreamDoCheckpoint(SRpcMsg *pReq) {
SMnode *pMnode = pReq->info.node;
SSdb *pSdb = pMnode->pSdb;
void *pIter = NULL;
SMnode * pMnode = pReq->info.node;
SSdb * pSdb = pMnode->pSdb;
void * pIter = NULL;
SStreamObj *pStream = NULL;
int32_t code = 0;
if ((code = mndCheckNodeStatus(pMnode)) != 0) {
@ -1215,7 +1236,7 @@ static int32_t mndProcessStreamDoCheckpoint(SRpcMsg *pReq) {
}
static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) {
SMnode *pMnode = pReq->info.node;
SMnode * pMnode = pReq->info.node;
SStreamObj *pStream = NULL;
SMDropStreamReq dropReq = {0};
@ -1343,7 +1364,7 @@ int32_t mndDropStreamByDb(SMnode *pMnode, STrans *pTrans, SDbObj *pDb) {
}
int32_t mndGetNumOfStreams(SMnode *pMnode, char *dbName, int32_t *pNumOfStreams) {
SSdb *pSdb = pMnode->pSdb;
SSdb * pSdb = pMnode->pSdb;
SDbObj *pDb = mndAcquireDb(pMnode, dbName);
if (pDb == NULL) {
terrno = TSDB_CODE_MND_DB_NOT_SELECTED;
@ -1351,7 +1372,7 @@ int32_t mndGetNumOfStreams(SMnode *pMnode, char *dbName, int32_t *pNumOfStreams)
}
int32_t numOfStreams = 0;
void *pIter = NULL;
void * pIter = NULL;
while (1) {
SStreamObj *pStream = NULL;
pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream);
@ -1370,8 +1391,8 @@ int32_t mndGetNumOfStreams(SMnode *pMnode, char *dbName, int32_t *pNumOfStreams)
}
static int32_t mndRetrieveStream(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
SMnode *pMnode = pReq->info.node;
SSdb *pSdb = pMnode->pSdb;
SMnode * pMnode = pReq->info.node;
SSdb * pSdb = pMnode->pSdb;
int32_t numOfRows = 0;
SStreamObj *pStream = NULL;
@ -1447,8 +1468,8 @@ static void mndCancelGetNextStream(SMnode *pMnode, void *pIter) {
}
static int32_t mndRetrieveStreamTask(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rowsCapacity) {
SMnode *pMnode = pReq->info.node;
SSdb *pSdb = pMnode->pSdb;
SMnode * pMnode = pReq->info.node;
SSdb * pSdb = pMnode->pSdb;
int32_t numOfRows = 0;
SStreamObj *pStream = NULL;
@ -1537,13 +1558,13 @@ static int32_t mndRetrieveStreamTask(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock
// status
char status[20 + VARSTR_HEADER_SIZE] = {0};
STaskId id = {.streamId = pTask->id.streamId, .taskId = pTask->id.taskId};
STaskStatusEntry* pe = taosHashGet(execInfo.pTaskMap, &id, sizeof(id));
STaskId id = {.streamId = pTask->id.streamId, .taskId = pTask->id.taskId};
STaskStatusEntry *pe = taosHashGet(execInfo.pTaskMap, &id, sizeof(id));
if (pe == NULL) {
continue;
}
const char* pStatus = streamTaskGetStatusStr(pe->status);
const char *pStatus = streamTaskGetStatusStr(pe->status);
STR_TO_VARSTR(status, pStatus);
// status
@ -1664,7 +1685,7 @@ static int32_t mndPersistStreamLog(STrans *pTrans, const SStreamObj *pStream, in
}
static int32_t mndProcessPauseStreamReq(SRpcMsg *pReq) {
SMnode *pMnode = pReq->info.node;
SMnode * pMnode = pReq->info.node;
SStreamObj *pStream = NULL;
SMPauseStreamReq pauseReq = {0};
@ -1780,7 +1801,7 @@ int32_t mndResumeAllStreamTasks(STrans *pTrans, SStreamObj *pStream, int8_t igUn
}
static int32_t mndProcessResumeStreamReq(SRpcMsg *pReq) {
SMnode *pMnode = pReq->info.node;
SMnode * pMnode = pReq->info.node;
SStreamObj *pStream = NULL;
SMResumeStreamReq pauseReq = {0};
@ -1865,7 +1886,7 @@ static void initNodeUpdateMsg(SStreamTaskNodeUpdateMsg *pMsg, const SVgroupChang
}
static int32_t doBuildStreamTaskUpdateMsg(void **pBuf, int32_t *pLen, SVgroupChangeInfo *pInfo, int32_t nodeId,
SStreamTaskId* pId, int32_t transId) {
SStreamTaskId *pId, int32_t transId) {
SStreamTaskNodeUpdateMsg req = {0};
initNodeUpdateMsg(&req, pInfo, pId, transId);
@ -1888,7 +1909,7 @@ static int32_t doBuildStreamTaskUpdateMsg(void **pBuf, int32_t *pLen, SVgroupCha
return -1;
}
void *abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
void * abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
SEncoder encoder;
tEncoderInit(&encoder, abuf, tlen);
tEncodeStreamTaskUpdateMsg(&encoder, &req);
@ -1955,7 +1976,7 @@ static int32_t createStreamUpdateTrans(SStreamObj *pStream, SVgroupChangeInfo *p
for (int32_t k = 0; k < numOfTasks; ++k) {
SStreamTask *pTask = taosArrayGetP(pLevel, k);
void *pBuf = NULL;
void * pBuf = NULL;
int32_t len = 0;
streamTaskUpdateEpsetInfo(pTask, pInfo->pUpdateNodeList);
doBuildStreamTaskUpdateMsg(&pBuf, &len, pInfo, pTask->info.nodeId, &pTask->id, pTrans->id);
@ -1987,8 +2008,8 @@ static bool isNodeEpsetChanged(const SEpSet *pPrevEpset, const SEpSet *pCurrent)
// 1. increase the replica does not affect the stream process.
// 2. decreasing the replica may affect the stream task execution in the way that there is one or more running stream
// tasks on the will be removed replica.
// 3. vgroup redistribution is an combination operation of first increase replica and then decrease replica. So we will
// handle it as mentioned in 1 & 2 items.
// 3. vgroup redistribution is an combination operation of first increase replica and then decrease replica. So we
// will handle it as mentioned in 1 & 2 items.
static SVgroupChangeInfo mndFindChangedNodeInfo(SMnode *pMnode, const SArray *pPrevNodeList, const SArray *pNodeList) {
SVgroupChangeInfo info = {
.pUpdateNodeList = taosArrayInit(4, sizeof(SNodeUpdateInfo)),
@ -2030,9 +2051,9 @@ static SVgroupChangeInfo mndFindChangedNodeInfo(SMnode *pMnode, const SArray *pP
return info;
}
static SArray *mndTakeVgroupSnapshot(SMnode *pMnode, bool* allReady) {
SSdb *pSdb = pMnode->pSdb;
void *pIter = NULL;
static SArray *mndTakeVgroupSnapshot(SMnode *pMnode, bool *allReady) {
SSdb * pSdb = pMnode->pSdb;
void * pIter = NULL;
SVgObj *pVgroup = NULL;
*allReady = true;
@ -2079,8 +2100,8 @@ static int32_t mndProcessVgroupChange(SMnode *pMnode, SVgroupChangeInfo *pChange
// check all streams that involved this vnode should update the epset info
SStreamObj *pStream = NULL;
void *pIter = NULL;
STrans *pTrans = NULL;
void * pIter = NULL;
STrans * pTrans = NULL;
while (1) {
pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream);
@ -2141,9 +2162,9 @@ static int32_t mndProcessVgroupChange(SMnode *pMnode, SVgroupChangeInfo *pChange
}
static SArray *extractNodeListFromStream(SMnode *pMnode) {
SSdb *pSdb = pMnode->pSdb;
SSdb * pSdb = pMnode->pSdb;
SStreamObj *pStream = NULL;
void *pIter = NULL;
void * pIter = NULL;
SHashObj *pHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK);
while (1) {
@ -2190,9 +2211,9 @@ static SArray *extractNodeListFromStream(SMnode *pMnode) {
}
static void doExtractTasksFromStream(SMnode *pMnode) {
SSdb *pSdb = pMnode->pSdb;
SSdb * pSdb = pMnode->pSdb;
SStreamObj *pStream = NULL;
void *pIter = NULL;
void * pIter = NULL;
while (1) {
pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream);
@ -2240,12 +2261,12 @@ static bool taskNodeExists(SArray *pList, int32_t nodeId) {
}
int32_t removeExpirednodeEntryAndTask(SArray *pNodeSnapshot) {
SArray* pRemovedTasks = taosArrayInit(4, sizeof(STaskId));
SArray *pRemovedTasks = taosArrayInit(4, sizeof(STaskId));
int32_t numOfTask = taosArrayGetSize(execInfo.pTaskList);
for(int32_t i = 0; i < numOfTask; ++i) {
STaskId* pId = taosArrayGet(execInfo.pTaskList, i);
STaskStatusEntry* pEntry = taosHashGet(execInfo.pTaskMap, pId, sizeof(*pId));
for (int32_t i = 0; i < numOfTask; ++i) {
STaskId * pId = taosArrayGet(execInfo.pTaskList, i);
STaskStatusEntry *pEntry = taosHashGet(execInfo.pTaskMap, pId, sizeof(*pId));
bool existed = taskNodeExists(pNodeSnapshot, pEntry->nodeId);
if (!existed) {
@ -2253,18 +2274,18 @@ int32_t removeExpirednodeEntryAndTask(SArray *pNodeSnapshot) {
}
}
for(int32_t i = 0; i < taosArrayGetSize(pRemovedTasks); ++i) {
STaskId* pId = taosArrayGet(pRemovedTasks, i);
for (int32_t i = 0; i < taosArrayGetSize(pRemovedTasks); ++i) {
STaskId *pId = taosArrayGet(pRemovedTasks, i);
doRemoveTasks(&execInfo, pId);
}
mDebug("remove invalid stream tasks:%d, remain:%d", (int32_t)taosArrayGetSize(pRemovedTasks),
(int32_t) taosArrayGetSize(execInfo.pTaskList));
(int32_t)taosArrayGetSize(execInfo.pTaskList));
int32_t size = taosArrayGetSize(pNodeSnapshot);
SArray* pValidNodeEntryList = taosArrayInit(4, sizeof(SNodeEntry));
for(int32_t i = 0; i < taosArrayGetSize(execInfo.pNodeEntryList); ++i) {
SNodeEntry* p = taosArrayGet(execInfo.pNodeEntryList, i);
SArray *pValidNodeEntryList = taosArrayInit(4, sizeof(SNodeEntry));
for (int32_t i = 0; i < taosArrayGetSize(execInfo.pNodeEntryList); ++i) {
SNodeEntry *p = taosArrayGet(execInfo.pNodeEntryList, i);
for (int32_t j = 0; j < size; ++j) {
SNodeEntry *pEntry = taosArrayGet(pNodeSnapshot, j);
@ -2278,7 +2299,7 @@ int32_t removeExpirednodeEntryAndTask(SArray *pNodeSnapshot) {
execInfo.pNodeEntryList = taosArrayDestroy(execInfo.pNodeEntryList);
execInfo.pNodeEntryList = pValidNodeEntryList;
mDebug("remain %d valid node entries", (int32_t) taosArrayGetSize(pValidNodeEntryList));
mDebug("remain %d valid node entries", (int32_t)taosArrayGetSize(pValidNodeEntryList));
taosArrayDestroy(pRemovedTasks);
return 0;
}
@ -2310,7 +2331,7 @@ static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg) {
return 0;
}
bool allVnodeReady = true;
bool allVnodeReady = true;
SArray *pNodeSnapshot = mndTakeVgroupSnapshot(pMnode, &allVnodeReady);
if (!allVnodeReady) {
taosArrayDestroy(pNodeSnapshot);
@ -2324,7 +2345,6 @@ static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg) {
SVgroupChangeInfo changeInfo = mndFindChangedNodeInfo(pMnode, execInfo.pNodeEntryList, pNodeSnapshot);
if (taosArrayGetSize(changeInfo.pUpdateNodeList) > 0) {
// kill current active checkpoint transaction, since the transaction is vnode wide.
doKillActiveCheckpointTrans(pMnode);
code = mndProcessVgroupChange(pMnode, &changeInfo);
@ -2359,7 +2379,7 @@ typedef struct SMStreamNodeCheckMsg {
static int32_t mndProcessNodeCheck(SRpcMsg *pReq) {
SMnode *pMnode = pReq->info.node;
SSdb *pSdb = pMnode->pSdb;
SSdb * pSdb = pMnode->pSdb;
if (sdbGetSize(pSdb, SDB_STREAM) <= 0) {
return 0;
}
@ -2383,7 +2403,7 @@ void keepStreamTasksInBuf(SStreamObj *pStream, SStreamExecInfo *pExecNode) {
SStreamTask *pTask = taosArrayGetP(pLevel, j);
STaskId id = {.streamId = pTask->id.streamId, .taskId = pTask->id.taskId};
void *p = taosHashGet(pExecNode->pTaskMap, &id, sizeof(id));
void * p = taosHashGet(pExecNode->pTaskMap, &id, sizeof(id));
if (p == NULL) {
STaskStatusEntry entry = {0};
streamTaskStatusInit(&entry, pTask);
@ -2397,7 +2417,7 @@ void keepStreamTasksInBuf(SStreamObj *pStream, SStreamExecInfo *pExecNode) {
}
}
void removeStreamTasksInBuf(SStreamObj* pStream, SStreamExecInfo * pExecNode) {
void removeStreamTasksInBuf(SStreamObj *pStream, SStreamExecInfo *pExecNode) {
int32_t level = taosArrayGetSize(pStream->tasks);
for (int32_t i = 0; i < level; i++) {
SArray *pLevel = taosArrayGetP(pStream->tasks, i);
@ -2407,7 +2427,7 @@ void removeStreamTasksInBuf(SStreamObj* pStream, SStreamExecInfo * pExecNode) {
SStreamTask *pTask = taosArrayGetP(pLevel, j);
STaskId id = {.streamId = pTask->id.streamId, .taskId = pTask->id.taskId};
void *p = taosHashGet(pExecNode->pTaskMap, &id, sizeof(id));
void * p = taosHashGet(pExecNode->pTaskMap, &id, sizeof(id));
if (p != NULL) {
taosHashRemove(pExecNode->pTaskMap, &id, sizeof(id));
@ -2513,9 +2533,9 @@ int32_t createStreamResetStatusTrans(SMnode *pMnode, SStreamObj *pStream) {
int32_t doKillActiveCheckpointTrans(SMnode *pMnode) {
int32_t transId = 0;
SSdb *pSdb = pMnode->pSdb;
SSdb * pSdb = pMnode->pSdb;
STrans *pTrans = NULL;
void *pIter = NULL;
void * pIter = NULL;
while (1) {
pIter = sdbFetch(pSdb, SDB_TRANS, pIter, (void **)&pTrans);
@ -2546,13 +2566,13 @@ int32_t doKillActiveCheckpointTrans(SMnode *pMnode) {
return TSDB_CODE_SUCCESS;
}
int32_t mndResetFromCheckpoint(SMnode* pMnode) {
int32_t mndResetFromCheckpoint(SMnode *pMnode) {
doKillActiveCheckpointTrans(pMnode);
// set all tasks status to be normal, refactor later to be stream level, instead of vnode level.
SSdb *pSdb = pMnode->pSdb;
SSdb * pSdb = pMnode->pSdb;
SStreamObj *pStream = NULL;
void *pIter = NULL;
void * pIter = NULL;
while (1) {
pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream);
if (pIter == NULL) {
@ -2570,15 +2590,15 @@ int32_t mndResetFromCheckpoint(SMnode* pMnode) {
return 0;
}
int32_t setNodeEpsetExpiredFlag(const SArray* pNodeList) {
int32_t setNodeEpsetExpiredFlag(const SArray *pNodeList) {
int32_t num = taosArrayGetSize(pNodeList);
for (int k = 0; k < num; ++k) {
int32_t* pVgId = taosArrayGet(pNodeList, k);
int32_t *pVgId = taosArrayGet(pNodeList, k);
int32_t numOfNodes = taosArrayGetSize(execInfo.pNodeEntryList);
for (int i = 0; i < numOfNodes; ++i) {
SNodeEntry* pNodeEntry = taosArrayGet(execInfo.pNodeEntryList, i);
SNodeEntry *pNodeEntry = taosArrayGet(execInfo.pNodeEntryList, i);
if (pNodeEntry->nodeId == *pVgId) {
mInfo("vgId:%d expired in stream task, needs update nodeEp", *pVgId);
@ -2591,12 +2611,11 @@ int32_t setNodeEpsetExpiredFlag(const SArray* pNodeList) {
return TSDB_CODE_SUCCESS;
}
static void updateStageInfo(STaskStatusEntry* pTaskEntry, int32_t stage) {
static void updateStageInfo(STaskStatusEntry *pTaskEntry, int32_t stage) {
int32_t numOfNodes = taosArrayGetSize(execInfo.pNodeEntryList);
for(int32_t j = 0; j < numOfNodes; ++j) {
SNodeEntry* pNodeEntry = taosArrayGet(execInfo.pNodeEntryList, j);
for (int32_t j = 0; j < numOfNodes; ++j) {
SNodeEntry *pNodeEntry = taosArrayGet(execInfo.pNodeEntryList, j);
if (pNodeEntry->nodeId == pTaskEntry->nodeId) {
mInfo("vgId:%d stage updated from %d to %d, nodeUpdate trigger by s-task:0x%" PRIx64, pTaskEntry->nodeId,
pTaskEntry->stage, stage, pTaskEntry->id.taskId);
@ -2608,7 +2627,7 @@ static void updateStageInfo(STaskStatusEntry* pTaskEntry, int32_t stage) {
}
int32_t mndProcessStreamHb(SRpcMsg *pReq) {
SMnode *pMnode = pReq->info.node;
SMnode * pMnode = pReq->info.node;
SStreamHbMsg req = {0};
bool checkpointFailed = false;
@ -2661,15 +2680,15 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) {
pTaskEntry->status = p->status;
if (p->status != TASK_STATUS__READY) {
mDebug("received s-task:0x%"PRIx64" not in ready status:%s", p->id.taskId, streamTaskGetStatusStr(p->status));
mDebug("received s-task:0x%" PRIx64 " not in ready status:%s", p->id.taskId, streamTaskGetStatusStr(p->status));
}
}
// current checkpoint is failed, rollback from the checkpoint trans
// kill the checkpoint trans and then set all tasks status to be normal
if (checkpointFailed && activeCheckpointId != 0) {
bool allReady = true;
SArray* p = mndTakeVgroupSnapshot(pMnode, &allReady);
bool allReady = true;
SArray *p = mndTakeVgroupSnapshot(pMnode, &allReady);
taosArrayDestroy(p);
if (allReady) {

View File

@ -149,7 +149,8 @@ typedef enum {
SDB_FUNC = 20,
SDB_IDX = 21,
SDB_VIEW = 22,
SDB_MAX = 23
SDB_STREAM_SEQ = 23,
SDB_MAX = 24
} ESdbType;
typedef struct SSdbRaw {
@ -169,11 +170,11 @@ typedef struct SSdbRow {
} SSdbRow;
typedef struct SSdb {
SMnode *pMnode;
SWal *pWal;
SMnode * pMnode;
SWal * pWal;
int64_t sync;
char *currDir;
char *tmpDir;
char * currDir;
char * tmpDir;
int64_t commitIndex;
int64_t commitTerm;
int64_t commitConfig;
@ -183,7 +184,7 @@ typedef struct SSdb {
int64_t tableVer[SDB_MAX];
int64_t maxId[SDB_MAX];
EKeyType keyTypes[SDB_MAX];
SHashObj *hashObjs[SDB_MAX];
SHashObj * hashObjs[SDB_MAX];
TdThreadRwlock locks[SDB_MAX];
SdbInsertFp insertFps[SDB_MAX];
SdbUpdateFp updateFps[SDB_MAX];
@ -198,25 +199,25 @@ typedef struct SSdb {
typedef struct SSdbIter {
TdFilePtr file;
int64_t total;
char *name;
char * name;
} SSdbIter;
typedef struct {
ESdbType sdbType;
EKeyType keyType;
SdbDeployFp deployFp;
SdbEncodeFp encodeFp;
SdbDecodeFp decodeFp;
SdbInsertFp insertFp;
SdbUpdateFp updateFp;
SdbDeleteFp deleteFp;
ESdbType sdbType;
EKeyType keyType;
SdbDeployFp deployFp;
SdbEncodeFp encodeFp;
SdbDecodeFp decodeFp;
SdbInsertFp insertFp;
SdbUpdateFp updateFp;
SdbDeleteFp deleteFp;
SdbValidateFp validateFp;
} SSdbTable;
typedef struct SSdbOpt {
const char *path;
SMnode *pMnode;
SWal *pWal;
SMnode * pMnode;
SWal * pWal;
int64_t sync;
} SSdbOpt;
@ -393,7 +394,7 @@ int32_t sdbGetRawSoftVer(SSdbRaw *pRaw, int8_t *sver);
int32_t sdbGetRawTotalSize(SSdbRaw *pRaw);
SSdbRow *sdbAllocRow(int32_t objSize);
void *sdbGetRowObj(SSdbRow *pRow);
void * sdbGetRowObj(SSdbRow *pRow);
void sdbFreeRow(SSdb *pSdb, SSdbRow *pRow, bool callFunc);
int32_t sdbStartRead(SSdb *pSdb, SSdbIter **ppIter, int64_t *index, int64_t *term, int64_t *config);