diff --git a/source/dnode/mnode/impl/src/mndIndex.c b/source/dnode/mnode/impl/src/mndIndex.c index f2e2c556cf..bb77f6c69b 100644 --- a/source/dnode/mnode/impl/src/mndIndex.c +++ b/source/dnode/mnode/impl/src/mndIndex.c @@ -668,6 +668,7 @@ _OVER: if (newStb.pTags != NULL) { taosMemoryFree(newStb.pTags); taosMemoryFree(newStb.pColumns); + taosMemoryFree(newStb.pCmpr); } mndTransDrop(pTrans); return code; @@ -784,6 +785,7 @@ static int32_t mndDropIdx(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SIdxObj *p _OVER: taosMemoryFree(newObj.pTags); taosMemoryFree(newObj.pColumns); + taosMemoryFree(newObj.pCmpr); mndTransDrop(pTrans); mndReleaseStb(pMnode, pStb); diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 6067af199e..cbb5205eca 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -512,15 +512,16 @@ static int32_t mndCreateStbForStream(SMnode *pMnode, STrans *pTrans, const SStre tstrncpy(createReq.name, pStream->targetSTbName, TSDB_TABLE_FNAME_LEN); createReq.numOfColumns = pStream->outputSchema.nCols; createReq.numOfTags = 1; // group id - createReq.pColumns = taosArrayInit_s(sizeof(SField), createReq.numOfColumns); + createReq.pColumns = taosArrayInit_s(sizeof(SFieldWithOptions), createReq.numOfColumns); // build fields for (int32_t i = 0; i < createReq.numOfColumns; i++) { - SField *pField = taosArrayGet(createReq.pColumns, i); + SFieldWithOptions *pField = taosArrayGet(createReq.pColumns, i); tstrncpy(pField->name, pStream->outputSchema.pSchema[i].name, TSDB_COL_NAME_LEN); pField->flags = pStream->outputSchema.pSchema[i].flags; pField->type = pStream->outputSchema.pSchema[i].type; pField->bytes = pStream->outputSchema.pSchema[i].bytes; + pField->compress = createDefaultColCmprByType(pField->type); } if (pStream->tagSchema.nCols == 0) { @@ -688,7 +689,8 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) { goto _OVER; } - STrans *pTrans = doCreateTrans(pMnode, &streamObj, pReq, TRN_CONFLICT_DB, MND_STREAM_CREATE_NAME, "create stream tasks on dnodes"); + STrans *pTrans = + doCreateTrans(pMnode, &streamObj, pReq, TRN_CONFLICT_DB, MND_STREAM_CREATE_NAME, "create stream tasks on dnodes"); if (pTrans == NULL) { goto _OVER; } @@ -897,7 +899,8 @@ static int32_t mndProcessStreamCheckpointTrans(SMnode *pMnode, SStreamObj *pStre return -1; } - STrans *pTrans = doCreateTrans(pMnode, pStream, NULL, TRN_CONFLICT_NOTHING, MND_STREAM_CHECKPOINT_NAME, "gen checkpoint for stream"); + STrans *pTrans = doCreateTrans(pMnode, pStream, NULL, TRN_CONFLICT_NOTHING, MND_STREAM_CHECKPOINT_NAME, + "gen checkpoint for stream"); if (pTrans == NULL) { mError("failed to checkpoint of stream name%s, checkpointId: %" PRId64 ", reason:%s", pStream->name, checkpointId, tstrerror(TSDB_CODE_MND_TRANS_CONFLICT)); @@ -1555,7 +1558,7 @@ static int32_t mndProcessPauseStreamReq(SRpcMsg *pReq) { } } - mInfo("stream:%s,%"PRId64 " start to pause stream", pauseReq.name, pStream->uid); + mInfo("stream:%s,%" PRId64 " start to pause stream", pauseReq.name, pStream->uid); if (pStream->status == STREAM_STATUS__PAUSE) { sdbRelease(pMnode->pSdb, pStream); @@ -1580,7 +1583,8 @@ static int32_t mndProcessPauseStreamReq(SRpcMsg *pReq) { return -1; } - STrans *pTrans = doCreateTrans(pMnode, pStream, pReq, TRN_CONFLICT_NOTHING, MND_STREAM_PAUSE_NAME, "pause the stream"); + STrans *pTrans = + doCreateTrans(pMnode, pStream, pReq, TRN_CONFLICT_NOTHING, MND_STREAM_PAUSE_NAME, "pause the stream"); if (pTrans == NULL) { mError("stream:%s failed to pause stream since %s", pauseReq.name, terrstr()); sdbRelease(pMnode->pSdb, pStream); @@ -1669,7 +1673,8 @@ static int32_t mndProcessResumeStreamReq(SRpcMsg *pReq) { return -1; } - STrans *pTrans = doCreateTrans(pMnode, pStream, pReq, TRN_CONFLICT_NOTHING, MND_STREAM_RESUME_NAME, "resume the stream"); + STrans *pTrans = + doCreateTrans(pMnode, pStream, pReq, TRN_CONFLICT_NOTHING, MND_STREAM_RESUME_NAME, "resume the stream"); if (pTrans == NULL) { mError("stream:%s, failed to resume stream since %s", resumeReq.name, terrstr()); sdbRelease(pMnode->pSdb, pStream); @@ -1801,7 +1806,8 @@ static int32_t mndProcessVgroupChange(SMnode *pMnode, SVgroupChangeInfo *pChange // here create only one trans if (pTrans == NULL) { - pTrans = doCreateTrans(pMnode, pStream, NULL, TRN_CONFLICT_NOTHING, MND_STREAM_TASK_UPDATE_NAME, "update task epsets"); + pTrans = + doCreateTrans(pMnode, pStream, NULL, TRN_CONFLICT_NOTHING, MND_STREAM_TASK_UPDATE_NAME, "update task epsets"); if (pTrans == NULL) { sdbRelease(pSdb, pStream); sdbCancelFetch(pSdb, pIter); @@ -2153,12 +2159,13 @@ int32_t mndProcessStreamReqCheckpoint(SRpcMsg *pReq) { SStreamObj *pStream = mndGetStreamObj(pMnode, req.streamId); if (pStream == NULL) { - mWarn("failed to find the stream:0x%" PRIx64 ", not handle the checkpoint req, try to acquire in buf", req.streamId); + mWarn("failed to find the stream:0x%" PRIx64 ", not handle the checkpoint req, try to acquire in buf", + req.streamId); // not in meta-store yet, try to acquire the task in exec buffer // the checkpoint req arrives too soon before the completion of the create stream trans. STaskId id = {.streamId = req.streamId, .taskId = req.taskId}; - void* p = taosHashGet(execInfo.pTaskMap, &id, sizeof(id)); + void *p = taosHashGet(execInfo.pTaskMap, &id, sizeof(id)); if (p == NULL) { mError("failed to find the stream:0x%" PRIx64 " in buf, not handle the checkpoint req", req.streamId); terrno = TSDB_CODE_MND_STREAM_NOT_EXIST; @@ -2170,7 +2177,7 @@ int32_t mndProcessStreamReqCheckpoint(SRpcMsg *pReq) { } } - int32_t numOfTasks = (pStream == NULL)? 0: mndGetNumOfStreamTasks(pStream); + int32_t numOfTasks = (pStream == NULL) ? 0 : mndGetNumOfStreamTasks(pStream); SArray **pReqTaskList = (SArray **)taosHashGet(execInfo.pTransferStateStreams, &req.streamId, sizeof(req.streamId)); if (pReqTaskList == NULL) { @@ -2188,7 +2195,7 @@ int32_t mndProcessStreamReqCheckpoint(SRpcMsg *pReq) { int64_t checkpointId = mndStreamGenChkpId(pMnode); mInfo("stream:0x%" PRIx64 " all tasks req checkpoint, start checkpointId:%" PRId64, req.streamId, checkpointId); - if (pStream != NULL) { // TODO:handle error + if (pStream != NULL) { // TODO:handle error int32_t code = mndProcessStreamCheckpointTrans(pMnode, pStream, checkpointId, 0, false); } else { // todo: wait for the create stream trans completed, and launch the checkpoint trans @@ -2212,12 +2219,12 @@ int32_t mndProcessStreamReqCheckpoint(SRpcMsg *pReq) { { SRpcMsg rsp = {.code = 0, .info = pReq->info, .contLen = sizeof(SMStreamReqCheckpointRspMsg)}; rsp.pCont = rpcMallocCont(rsp.contLen); - SMsgHead* pHead = rsp.pCont; + SMsgHead *pHead = rsp.pCont; pHead->vgId = htonl(req.nodeId); tmsgSendRsp(&rsp); - pReq->info.handle = NULL; // disable auto rsp + pReq->info.handle = NULL; // disable auto rsp } return 0;