Merge pull request #23610 from taosdata/fix/TD-27216

change chkp gen way
This commit is contained in:
Haojun Liao 2023-11-09 18:55:31 +08:00 committed by GitHub
commit da83f27124
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 139 additions and 127 deletions

View File

@ -37,23 +37,23 @@
typedef struct SNodeEntry {
int32_t nodeId;
bool stageUpdated; // the stage has been updated due to the leader/follower change or node reboot.
SEpSet epset; // compare the epset to identify the vgroup tranferring between different dnodes.
int64_t hbTimestamp; // second
bool stageUpdated; // the stage has been updated due to the leader/follower change or node reboot.
SEpSet epset; // compare the epset to identify the vgroup tranferring between different dnodes.
int64_t hbTimestamp; // second
} SNodeEntry;
typedef struct SStreamExecInfo {
SArray *pNodeEntryList;
SArray * pNodeEntryList;
int64_t ts; // snapshot ts
int64_t activeCheckpoint; // active check point id
SHashObj *pTaskMap;
SArray *pTaskList;
SHashObj * pTaskMap;
SArray * pTaskList;
TdThreadMutex lock;
} SStreamExecInfo;
typedef struct SVgroupChangeInfo {
SHashObj *pDBMap;
SArray *pUpdateNodeList; // SArray<SNodeUpdateInfo>
SArray * pUpdateNodeList; // SArray<SNodeUpdateInfo>
} SVgroupChangeInfo;
static int32_t mndNodeCheckSentinel = 0;
@ -78,7 +78,7 @@ static int32_t mndBuildStreamCheckpointSourceReq2(void **pBuf, int32_t *pLen, in
static int32_t mndProcessNodeCheck(SRpcMsg *pReq);
static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg);
static SArray *extractNodeListFromStream(SMnode *pMnode);
static SArray *mndTakeVgroupSnapshot(SMnode *pMnode, bool* allReady);
static SArray *mndTakeVgroupSnapshot(SMnode *pMnode, bool *allReady);
static SVgroupChangeInfo mndFindChangedNodeInfo(SMnode *pMnode, const SArray *pPrevNodeList, const SArray *pNodeList);
@ -91,7 +91,7 @@ static void removeStreamTasksInBuf(SStreamObj *pStream, SStreamExecInfo *pExe
static void keepStreamTasksInBuf(SStreamObj *pStream, SStreamExecInfo *pExecNode);
static int32_t removeExpirednodeEntryAndTask(SArray *pNodeSnapshot);
static int32_t doKillActiveCheckpointTrans(SMnode *pMnode);
static int32_t setNodeEpsetExpiredFlag(const SArray* pNodeList);
static int32_t setNodeEpsetExpiredFlag(const SArray *pNodeList);
int32_t mndInitStream(SMnode *pMnode) {
SSdbTable table = {
@ -193,9 +193,9 @@ STREAM_ENCODE_OVER:
SSdbRow *mndStreamActionDecode(SSdbRaw *pRaw) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
SSdbRow *pRow = NULL;
SSdbRow * pRow = NULL;
SStreamObj *pStream = NULL;
void *buf = NULL;
void * buf = NULL;
int8_t sver = 0;
if (sdbGetRawSoftVer(pRaw, &sver) != 0) {
@ -272,7 +272,7 @@ static int32_t mndStreamActionUpdate(SSdb *pSdb, SStreamObj *pOldStream, SStream
}
SStreamObj *mndAcquireStream(SMnode *pMnode, char *streamName) {
SSdb *pSdb = pMnode->pSdb;
SSdb * pSdb = pMnode->pSdb;
SStreamObj *pStream = sdbAcquire(pSdb, SDB_STREAM, streamName);
if (pStream == NULL && terrno == TSDB_CODE_SDB_OBJ_NOT_THERE) {
terrno = TSDB_CODE_MND_STREAM_NOT_EXIST;
@ -325,7 +325,7 @@ static int32_t mndStreamGetPlanString(const char *ast, int8_t triggerType, int64
return TSDB_CODE_SUCCESS;
}
SNode *pAst = NULL;
SNode * pAst = NULL;
int32_t code = nodesStringToNode(ast, &pAst);
SQueryPlan *pPlan = NULL;
@ -350,7 +350,7 @@ static int32_t mndStreamGetPlanString(const char *ast, int8_t triggerType, int64
}
static int32_t mndBuildStreamObjFromCreateReq(SMnode *pMnode, SStreamObj *pObj, SCMCreateStreamReq *pCreate) {
SNode *pAst = NULL;
SNode * pAst = NULL;
SQueryPlan *pPlan = NULL;
mInfo("stream:%s to create", pCreate->name);
@ -589,7 +589,7 @@ int32_t mndPersistDropStreamLog(SMnode *pMnode, STrans *pTrans, SStreamObj *pStr
static int32_t mndCreateStbForStream(SMnode *pMnode, STrans *pTrans, const SStreamObj *pStream, const char *user) {
SStbObj *pStb = NULL;
SDbObj *pDb = NULL;
SDbObj * pDb = NULL;
SMCreateStbReq createReq = {0};
tstrncpy(createReq.name, pStream->targetSTbName, TSDB_TABLE_FNAME_LEN);
@ -715,10 +715,10 @@ int32_t mndDropStreamTasks(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream)
}
static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) {
SMnode *pMnode = pReq->info.node;
SMnode * pMnode = pReq->info.node;
int32_t code = -1;
SStreamObj *pStream = NULL;
SDbObj *pDb = NULL;
SStreamObj * pStream = NULL;
SDbObj * pDb = NULL;
SCMCreateStreamReq createStreamReq = {0};
SStreamObj streamObj = {0};
@ -761,7 +761,7 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) {
int32_t numOfStream = 0;
SStreamObj *pStream = NULL;
void *pIter = NULL;
void * pIter = NULL;
while (1) {
pIter = sdbFetch(pMnode->pSdb, SDB_STREAM, pIter, (void **)&pStream);
@ -858,12 +858,12 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) {
SName name = {0};
tNameFromString(&name, createStreamReq.name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
//reuse this function for stream
//TODO
// reuse this function for stream
// TODO
if (createStreamReq.sql != NULL) {
auditRecord(pReq, pMnode->clusterId, "createStream", name.dbname, name.tname,
createStreamReq.sql, strlen(createStreamReq.sql));
auditRecord(pReq, pMnode->clusterId, "createStream", name.dbname, name.tname, createStreamReq.sql,
strlen(createStreamReq.sql));
}
_OVER:
if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
@ -877,15 +877,30 @@ _OVER:
return code;
}
int64_t mndStreamGenChkpId(SMnode *pMnode) {
SStreamObj *pStream = NULL;
void * pIter = NULL;
SSdb * pSdb = pMnode->pSdb;
int64_t maxChkpId = 0;
while (1) {
pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream);
if (pIter == NULL) break;
maxChkpId = TMAX(maxChkpId, pStream->checkpointId);
sdbRelease(pSdb, pStream);
}
return maxChkpId + 1;
}
static int32_t mndProcessStreamCheckpointTmr(SRpcMsg *pReq) {
SMnode *pMnode = pReq->info.node;
SSdb *pSdb = pMnode->pSdb;
SSdb * pSdb = pMnode->pSdb;
if (sdbGetSize(pSdb, SDB_STREAM) <= 0) {
return 0;
}
SMStreamDoCheckpointMsg *pMsg = rpcMallocCont(sizeof(SMStreamDoCheckpointMsg));
pMsg->checkpointId = taosGetTimestampMs();
pMsg->checkpointId = mndStreamGenChkpId(pMnode);
int32_t size = sizeof(SMStreamDoCheckpointMsg);
SRpcMsg rpcMsg = {.msgType = TDMT_MND_STREAM_BEGIN_CHECKPOINT, .pCont = pMsg, .contLen = size};
@ -919,7 +934,7 @@ static int32_t mndBuildStreamCheckpointSourceReq2(void **pBuf, int32_t *pLen, in
return -1;
}
void *abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
void * abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
SEncoder encoder;
tEncoderInit(&encoder, abuf, tlen);
tEncodeStreamCheckpointSourceReq(&encoder, &req);
@ -1042,7 +1057,7 @@ static int32_t mndAddStreamCheckpointToTrans(STrans *pTrans, SStreamObj *pStream
int32_t totLevel = taosArrayGetSize(pStream->tasks);
for (int32_t i = 0; i < totLevel; i++) {
SArray *pLevel = taosArrayGetP(pStream->tasks, i);
SArray * pLevel = taosArrayGetP(pStream->tasks, i);
SStreamTask *pTask = taosArrayGetP(pLevel, 0);
if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
@ -1059,7 +1074,7 @@ static int32_t mndAddStreamCheckpointToTrans(STrans *pTrans, SStreamObj *pStream
return -1;
}
void *buf;
void * buf;
int32_t tlen;
if (mndBuildStreamCheckpointSourceReq2(&buf, &tlen, pTask->info.nodeId, chkptId, pTask->id.streamId,
pTask->id.taskId) < 0) {
@ -1070,7 +1085,8 @@ static int32_t mndAddStreamCheckpointToTrans(STrans *pTrans, SStreamObj *pStream
STransAction action = {0};
SEpSet epset = mndGetVgroupEpset(pMnode, pVgObj);
initTransAction(&action, buf, tlen, TDMT_VND_STREAM_CHECK_POINT_SOURCE, &epset, TSDB_CODE_SYN_PROPOSE_NOT_READY);
initTransAction(&action, buf, tlen, TDMT_VND_STREAM_CHECK_POINT_SOURCE, &epset,
TSDB_CODE_SYN_PROPOSE_NOT_READY);
mndReleaseVgroup(pMnode, pVgObj);
if (mndTransAppendRedoAction(pTrans, &action) != 0) {
@ -1110,9 +1126,9 @@ static int32_t mndAddStreamCheckpointToTrans(STrans *pTrans, SStreamObj *pStream
}
static const char *mndGetStreamDB(SMnode *pMnode) {
SSdb *pSdb = pMnode->pSdb;
SSdb * pSdb = pMnode->pSdb;
SStreamObj *pStream = NULL;
void *pIter = NULL;
void * pIter = NULL;
pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream);
if (pIter == NULL) {
@ -1126,9 +1142,9 @@ static const char *mndGetStreamDB(SMnode *pMnode) {
}
static int32_t mndProcessStreamDoCheckpoint(SRpcMsg *pReq) {
SMnode *pMnode = pReq->info.node;
SSdb *pSdb = pMnode->pSdb;
void *pIter = NULL;
SMnode * pMnode = pReq->info.node;
SSdb * pSdb = pMnode->pSdb;
void * pIter = NULL;
SStreamObj *pStream = NULL;
int32_t code = 0;
@ -1149,19 +1165,18 @@ static int32_t mndProcessStreamDoCheckpoint(SRpcMsg *pReq) {
return 0;
}
for(int32_t i = 0; i < taosArrayGetSize(execInfo.pNodeEntryList); ++i) {
SNodeEntry* pNodeEntry = taosArrayGet(execInfo.pNodeEntryList, i);
for (int32_t i = 0; i < taosArrayGetSize(execInfo.pNodeEntryList); ++i) {
SNodeEntry *pNodeEntry = taosArrayGet(execInfo.pNodeEntryList, i);
if (pNodeEntry->stageUpdated) {
mDebug("stream task not ready due to node update detected, checkpoint not issued");
return 0;
}
}
bool allReady = true;
bool allReady = true;
SArray *pNodeSnapshot = mndTakeVgroupSnapshot(pMnode, &allReady);
if (!allReady) {
mWarn("not all vnodes are ready, ignore the checkpoint")
taosArrayDestroy(pNodeSnapshot);
mWarn("not all vnodes are ready, ignore the checkpoint") taosArrayDestroy(pNodeSnapshot);
return 0;
}
@ -1182,15 +1197,15 @@ static int32_t mndProcessStreamDoCheckpoint(SRpcMsg *pReq) {
taosThreadMutexLock(&execInfo.lock);
for (int32_t i = 0; i < taosArrayGetSize(execInfo.pTaskList); ++i) {
STaskId *p = taosArrayGet(execInfo.pTaskList, i);
STaskStatusEntry* pEntry = taosHashGet(execInfo.pTaskMap, p, sizeof(*p));
STaskId * p = taosArrayGet(execInfo.pTaskList, i);
STaskStatusEntry *pEntry = taosHashGet(execInfo.pTaskMap, p, sizeof(*p));
if (pEntry == NULL) {
continue;
}
if (pEntry->status != TASK_STATUS__READY) {
mDebug("s-task:0x%" PRIx64 "-0x%x (nodeId:%d) status:%s not ready, checkpoint msg not issued",
pEntry->id.streamId, (int32_t)pEntry->id.taskId, 0, streamTaskGetStatusStr(pEntry->status));
pEntry->id.streamId, (int32_t)pEntry->id.taskId, 0, streamTaskGetStatusStr(pEntry->status));
ready = false;
break;
}
@ -1250,7 +1265,7 @@ static int32_t mndProcessStreamDoCheckpoint(SRpcMsg *pReq) {
}
static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) {
SMnode *pMnode = pReq->info.node;
SMnode * pMnode = pReq->info.node;
SStreamObj *pStream = NULL;
SMDropStreamReq dropReq = {0};
@ -1327,7 +1342,7 @@ static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) {
SName name = {0};
tNameFromString(&name, dropReq.name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
//reuse this function for stream
// reuse this function for stream
auditRecord(pReq, pMnode->clusterId, "dropStream", name.dbname, name.tname, dropReq.sql, dropReq.sqlLen);
@ -1379,7 +1394,7 @@ int32_t mndDropStreamByDb(SMnode *pMnode, STrans *pTrans, SDbObj *pDb) {
}
int32_t mndGetNumOfStreams(SMnode *pMnode, char *dbName, int32_t *pNumOfStreams) {
SSdb *pSdb = pMnode->pSdb;
SSdb * pSdb = pMnode->pSdb;
SDbObj *pDb = mndAcquireDb(pMnode, dbName);
if (pDb == NULL) {
terrno = TSDB_CODE_MND_DB_NOT_SELECTED;
@ -1387,7 +1402,7 @@ int32_t mndGetNumOfStreams(SMnode *pMnode, char *dbName, int32_t *pNumOfStreams)
}
int32_t numOfStreams = 0;
void *pIter = NULL;
void * pIter = NULL;
while (1) {
SStreamObj *pStream = NULL;
pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream);
@ -1406,8 +1421,8 @@ int32_t mndGetNumOfStreams(SMnode *pMnode, char *dbName, int32_t *pNumOfStreams)
}
static int32_t mndRetrieveStream(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
SMnode *pMnode = pReq->info.node;
SSdb *pSdb = pMnode->pSdb;
SMnode * pMnode = pReq->info.node;
SSdb * pSdb = pMnode->pSdb;
int32_t numOfRows = 0;
SStreamObj *pStream = NULL;
@ -1483,8 +1498,8 @@ static void mndCancelGetNextStream(SMnode *pMnode, void *pIter) {
}
static int32_t mndRetrieveStreamTask(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rowsCapacity) {
SMnode *pMnode = pReq->info.node;
SSdb *pSdb = pMnode->pSdb;
SMnode * pMnode = pReq->info.node;
SSdb * pSdb = pMnode->pSdb;
int32_t numOfRows = 0;
SStreamObj *pStream = NULL;
@ -1573,13 +1588,13 @@ static int32_t mndRetrieveStreamTask(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock
// status
char status[20 + VARSTR_HEADER_SIZE] = {0};
STaskId id = {.streamId = pTask->id.streamId, .taskId = pTask->id.taskId};
STaskStatusEntry* pe = taosHashGet(execInfo.pTaskMap, &id, sizeof(id));
STaskId id = {.streamId = pTask->id.streamId, .taskId = pTask->id.taskId};
STaskStatusEntry *pe = taosHashGet(execInfo.pTaskMap, &id, sizeof(id));
if (pe == NULL) {
continue;
}
const char* pStatus = streamTaskGetStatusStr(pe->status);
const char *pStatus = streamTaskGetStatusStr(pe->status);
STR_TO_VARSTR(status, pStatus);
// status
@ -1591,24 +1606,24 @@ static int32_t mndRetrieveStreamTask(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock
colDataSetVal(pColInfo, numOfRows, (const char *)&pe->stage, false);
// input queue
char vbuf[30] = {0};
char buf[25] = {0};
const char* queueInfoStr = "%4.2fMiB (%5.2f%)";
char vbuf[30] = {0};
char buf[25] = {0};
const char *queueInfoStr = "%4.2fMiB (%5.2f%)";
sprintf(buf, queueInfoStr, pe->inputQUsed, pe->inputRate);
STR_TO_VARSTR(vbuf, buf);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataSetVal(pColInfo, numOfRows, (const char*)vbuf, false);
colDataSetVal(pColInfo, numOfRows, (const char *)vbuf, false);
// output queue
// sprintf(buf, queueInfoStr, pe->outputQUsed, pe->outputRate);
// STR_TO_VARSTR(vbuf, buf);
// sprintf(buf, queueInfoStr, pe->outputQUsed, pe->outputRate);
// STR_TO_VARSTR(vbuf, buf);
// pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
// colDataSetVal(pColInfo, numOfRows, (const char*)vbuf, false);
// pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
// colDataSetVal(pColInfo, numOfRows, (const char*)vbuf, false);
if (pTask->info.taskLevel == TASK_LEVEL__SINK) {
const char* sinkStr = "%.2fMiB";
const char *sinkStr = "%.2fMiB";
sprintf(buf, sinkStr, pe->sinkDataSize);
} else if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
// offset info
@ -1619,7 +1634,7 @@ static int32_t mndRetrieveStreamTask(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock
STR_TO_VARSTR(vbuf, buf);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataSetVal(pColInfo, numOfRows, (const char*)vbuf, false);
colDataSetVal(pColInfo, numOfRows, (const char *)vbuf, false);
numOfRows++;
}
@ -1663,7 +1678,7 @@ static int32_t mndPauseStreamTask(STrans *pTrans, SStreamTask *pTask) {
}
int32_t mndPauseAllStreamTasks(STrans *pTrans, SStreamObj *pStream) {
SArray* tasks = pStream->tasks;
SArray *tasks = pStream->tasks;
int32_t size = taosArrayGetSize(tasks);
for (int32_t i = 0; i < size; i++) {
@ -1700,7 +1715,7 @@ static int32_t mndPersistStreamLog(STrans *pTrans, const SStreamObj *pStream, in
}
static int32_t mndProcessPauseStreamReq(SRpcMsg *pReq) {
SMnode *pMnode = pReq->info.node;
SMnode * pMnode = pReq->info.node;
SStreamObj *pStream = NULL;
SMPauseStreamReq pauseReq = {0};
@ -1816,7 +1831,7 @@ int32_t mndResumeAllStreamTasks(STrans *pTrans, SStreamObj *pStream, int8_t igUn
}
static int32_t mndProcessResumeStreamReq(SRpcMsg *pReq) {
SMnode *pMnode = pReq->info.node;
SMnode * pMnode = pReq->info.node;
SStreamObj *pStream = NULL;
SMResumeStreamReq pauseReq = {0};
@ -1901,7 +1916,7 @@ static void initNodeUpdateMsg(SStreamTaskNodeUpdateMsg *pMsg, const SVgroupChang
}
static int32_t doBuildStreamTaskUpdateMsg(void **pBuf, int32_t *pLen, SVgroupChangeInfo *pInfo, int32_t nodeId,
SStreamTaskId* pId, int32_t transId) {
SStreamTaskId *pId, int32_t transId) {
SStreamTaskNodeUpdateMsg req = {0};
initNodeUpdateMsg(&req, pInfo, pId, transId);
@ -1924,7 +1939,7 @@ static int32_t doBuildStreamTaskUpdateMsg(void **pBuf, int32_t *pLen, SVgroupCha
return -1;
}
void *abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
void * abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
SEncoder encoder;
tEncoderInit(&encoder, abuf, tlen);
tEncodeStreamTaskUpdateMsg(&encoder, &req);
@ -1991,7 +2006,7 @@ static int32_t createStreamUpdateTrans(SStreamObj *pStream, SVgroupChangeInfo *p
for (int32_t k = 0; k < numOfTasks; ++k) {
SStreamTask *pTask = taosArrayGetP(pLevel, k);
void *pBuf = NULL;
void * pBuf = NULL;
int32_t len = 0;
streamTaskUpdateEpsetInfo(pTask, pInfo->pUpdateNodeList);
doBuildStreamTaskUpdateMsg(&pBuf, &len, pInfo, pTask->info.nodeId, &pTask->id, pTrans->id);
@ -2012,7 +2027,7 @@ static int32_t createStreamUpdateTrans(SStreamObj *pStream, SVgroupChangeInfo *p
static bool isNodeEpsetChanged(const SEpSet *pPrevEpset, const SEpSet *pCurrent) {
const SEp *pEp = GET_ACTIVE_EP(pPrevEpset);
const SEp* p = GET_ACTIVE_EP(pCurrent);
const SEp *p = GET_ACTIVE_EP(pCurrent);
if (pEp->port == p->port && strncmp(pEp->fqdn, p->fqdn, TSDB_FQDN_LEN) == 0) {
return false;
@ -2066,9 +2081,9 @@ static SVgroupChangeInfo mndFindChangedNodeInfo(SMnode *pMnode, const SArray *pP
return info;
}
static SArray *mndTakeVgroupSnapshot(SMnode *pMnode, bool* allReady) {
SSdb *pSdb = pMnode->pSdb;
void *pIter = NULL;
static SArray *mndTakeVgroupSnapshot(SMnode *pMnode, bool *allReady) {
SSdb * pSdb = pMnode->pSdb;
void * pIter = NULL;
SVgObj *pVgroup = NULL;
*allReady = true;
@ -2115,8 +2130,8 @@ static int32_t mndProcessVgroupChange(SMnode *pMnode, SVgroupChangeInfo *pChange
// check all streams that involved this vnode should update the epset info
SStreamObj *pStream = NULL;
void *pIter = NULL;
STrans *pTrans = NULL;
void * pIter = NULL;
STrans * pTrans = NULL;
while (1) {
pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream);
@ -2177,9 +2192,9 @@ static int32_t mndProcessVgroupChange(SMnode *pMnode, SVgroupChangeInfo *pChange
}
static SArray *extractNodeListFromStream(SMnode *pMnode) {
SSdb *pSdb = pMnode->pSdb;
SSdb * pSdb = pMnode->pSdb;
SStreamObj *pStream = NULL;
void *pIter = NULL;
void * pIter = NULL;
SHashObj *pHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK);
while (1) {
@ -2226,9 +2241,9 @@ static SArray *extractNodeListFromStream(SMnode *pMnode) {
}
static void doExtractTasksFromStream(SMnode *pMnode) {
SSdb *pSdb = pMnode->pSdb;
SSdb * pSdb = pMnode->pSdb;
SStreamObj *pStream = NULL;
void *pIter = NULL;
void * pIter = NULL;
while (1) {
pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream);
@ -2263,11 +2278,11 @@ static int32_t doRemoveTasks(SStreamExecInfo *pExecNode, STaskId *pRemovedId) {
return TSDB_CODE_SUCCESS;
}
static bool taskNodeExists(SArray* pList, int32_t nodeId) {
static bool taskNodeExists(SArray *pList, int32_t nodeId) {
size_t num = taosArrayGetSize(pList);
for(int32_t i = 0; i < num; ++i) {
SNodeEntry* pEntry = taosArrayGet(pList, i);
for (int32_t i = 0; i < num; ++i) {
SNodeEntry *pEntry = taosArrayGet(pList, i);
if (pEntry->nodeId == nodeId) {
return true;
}
@ -2277,12 +2292,12 @@ static bool taskNodeExists(SArray* pList, int32_t nodeId) {
}
int32_t removeExpirednodeEntryAndTask(SArray *pNodeSnapshot) {
SArray* pRemovedTasks = taosArrayInit(4, sizeof(STaskId));
SArray *pRemovedTasks = taosArrayInit(4, sizeof(STaskId));
int32_t numOfTask = taosArrayGetSize(execInfo.pTaskList);
for(int32_t i = 0; i < numOfTask; ++i) {
STaskId* pId = taosArrayGet(execInfo.pTaskList, i);
STaskStatusEntry* pEntry = taosHashGet(execInfo.pTaskMap, pId, sizeof(*pId));
for (int32_t i = 0; i < numOfTask; ++i) {
STaskId * pId = taosArrayGet(execInfo.pTaskList, i);
STaskStatusEntry *pEntry = taosHashGet(execInfo.pTaskMap, pId, sizeof(*pId));
bool existed = taskNodeExists(pNodeSnapshot, pEntry->nodeId);
if (!existed) {
@ -2290,21 +2305,21 @@ int32_t removeExpirednodeEntryAndTask(SArray *pNodeSnapshot) {
}
}
for(int32_t i = 0; i < taosArrayGetSize(pRemovedTasks); ++i) {
STaskId* pId = taosArrayGet(pRemovedTasks, i);
for (int32_t i = 0; i < taosArrayGetSize(pRemovedTasks); ++i) {
STaskId *pId = taosArrayGet(pRemovedTasks, i);
doRemoveTasks(&execInfo, pId);
}
mDebug("remove invalid stream tasks:%d, remain:%d", (int32_t)taosArrayGetSize(pRemovedTasks),
(int32_t) taosArrayGetSize(execInfo.pTaskList));
(int32_t)taosArrayGetSize(execInfo.pTaskList));
int32_t size = taosArrayGetSize(pNodeSnapshot);
SArray* pValidNodeEntryList = taosArrayInit(4, sizeof(SNodeEntry));
for(int32_t i = 0; i < taosArrayGetSize(execInfo.pNodeEntryList); ++i) {
SNodeEntry* p = taosArrayGet(execInfo.pNodeEntryList, i);
SArray *pValidNodeEntryList = taosArrayInit(4, sizeof(SNodeEntry));
for (int32_t i = 0; i < taosArrayGetSize(execInfo.pNodeEntryList); ++i) {
SNodeEntry *p = taosArrayGet(execInfo.pNodeEntryList, i);
for(int32_t j = 0; j < size; ++j) {
SNodeEntry* pEntry = taosArrayGet(pNodeSnapshot, j);
for (int32_t j = 0; j < size; ++j) {
SNodeEntry *pEntry = taosArrayGet(pNodeSnapshot, j);
if (pEntry->nodeId == p->nodeId) {
taosArrayPush(pValidNodeEntryList, p);
break;
@ -2315,7 +2330,7 @@ int32_t removeExpirednodeEntryAndTask(SArray *pNodeSnapshot) {
execInfo.pNodeEntryList = taosArrayDestroy(execInfo.pNodeEntryList);
execInfo.pNodeEntryList = pValidNodeEntryList;
mDebug("remain %d valid node entries", (int32_t) taosArrayGetSize(pValidNodeEntryList));
mDebug("remain %d valid node entries", (int32_t)taosArrayGetSize(pValidNodeEntryList));
taosArrayDestroy(pRemovedTasks);
return 0;
}
@ -2347,7 +2362,7 @@ static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg) {
return 0;
}
bool allVnodeReady = true;
bool allVnodeReady = true;
SArray *pNodeSnapshot = mndTakeVgroupSnapshot(pMnode, &allVnodeReady);
if (!allVnodeReady) {
taosArrayDestroy(pNodeSnapshot);
@ -2361,7 +2376,6 @@ static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg) {
SVgroupChangeInfo changeInfo = mndFindChangedNodeInfo(pMnode, execInfo.pNodeEntryList, pNodeSnapshot);
if (taosArrayGetSize(changeInfo.pUpdateNodeList) > 0) {
// kill current active checkpoint transaction, since the transaction is vnode wide.
doKillActiveCheckpointTrans(pMnode);
code = mndProcessVgroupChange(pMnode, &changeInfo);
@ -2396,7 +2410,7 @@ typedef struct SMStreamNodeCheckMsg {
static int32_t mndProcessNodeCheck(SRpcMsg *pReq) {
SMnode *pMnode = pReq->info.node;
SSdb *pSdb = pMnode->pSdb;
SSdb * pSdb = pMnode->pSdb;
if (sdbGetSize(pSdb, SDB_STREAM) <= 0) {
return 0;
}
@ -2420,7 +2434,7 @@ void keepStreamTasksInBuf(SStreamObj *pStream, SStreamExecInfo *pExecNode) {
SStreamTask *pTask = taosArrayGetP(pLevel, j);
STaskId id = {.streamId = pTask->id.streamId, .taskId = pTask->id.taskId};
void *p = taosHashGet(pExecNode->pTaskMap, &id, sizeof(id));
void * p = taosHashGet(pExecNode->pTaskMap, &id, sizeof(id));
if (p == NULL) {
STaskStatusEntry entry = {0};
streamTaskStatusInit(&entry, pTask);
@ -2434,7 +2448,7 @@ void keepStreamTasksInBuf(SStreamObj *pStream, SStreamExecInfo *pExecNode) {
}
}
void removeStreamTasksInBuf(SStreamObj* pStream, SStreamExecInfo * pExecNode) {
void removeStreamTasksInBuf(SStreamObj *pStream, SStreamExecInfo *pExecNode) {
int32_t level = taosArrayGetSize(pStream->tasks);
for (int32_t i = 0; i < level; i++) {
SArray *pLevel = taosArrayGetP(pStream->tasks, i);
@ -2444,12 +2458,12 @@ void removeStreamTasksInBuf(SStreamObj* pStream, SStreamExecInfo * pExecNode) {
SStreamTask *pTask = taosArrayGetP(pLevel, j);
STaskId id = {.streamId = pTask->id.streamId, .taskId = pTask->id.taskId};
void *p = taosHashGet(pExecNode->pTaskMap, &id, sizeof(id));
void * p = taosHashGet(pExecNode->pTaskMap, &id, sizeof(id));
if (p != NULL) {
taosHashRemove(pExecNode->pTaskMap, &id, sizeof(id));
for(int32_t k = 0; k < taosArrayGetSize(pExecNode->pTaskList); ++k) {
STaskId* pId = taosArrayGet(pExecNode->pTaskList, k);
for (int32_t k = 0; k < taosArrayGetSize(pExecNode->pTaskList); ++k) {
STaskId *pId = taosArrayGet(pExecNode->pTaskList, k);
if (pId->taskId == id.taskId && pId->streamId == id.streamId) {
taosArrayRemove(pExecNode->pTaskList, k);
mInfo("s-task:0x%x removed from buffer, remain:%d", (int32_t)id.taskId,
@ -2457,7 +2471,6 @@ void removeStreamTasksInBuf(SStreamObj* pStream, SStreamExecInfo * pExecNode) {
break;
}
}
}
}
}
@ -2487,7 +2500,7 @@ STrans *doCreateTrans(SMnode *pMnode, SStreamObj *pStream, const char *name) {
return pTrans;
}
int32_t createStreamResetStatusTrans(SMnode* pMnode, SStreamObj* pStream) {
int32_t createStreamResetStatusTrans(SMnode *pMnode, SStreamObj *pStream) {
STrans *pTrans = doCreateTrans(pMnode, pStream, "stream-task-reset");
if (pTrans == NULL) {
return terrno;
@ -2504,7 +2517,7 @@ int32_t createStreamResetStatusTrans(SMnode* pMnode, SStreamObj* pStream) {
SStreamTask *pTask = taosArrayGetP(pLevel, k);
// todo extract method, with pause stream task
SVResetStreamTaskReq* pReq = taosMemoryCalloc(1, sizeof(SVResetStreamTaskReq));
SVResetStreamTaskReq *pReq = taosMemoryCalloc(1, sizeof(SVResetStreamTaskReq));
if (pReq == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
mError("failed to malloc in reset stream, size:%" PRIzu ", code:%s", sizeof(SVResetStreamTaskReq),
@ -2550,9 +2563,9 @@ int32_t createStreamResetStatusTrans(SMnode* pMnode, SStreamObj* pStream) {
int32_t doKillActiveCheckpointTrans(SMnode *pMnode) {
int32_t transId = 0;
SSdb *pSdb = pMnode->pSdb;
SSdb * pSdb = pMnode->pSdb;
STrans *pTrans = NULL;
void *pIter = NULL;
void * pIter = NULL;
while (1) {
pIter = sdbFetch(pSdb, SDB_TRANS, pIter, (void **)&pTrans);
@ -2583,13 +2596,13 @@ int32_t doKillActiveCheckpointTrans(SMnode *pMnode) {
return TSDB_CODE_SUCCESS;
}
int32_t mndResetFromCheckpoint(SMnode* pMnode) {
int32_t mndResetFromCheckpoint(SMnode *pMnode) {
doKillActiveCheckpointTrans(pMnode);
// set all tasks status to be normal, refactor later to be stream level, instead of vnode level.
SSdb *pSdb = pMnode->pSdb;
SSdb * pSdb = pMnode->pSdb;
SStreamObj *pStream = NULL;
void *pIter = NULL;
void * pIter = NULL;
while (1) {
pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream);
if (pIter == NULL) {
@ -2608,15 +2621,15 @@ int32_t mndResetFromCheckpoint(SMnode* pMnode) {
return 0;
}
int32_t setNodeEpsetExpiredFlag(const SArray* pNodeList) {
int32_t setNodeEpsetExpiredFlag(const SArray *pNodeList) {
int32_t num = taosArrayGetSize(pNodeList);
for (int k = 0; k < num; ++k) {
int32_t* pVgId = taosArrayGet(pNodeList, k);
int32_t *pVgId = taosArrayGet(pNodeList, k);
int32_t numOfNodes = taosArrayGetSize(execInfo.pNodeEntryList);
for (int i = 0; i < numOfNodes; ++i) {
SNodeEntry* pNodeEntry = taosArrayGet(execInfo.pNodeEntryList, i);
SNodeEntry *pNodeEntry = taosArrayGet(execInfo.pNodeEntryList, i);
if (pNodeEntry->nodeId == *pVgId) {
mInfo("vgId:%d expired in stream task, needs update nodeEp", *pVgId);
@ -2629,12 +2642,11 @@ int32_t setNodeEpsetExpiredFlag(const SArray* pNodeList) {
return TSDB_CODE_SUCCESS;
}
static void updateStageInfo(STaskStatusEntry* pTaskEntry, int32_t stage) {
static void updateStageInfo(STaskStatusEntry *pTaskEntry, int32_t stage) {
int32_t numOfNodes = taosArrayGetSize(execInfo.pNodeEntryList);
for(int32_t j = 0; j < numOfNodes; ++j) {
SNodeEntry* pNodeEntry = taosArrayGet(execInfo.pNodeEntryList, j);
for (int32_t j = 0; j < numOfNodes; ++j) {
SNodeEntry *pNodeEntry = taosArrayGet(execInfo.pNodeEntryList, j);
if (pNodeEntry->nodeId == pTaskEntry->nodeId) {
mInfo("vgId:%d stage updated from %d to %d, nodeUpdate trigger by s-task:0x%" PRIx64, pTaskEntry->nodeId,
pTaskEntry->stage, stage, pTaskEntry->id.taskId);
@ -2646,7 +2658,7 @@ static void updateStageInfo(STaskStatusEntry* pTaskEntry, int32_t stage) {
}
int32_t mndProcessStreamHb(SRpcMsg *pReq) {
SMnode *pMnode = pReq->info.node;
SMnode * pMnode = pReq->info.node;
SStreamHbMsg req = {0};
bool checkpointFailed = false;
@ -2699,15 +2711,15 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) {
pTaskEntry->status = p->status;
if (p->status != TASK_STATUS__READY) {
mDebug("received s-task:0x%"PRIx64" not in ready status:%s", p->id.taskId, streamTaskGetStatusStr(p->status));
mDebug("received s-task:0x%" PRIx64 " not in ready status:%s", p->id.taskId, streamTaskGetStatusStr(p->status));
}
}
// current checkpoint is failed, rollback from the checkpoint trans
// kill the checkpoint trans and then set all tasks status to be normal
if (checkpointFailed && activeCheckpointId != 0) {
bool allReady = true;
SArray* p = mndTakeVgroupSnapshot(pMnode, &allReady);
bool allReady = true;
SArray *p = mndTakeVgroupSnapshot(pMnode, &allReady);
taosArrayDestroy(p);
if (allReady) {

View File

@ -194,8 +194,8 @@ int32_t streamSnapHandleInit(SStreamSnapHandle* pHandle, char* path, int64_t chk
taosArrayPush(pFile->pSst, &sst);
}
}
{
char* buf = taosMemoryCalloc(1, 512);
if (qDebugFlag & DEBUG_TRACE) {
char* buf = taosMemoryCalloc(1, 128 + taosArrayGetSize(pFile->pSst) * 16);
sprintf(buf, "[current: %s,", pFile->pCurrent);
sprintf(buf + strlen(buf), "MANIFEST: %s,", pFile->pMainfest);
sprintf(buf + strlen(buf), "options: %s,", pFile->pOptions);
@ -481,8 +481,8 @@ int32_t streamSnapWrite(SStreamSnapWriter* pWriter, uint8_t* pData, uint32_t nDa
}
int32_t streamSnapWriterClose(SStreamSnapWriter* pWriter, int8_t rollback) {
SStreamSnapHandle* handle = &pWriter->handle;
if (qDebugFlag & DEBUG_DEBUG) {
char* buf = (char*)taosMemoryMalloc(1024);
if (qDebugFlag & DEBUG_TRACE) {
char* buf = (char*)taosMemoryMalloc(128 + taosArrayGetSize(handle->pFileList) * 16);
int n = sprintf(buf, "[");
for (int i = 0; i < taosArrayGetSize(handle->pFileList); i++) {
SBackendFileItem* item = taosArrayGet(handle->pFileList, i);