fix ci error

This commit is contained in:
yihaoDeng 2024-03-31 00:08:23 +00:00
parent e71eefddfc
commit 9e11fe45a9
2 changed files with 23 additions and 14 deletions

View File

@ -668,6 +668,7 @@ _OVER:
if (newStb.pTags != NULL) { if (newStb.pTags != NULL) {
taosMemoryFree(newStb.pTags); taosMemoryFree(newStb.pTags);
taosMemoryFree(newStb.pColumns); taosMemoryFree(newStb.pColumns);
taosMemoryFree(newStb.pCmpr);
} }
mndTransDrop(pTrans); mndTransDrop(pTrans);
return code; return code;
@ -784,6 +785,7 @@ static int32_t mndDropIdx(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SIdxObj *p
_OVER: _OVER:
taosMemoryFree(newObj.pTags); taosMemoryFree(newObj.pTags);
taosMemoryFree(newObj.pColumns); taosMemoryFree(newObj.pColumns);
taosMemoryFree(newObj.pCmpr);
mndTransDrop(pTrans); mndTransDrop(pTrans);
mndReleaseStb(pMnode, pStb); mndReleaseStb(pMnode, pStb);

View File

@ -512,15 +512,16 @@ static int32_t mndCreateStbForStream(SMnode *pMnode, STrans *pTrans, const SStre
tstrncpy(createReq.name, pStream->targetSTbName, TSDB_TABLE_FNAME_LEN); tstrncpy(createReq.name, pStream->targetSTbName, TSDB_TABLE_FNAME_LEN);
createReq.numOfColumns = pStream->outputSchema.nCols; createReq.numOfColumns = pStream->outputSchema.nCols;
createReq.numOfTags = 1; // group id createReq.numOfTags = 1; // group id
createReq.pColumns = taosArrayInit_s(sizeof(SField), createReq.numOfColumns); createReq.pColumns = taosArrayInit_s(sizeof(SFieldWithOptions), createReq.numOfColumns);
// build fields // build fields
for (int32_t i = 0; i < createReq.numOfColumns; i++) { for (int32_t i = 0; i < createReq.numOfColumns; i++) {
SField *pField = taosArrayGet(createReq.pColumns, i); SFieldWithOptions *pField = taosArrayGet(createReq.pColumns, i);
tstrncpy(pField->name, pStream->outputSchema.pSchema[i].name, TSDB_COL_NAME_LEN); tstrncpy(pField->name, pStream->outputSchema.pSchema[i].name, TSDB_COL_NAME_LEN);
pField->flags = pStream->outputSchema.pSchema[i].flags; pField->flags = pStream->outputSchema.pSchema[i].flags;
pField->type = pStream->outputSchema.pSchema[i].type; pField->type = pStream->outputSchema.pSchema[i].type;
pField->bytes = pStream->outputSchema.pSchema[i].bytes; pField->bytes = pStream->outputSchema.pSchema[i].bytes;
pField->compress = createDefaultColCmprByType(pField->type);
} }
if (pStream->tagSchema.nCols == 0) { if (pStream->tagSchema.nCols == 0) {
@ -688,7 +689,8 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) {
goto _OVER; goto _OVER;
} }
STrans *pTrans = doCreateTrans(pMnode, &streamObj, pReq, TRN_CONFLICT_DB, MND_STREAM_CREATE_NAME, "create stream tasks on dnodes"); STrans *pTrans =
doCreateTrans(pMnode, &streamObj, pReq, TRN_CONFLICT_DB, MND_STREAM_CREATE_NAME, "create stream tasks on dnodes");
if (pTrans == NULL) { if (pTrans == NULL) {
goto _OVER; goto _OVER;
} }
@ -897,7 +899,8 @@ static int32_t mndProcessStreamCheckpointTrans(SMnode *pMnode, SStreamObj *pStre
return -1; return -1;
} }
STrans *pTrans = doCreateTrans(pMnode, pStream, NULL, TRN_CONFLICT_NOTHING, MND_STREAM_CHECKPOINT_NAME, "gen checkpoint for stream"); STrans *pTrans = doCreateTrans(pMnode, pStream, NULL, TRN_CONFLICT_NOTHING, MND_STREAM_CHECKPOINT_NAME,
"gen checkpoint for stream");
if (pTrans == NULL) { if (pTrans == NULL) {
mError("failed to checkpoint of stream name%s, checkpointId: %" PRId64 ", reason:%s", pStream->name, checkpointId, mError("failed to checkpoint of stream name%s, checkpointId: %" PRId64 ", reason:%s", pStream->name, checkpointId,
tstrerror(TSDB_CODE_MND_TRANS_CONFLICT)); tstrerror(TSDB_CODE_MND_TRANS_CONFLICT));
@ -1555,7 +1558,7 @@ static int32_t mndProcessPauseStreamReq(SRpcMsg *pReq) {
} }
} }
mInfo("stream:%s,%"PRId64 " start to pause stream", pauseReq.name, pStream->uid); mInfo("stream:%s,%" PRId64 " start to pause stream", pauseReq.name, pStream->uid);
if (pStream->status == STREAM_STATUS__PAUSE) { if (pStream->status == STREAM_STATUS__PAUSE) {
sdbRelease(pMnode->pSdb, pStream); sdbRelease(pMnode->pSdb, pStream);
@ -1580,7 +1583,8 @@ static int32_t mndProcessPauseStreamReq(SRpcMsg *pReq) {
return -1; return -1;
} }
STrans *pTrans = doCreateTrans(pMnode, pStream, pReq, TRN_CONFLICT_NOTHING, MND_STREAM_PAUSE_NAME, "pause the stream"); STrans *pTrans =
doCreateTrans(pMnode, pStream, pReq, TRN_CONFLICT_NOTHING, MND_STREAM_PAUSE_NAME, "pause the stream");
if (pTrans == NULL) { if (pTrans == NULL) {
mError("stream:%s failed to pause stream since %s", pauseReq.name, terrstr()); mError("stream:%s failed to pause stream since %s", pauseReq.name, terrstr());
sdbRelease(pMnode->pSdb, pStream); sdbRelease(pMnode->pSdb, pStream);
@ -1669,7 +1673,8 @@ static int32_t mndProcessResumeStreamReq(SRpcMsg *pReq) {
return -1; return -1;
} }
STrans *pTrans = doCreateTrans(pMnode, pStream, pReq, TRN_CONFLICT_NOTHING, MND_STREAM_RESUME_NAME, "resume the stream"); STrans *pTrans =
doCreateTrans(pMnode, pStream, pReq, TRN_CONFLICT_NOTHING, MND_STREAM_RESUME_NAME, "resume the stream");
if (pTrans == NULL) { if (pTrans == NULL) {
mError("stream:%s, failed to resume stream since %s", resumeReq.name, terrstr()); mError("stream:%s, failed to resume stream since %s", resumeReq.name, terrstr());
sdbRelease(pMnode->pSdb, pStream); sdbRelease(pMnode->pSdb, pStream);
@ -1801,7 +1806,8 @@ static int32_t mndProcessVgroupChange(SMnode *pMnode, SVgroupChangeInfo *pChange
// here create only one trans // here create only one trans
if (pTrans == NULL) { if (pTrans == NULL) {
pTrans = doCreateTrans(pMnode, pStream, NULL, TRN_CONFLICT_NOTHING, MND_STREAM_TASK_UPDATE_NAME, "update task epsets"); pTrans =
doCreateTrans(pMnode, pStream, NULL, TRN_CONFLICT_NOTHING, MND_STREAM_TASK_UPDATE_NAME, "update task epsets");
if (pTrans == NULL) { if (pTrans == NULL) {
sdbRelease(pSdb, pStream); sdbRelease(pSdb, pStream);
sdbCancelFetch(pSdb, pIter); sdbCancelFetch(pSdb, pIter);
@ -2153,12 +2159,13 @@ int32_t mndProcessStreamReqCheckpoint(SRpcMsg *pReq) {
SStreamObj *pStream = mndGetStreamObj(pMnode, req.streamId); SStreamObj *pStream = mndGetStreamObj(pMnode, req.streamId);
if (pStream == NULL) { if (pStream == NULL) {
mWarn("failed to find the stream:0x%" PRIx64 ", not handle the checkpoint req, try to acquire in buf", req.streamId); mWarn("failed to find the stream:0x%" PRIx64 ", not handle the checkpoint req, try to acquire in buf",
req.streamId);
// not in meta-store yet, try to acquire the task in exec buffer // not in meta-store yet, try to acquire the task in exec buffer
// the checkpoint req arrives too soon before the completion of the create stream trans. // the checkpoint req arrives too soon before the completion of the create stream trans.
STaskId id = {.streamId = req.streamId, .taskId = req.taskId}; STaskId id = {.streamId = req.streamId, .taskId = req.taskId};
void* p = taosHashGet(execInfo.pTaskMap, &id, sizeof(id)); void *p = taosHashGet(execInfo.pTaskMap, &id, sizeof(id));
if (p == NULL) { if (p == NULL) {
mError("failed to find the stream:0x%" PRIx64 " in buf, not handle the checkpoint req", req.streamId); mError("failed to find the stream:0x%" PRIx64 " in buf, not handle the checkpoint req", req.streamId);
terrno = TSDB_CODE_MND_STREAM_NOT_EXIST; terrno = TSDB_CODE_MND_STREAM_NOT_EXIST;
@ -2170,7 +2177,7 @@ int32_t mndProcessStreamReqCheckpoint(SRpcMsg *pReq) {
} }
} }
int32_t numOfTasks = (pStream == NULL)? 0: mndGetNumOfStreamTasks(pStream); int32_t numOfTasks = (pStream == NULL) ? 0 : mndGetNumOfStreamTasks(pStream);
SArray **pReqTaskList = (SArray **)taosHashGet(execInfo.pTransferStateStreams, &req.streamId, sizeof(req.streamId)); SArray **pReqTaskList = (SArray **)taosHashGet(execInfo.pTransferStateStreams, &req.streamId, sizeof(req.streamId));
if (pReqTaskList == NULL) { if (pReqTaskList == NULL) {
@ -2188,7 +2195,7 @@ int32_t mndProcessStreamReqCheckpoint(SRpcMsg *pReq) {
int64_t checkpointId = mndStreamGenChkpId(pMnode); int64_t checkpointId = mndStreamGenChkpId(pMnode);
mInfo("stream:0x%" PRIx64 " all tasks req checkpoint, start checkpointId:%" PRId64, req.streamId, checkpointId); mInfo("stream:0x%" PRIx64 " all tasks req checkpoint, start checkpointId:%" PRId64, req.streamId, checkpointId);
if (pStream != NULL) { // TODO:handle error if (pStream != NULL) { // TODO:handle error
int32_t code = mndProcessStreamCheckpointTrans(pMnode, pStream, checkpointId, 0, false); int32_t code = mndProcessStreamCheckpointTrans(pMnode, pStream, checkpointId, 0, false);
} else { } else {
// todo: wait for the create stream trans completed, and launch the checkpoint trans // todo: wait for the create stream trans completed, and launch the checkpoint trans
@ -2212,12 +2219,12 @@ int32_t mndProcessStreamReqCheckpoint(SRpcMsg *pReq) {
{ {
SRpcMsg rsp = {.code = 0, .info = pReq->info, .contLen = sizeof(SMStreamReqCheckpointRspMsg)}; SRpcMsg rsp = {.code = 0, .info = pReq->info, .contLen = sizeof(SMStreamReqCheckpointRspMsg)};
rsp.pCont = rpcMallocCont(rsp.contLen); rsp.pCont = rpcMallocCont(rsp.contLen);
SMsgHead* pHead = rsp.pCont; SMsgHead *pHead = rsp.pCont;
pHead->vgId = htonl(req.nodeId); pHead->vgId = htonl(req.nodeId);
tmsgSendRsp(&rsp); tmsgSendRsp(&rsp);
pReq->info.handle = NULL; // disable auto rsp pReq->info.handle = NULL; // disable auto rsp
} }
return 0; return 0;