refactor: do some internal refactor.
This commit is contained in:
parent
289ec6cf66
commit
ddf98ae0c2
|
@ -656,7 +656,9 @@ _OVER:
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t checkForNumOfStreams(SMnode *pMnode, SStreamObj *pStreamObj) { // check for number of existed tasks
|
// 1. stream number check
|
||||||
|
// 2. target stable can not be target table of other existed streams.
|
||||||
|
static int32_t doStreamCheck(SMnode *pMnode, SStreamObj *pStreamObj) {
|
||||||
int32_t numOfStream = 0;
|
int32_t numOfStream = 0;
|
||||||
SStreamObj *pStream = NULL;
|
SStreamObj *pStream = NULL;
|
||||||
void *pIter = NULL;
|
void *pIter = NULL;
|
||||||
|
@ -669,14 +671,16 @@ static int32_t checkForNumOfStreams(SMnode *pMnode, SStreamObj *pStreamObj) { /
|
||||||
sdbRelease(pMnode->pSdb, pStream);
|
sdbRelease(pMnode->pSdb, pStream);
|
||||||
|
|
||||||
if (numOfStream > MND_STREAM_MAX_NUM) {
|
if (numOfStream > MND_STREAM_MAX_NUM) {
|
||||||
mError("too many streams, no more than %d for each database", MND_STREAM_MAX_NUM);
|
mError("too many streams, no more than %d for each database, failed to create stream:%s", MND_STREAM_MAX_NUM,
|
||||||
|
pStreamObj->name);
|
||||||
sdbCancelFetch(pMnode->pSdb, pIter);
|
sdbCancelFetch(pMnode->pSdb, pIter);
|
||||||
terrno = TSDB_CODE_MND_TOO_MANY_STREAMS;
|
terrno = TSDB_CODE_MND_TOO_MANY_STREAMS;
|
||||||
return terrno;
|
return terrno;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pStream->targetStbUid == pStreamObj->targetStbUid) {
|
if (pStream->targetStbUid == pStreamObj->targetStbUid) {
|
||||||
mError("Cannot write the same stable as other stream:%s", pStream->name);
|
mError("Cannot write the same stable as other stream:%s, failed to create stream:%s", pStream->name,
|
||||||
|
pStreamObj->name);
|
||||||
sdbCancelFetch(pMnode->pSdb, pIter);
|
sdbCancelFetch(pMnode->pSdb, pIter);
|
||||||
terrno = TSDB_CODE_MND_INVALID_TARGET_TABLE;
|
terrno = TSDB_CODE_MND_INVALID_TARGET_TABLE;
|
||||||
return terrno;
|
return terrno;
|
||||||
|
@ -741,7 +745,7 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) {
|
||||||
goto _OVER;
|
goto _OVER;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (checkForNumOfStreams(pMnode, &streamObj) < 0) {
|
if (doStreamCheck(pMnode, &streamObj) < 0) {
|
||||||
goto _OVER;
|
goto _OVER;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -976,7 +980,6 @@ static int32_t mndProcessStreamCheckpointTrans(SMnode *pMnode, SStreamObj *pStre
|
||||||
int32_t code = -1;
|
int32_t code = -1;
|
||||||
int64_t ts = taosGetTimestampMs();
|
int64_t ts = taosGetTimestampMs();
|
||||||
if (mndTrigger == 1 && (ts - pStream->checkpointFreq < tsStreamCheckpointInterval * 1000)) {
|
if (mndTrigger == 1 && (ts - pStream->checkpointFreq < tsStreamCheckpointInterval * 1000)) {
|
||||||
// mWarn("checkpoint interval less than the threshold, ignore it");
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1393,6 +1396,15 @@ int32_t mndGetNumOfStreams(SMnode *pMnode, char *dbName, int32_t *pNumOfStreams)
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static void int64ToHexStr(int64_t id, char *pBuf, int32_t bufLen) {
|
||||||
|
memset(pBuf, 0, bufLen);
|
||||||
|
pBuf[2] = '0';
|
||||||
|
pBuf[3] = 'x';
|
||||||
|
|
||||||
|
int32_t len = tintToHex(id, &pBuf[4]);
|
||||||
|
varDataSetLen(pBuf, len + 2);
|
||||||
|
}
|
||||||
|
|
||||||
static int32_t mndRetrieveStream(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
|
static int32_t mndRetrieveStream(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
|
||||||
SMnode *pMnode = pReq->info.node;
|
SMnode *pMnode = pReq->info.node;
|
||||||
SSdb *pSdb = pMnode->pSdb;
|
SSdb *pSdb = pMnode->pSdb;
|
||||||
|
@ -1417,19 +1429,14 @@ static int32_t mndRetrieveStream(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pB
|
||||||
|
|
||||||
// stream id
|
// stream id
|
||||||
char buf[128] = {0};
|
char buf[128] = {0};
|
||||||
int32_t len = tintToHex(pStream->uid, &buf[4]);
|
int64ToHexStr(pStream->uid, buf, tListLen(buf));
|
||||||
buf[2] = '0';
|
|
||||||
buf[3] = 'x';
|
|
||||||
varDataSetLen(buf, len + 2);
|
|
||||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||||
colDataSetVal(pColInfo, numOfRows, buf, false);
|
colDataSetVal(pColInfo, numOfRows, buf, false);
|
||||||
|
|
||||||
// related fill-history stream id
|
// related fill-history stream id
|
||||||
memset(buf, 0, tListLen(buf));
|
|
||||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||||
if (pStream->hTaskUid != 0) {
|
if (pStream->hTaskUid != 0) {
|
||||||
len = tintToHex(pStream->hTaskUid, &buf[4]);
|
int64ToHexStr(pStream->hTaskUid, buf, tListLen(buf));
|
||||||
varDataSetLen(buf, len + 2);
|
|
||||||
colDataSetVal(pColInfo, numOfRows, buf, false);
|
colDataSetVal(pColInfo, numOfRows, buf, false);
|
||||||
} else {
|
} else {
|
||||||
colDataSetVal(pColInfo, numOfRows, buf, true);
|
colDataSetVal(pColInfo, numOfRows, buf, true);
|
||||||
|
@ -1528,11 +1535,8 @@ static int32_t setTaskAttrInResBlock(SStreamObj *pStream, SStreamTask *pTask, SS
|
||||||
// task id
|
// task id
|
||||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||||
|
|
||||||
char idstr[128] = {0};
|
char idstr[128] = {0};
|
||||||
int32_t len = tintToHex(pTask->id.taskId, &idstr[4]);
|
int64ToHexStr(pTask->id.taskId, idstr, tListLen(idstr));
|
||||||
idstr[2] = '0';
|
|
||||||
idstr[3] = 'x';
|
|
||||||
varDataSetLen(idstr, len + 2);
|
|
||||||
colDataSetVal(pColInfo, numOfRows, idstr, false);
|
colDataSetVal(pColInfo, numOfRows, idstr, false);
|
||||||
|
|
||||||
// node type
|
// node type
|
||||||
|
@ -1648,11 +1652,7 @@ static int32_t setTaskAttrInResBlock(SStreamObj *pStream, SStreamTask *pTask, SS
|
||||||
// history_task_id
|
// history_task_id
|
||||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||||
if (pe->hTaskId != 0) {
|
if (pe->hTaskId != 0) {
|
||||||
memset(idstr, 0, tListLen(idstr));
|
int64ToHexStr(pe->hTaskId, idstr, tListLen(idstr));
|
||||||
len = tintToHex(pe->hTaskId, &idstr[4]);
|
|
||||||
idstr[2] = '0';
|
|
||||||
idstr[3] = 'x';
|
|
||||||
varDataSetLen(idstr, len + 2);
|
|
||||||
colDataSetVal(pColInfo, numOfRows, idstr, false);
|
colDataSetVal(pColInfo, numOfRows, idstr, false);
|
||||||
} else {
|
} else {
|
||||||
colDataSetVal(pColInfo, numOfRows, 0, true);
|
colDataSetVal(pColInfo, numOfRows, 0, true);
|
||||||
|
@ -2023,7 +2023,7 @@ static int32_t mndProcessVgroupChange(SMnode *pMnode, SVgroupChangeInfo *pChange
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// no need to build the trans to handle the vgroup upddate
|
// no need to build the trans to handle the vgroup update
|
||||||
if (pTrans == NULL) {
|
if (pTrans == NULL) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
|
@ -258,7 +258,7 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) {
|
||||||
|
|
||||||
int32_t numOfUpdated = taosArrayGetSize(req.pUpdateNodes);
|
int32_t numOfUpdated = taosArrayGetSize(req.pUpdateNodes);
|
||||||
if (numOfUpdated > 0) {
|
if (numOfUpdated > 0) {
|
||||||
mDebug("%d stream node(s) need updated from report of hbMsg(vgId:%d)", numOfUpdated, req.vgId);
|
mDebug("%d stream node(s) need updated from hbMsg(vgId:%d)", numOfUpdated, req.vgId);
|
||||||
setNodeEpsetExpiredFlag(req.pUpdateNodes);
|
setNodeEpsetExpiredFlag(req.pUpdateNodes);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -1102,7 +1102,9 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp)
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!pTq->pVnode->restored) {
|
if (!pTq->pVnode->restored) {
|
||||||
tqDebug("vgId:%d checkpoint-source msg received during restoring, s-task:0x%x ignore it", vgId, req.taskId);
|
tqDebug("vgId:%d checkpoint-source msg received during restoring, checkpointId:%" PRId64
|
||||||
|
", transId:%d s-task:0x%x ignore it",
|
||||||
|
vgId, req.checkpointId, req.transId, req.taskId);
|
||||||
SRpcMsg rsp = {0};
|
SRpcMsg rsp = {0};
|
||||||
buildCheckpointSourceRsp(&req, &pMsg->info, &rsp, 0);
|
buildCheckpointSourceRsp(&req, &pMsg->info, &rsp, 0);
|
||||||
tmsgSendRsp(&rsp); // error occurs
|
tmsgSendRsp(&rsp); // error occurs
|
||||||
|
@ -1111,7 +1113,9 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp)
|
||||||
|
|
||||||
SStreamTask* pTask = streamMetaAcquireTask(pMeta, req.streamId, req.taskId);
|
SStreamTask* pTask = streamMetaAcquireTask(pMeta, req.streamId, req.taskId);
|
||||||
if (pTask == NULL) {
|
if (pTask == NULL) {
|
||||||
tqError("vgId:%d failed to find s-task:0x%x, ignore checkpoint msg. it may have been destroyed", vgId, req.taskId);
|
tqError("vgId:%d failed to find s-task:0x%x, ignore checkpoint msg. checkpointId:%" PRId64
|
||||||
|
" transId:%d it may have been destroyed",
|
||||||
|
vgId, req.taskId, req.checkpointId, req.transId);
|
||||||
SRpcMsg rsp = {0};
|
SRpcMsg rsp = {0};
|
||||||
buildCheckpointSourceRsp(&req, &pMsg->info, &rsp, 0);
|
buildCheckpointSourceRsp(&req, &pMsg->info, &rsp, 0);
|
||||||
tmsgSendRsp(&rsp); // error occurs
|
tmsgSendRsp(&rsp); // error occurs
|
||||||
|
@ -1123,7 +1127,7 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp)
|
||||||
pTask->chkInfo.checkpointingId = req.checkpointId;
|
pTask->chkInfo.checkpointingId = req.checkpointId;
|
||||||
pTask->chkInfo.transId = req.transId;
|
pTask->chkInfo.transId = req.transId;
|
||||||
|
|
||||||
tqError("s-task:%s not ready for checkpoint, since downstream not ready, ignore this checkpoint:%" PRId64
|
tqError("s-task:%s not ready for checkpoint, since downstream not ready, ignore this checkpointId:%" PRId64
|
||||||
", transId:%d set it failed",
|
", transId:%d set it failed",
|
||||||
pTask->id.idStr, req.checkpointId, req.transId);
|
pTask->id.idStr, req.checkpointId, req.transId);
|
||||||
streamMetaReleaseTask(pMeta, pTask);
|
streamMetaReleaseTask(pMeta, pTask);
|
||||||
|
@ -1140,7 +1144,7 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp)
|
||||||
|
|
||||||
if (req.mndTrigger == 1) {
|
if (req.mndTrigger == 1) {
|
||||||
if (status == TASK_STATUS__HALT || status == TASK_STATUS__PAUSE) {
|
if (status == TASK_STATUS__HALT || status == TASK_STATUS__PAUSE) {
|
||||||
tqError("s-task:%s not ready for checkpoint, since it is halt, ignore checkpoint:%" PRId64 ", set it failure",
|
tqError("s-task:%s not ready for checkpoint, since it is halt, ignore checkpointId:%" PRId64 ", set it failure",
|
||||||
pTask->id.idStr, req.checkpointId);
|
pTask->id.idStr, req.checkpointId);
|
||||||
|
|
||||||
taosThreadMutexUnlock(&pTask->lock);
|
taosThreadMutexUnlock(&pTask->lock);
|
||||||
|
|
|
@ -3644,7 +3644,7 @@ int32_t streamStatePutBatch(SStreamState* pState, const char* cfKeyName, rocksdb
|
||||||
{
|
{
|
||||||
char tbuf[256] = {0};
|
char tbuf[256] = {0};
|
||||||
ginitDict[i].toStrFunc((void*)key, tbuf);
|
ginitDict[i].toStrFunc((void*)key, tbuf);
|
||||||
stDebug("streamState str: %s succ to write to %s_%s, len: %d", tbuf, wrapper->idstr, ginitDict[i].key, vlen);
|
stTrace("streamState str: %s succ to write to %s_%s, len: %d", tbuf, wrapper->idstr, ginitDict[i].key, vlen);
|
||||||
}
|
}
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
@ -3669,7 +3669,7 @@ int32_t streamStatePutBatchOptimize(SStreamState* pState, int32_t cfIdx, rocksdb
|
||||||
{
|
{
|
||||||
char tbuf[256] = {0};
|
char tbuf[256] = {0};
|
||||||
ginitDict[cfIdx].toStrFunc((void*)key, tbuf);
|
ginitDict[cfIdx].toStrFunc((void*)key, tbuf);
|
||||||
stDebug("streamState str: %s succ to write to %s_%s", tbuf, wrapper->idstr, ginitDict[cfIdx].key);
|
stTrace("streamState str: %s succ to write to %s_%s", tbuf, wrapper->idstr, ginitDict[cfIdx].key);
|
||||||
}
|
}
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
|
@ -1141,7 +1141,7 @@ static int32_t metaHeartbeatToMnodeImpl(SStreamMeta* pMeta) {
|
||||||
entry.checkpointInfo.activeTransId = (*pTask)->chkInfo.transId;
|
entry.checkpointInfo.activeTransId = (*pTask)->chkInfo.transId;
|
||||||
|
|
||||||
if (entry.checkpointInfo.failed) {
|
if (entry.checkpointInfo.failed) {
|
||||||
stInfo("s-task:%s send kill checkpoint trans info, transId:%d", (*pTask)->id.idStr, (*pTask)->chkInfo.transId);
|
stInfo("s-task:%s set kill checkpoint trans in hb, transId:%d", (*pTask)->id.idStr, (*pTask)->chkInfo.transId);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue