diff --git a/include/util/tdef.h b/include/util/tdef.h index a4abe90c47..a7ee4d0d56 100644 --- a/include/util/tdef.h +++ b/include/util/tdef.h @@ -399,7 +399,7 @@ typedef enum ELogicConditionType { #define TSDB_ENCRYPT_ALGO_SM4 4 #define TSDB_DEFAULT_ENCRYPT_ALGO TSDB_ENCRYPT_ALGO_NONE #define TSDB_MIN_ENCRYPT_ALGO TSDB_ENCRYPT_ALGO_NONE -#define TSDB_MAX_ENCRYPT_ALGO TSDB_ENCRYPT_ALGO_SM4 +#define TSDB_MAX_ENCRYPT_ALGO TSDB_ENCRYPT_ALGO_SM4 #define TSDB_DEFAULT_CACHE_MODEL TSDB_CACHE_MODEL_NONE #define TSDB_MIN_DB_CACHE_SIZE 1 // MB #define TSDB_MAX_DB_CACHE_SIZE 65536 diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index 1aaceeac98..96c3108e2a 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -51,8 +51,8 @@ #define ENCODESQL() \ do { \ + if (tEncodeI32(&encoder, pReq->sqlLen) < 0) return -1; \ if (pReq->sqlLen > 0 && pReq->sql != NULL) { \ - if (tEncodeI32(&encoder, pReq->sqlLen) < 0) return -1; \ if (tEncodeBinary(&encoder, pReq->sql, pReq->sqlLen) < 0) return -1; \ } \ } while (0) @@ -3145,7 +3145,7 @@ int32_t tSerializeSAlterDbReq(void *buf, int32_t bufLen, SAlterDbReq *pReq) { if (tEncodeI32(&encoder, pReq->walRetentionSize) < 0) return -1; if (tEncodeI32(&encoder, pReq->keepTimeOffset) < 0) return -1; ENCODESQL(); - if (tEncodeI32(&encoder, pReq->withArbitrator) < 0) return -1; + if (tEncodeI8(&encoder, pReq->withArbitrator) < 0) return -1; tEndEncode(&encoder); int32_t tlen = encoder.pos; diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 60b522f6fa..6067af199e 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -2153,41 +2153,60 @@ int32_t mndProcessStreamReqCheckpoint(SRpcMsg *pReq) { SStreamObj *pStream = mndGetStreamObj(pMnode, req.streamId); if (pStream == NULL) { - mError("failed to find the stream:0x%" PRIx64 " not handle the checkpoint req", req.streamId); - terrno = TSDB_CODE_MND_STREAM_NOT_EXIST; - taosThreadMutexUnlock(&execInfo.lock); + mWarn("failed to find the stream:0x%" PRIx64 ", not handle the checkpoint req, try to acquire in buf", req.streamId); - return -1; + // not in meta-store yet, try to acquire the task in exec buffer + // the checkpoint req arrives too soon before the completion of the create stream trans. + STaskId id = {.streamId = req.streamId, .taskId = req.taskId}; + void* p = taosHashGet(execInfo.pTaskMap, &id, sizeof(id)); + if (p == NULL) { + mError("failed to find the stream:0x%" PRIx64 " in buf, not handle the checkpoint req", req.streamId); + terrno = TSDB_CODE_MND_STREAM_NOT_EXIST; + taosThreadMutexUnlock(&execInfo.lock); + return -1; + } else { + mDebug("s-task:0x%" PRIx64 "-0x%x in buf not in mnode/meta, create stream trans may not complete yet", + req.streamId, req.taskId); + } } - int32_t numOfTasks = mndGetNumOfStreamTasks(pStream); + int32_t numOfTasks = (pStream == NULL)? 0: mndGetNumOfStreamTasks(pStream); + SArray **pReqTaskList = (SArray **)taosHashGet(execInfo.pTransferStateStreams, &req.streamId, sizeof(req.streamId)); if (pReqTaskList == NULL) { SArray *pList = taosArrayInit(4, sizeof(int32_t)); - doAddTaskId(pList, req.taskId, pStream->uid, numOfTasks); + doAddTaskId(pList, req.taskId, req.streamId, numOfTasks); taosHashPut(execInfo.pTransferStateStreams, &req.streamId, sizeof(int64_t), &pList, sizeof(void *)); pReqTaskList = (SArray **)taosHashGet(execInfo.pTransferStateStreams, &req.streamId, sizeof(req.streamId)); } else { - doAddTaskId(*pReqTaskList, req.taskId, pStream->uid, numOfTasks); + doAddTaskId(*pReqTaskList, req.taskId, req.streamId, numOfTasks); } int32_t total = taosArrayGetSize(*pReqTaskList); if (total == numOfTasks) { // all tasks has send the reqs int64_t checkpointId = mndStreamGenChkpId(pMnode); - mDebug("stream:0x%" PRIx64 " all tasks req, start checkpointId:%" PRId64, pStream->uid, checkpointId); + mInfo("stream:0x%" PRIx64 " all tasks req checkpoint, start checkpointId:%" PRId64, req.streamId, checkpointId); - // TODO:handle error - int32_t code = mndProcessStreamCheckpointTrans(pMnode, pStream, checkpointId, 0, false); + if (pStream != NULL) { // TODO:handle error + int32_t code = mndProcessStreamCheckpointTrans(pMnode, pStream, checkpointId, 0, false); + } else { + // todo: wait for the create stream trans completed, and launch the checkpoint trans + // SStreamObj *pStream = mndGetStreamObj(pMnode, req.streamId); + // sleep(500ms) + } // remove this entry taosHashRemove(execInfo.pTransferStateStreams, &req.streamId, sizeof(int64_t)); int32_t numOfStreams = taosHashGetSize(execInfo.pTransferStateStreams); - mDebug("stream:0x%" PRIx64 " removed, remain streams:%d fill-history not completed", pStream->uid, numOfStreams); + mDebug("stream:0x%" PRIx64 " removed, remain streams:%d fill-history not completed", req.streamId, numOfStreams); + } + + if (pStream != NULL) { + mndReleaseStream(pMnode, pStream); } - mndReleaseStream(pMnode, pStream); taosThreadMutexUnlock(&execInfo.lock); { diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index b335979cd5..2d8f4ed3c2 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -1944,8 +1944,9 @@ static FORCE_INLINE void destroyCmsgWrapper(void* arg, void* param) { if (pMsg == NULL) { return; } - if (param != NULL) { - SCliThrd* pThrd = param; + + SCliThrd* pThrd = param; + if (pMsg->msg.info.notFreeAhandle == 0 && pThrd != NULL) { if (pThrd->destroyAhandleFp) (*pThrd->destroyAhandleFp)(pMsg->msg.info.ahandle); } destroyCmsg(pMsg); @@ -1957,12 +1958,9 @@ static FORCE_INLINE void destroyCmsgAndAhandle(void* param) { SCliMsg* pMsg = arg->param1; SCliThrd* pThrd = arg->param2; - tDebug("destroy Ahandle A"); - if (pThrd != NULL && pThrd->destroyAhandleFp != NULL) { - tDebug("destroy Ahandle B"); + if (pMsg->msg.info.notFreeAhandle == 0 && pThrd != NULL && pThrd->destroyAhandleFp != NULL) { pThrd->destroyAhandleFp(pMsg->ctx->ahandle); } - tDebug("destroy Ahandle C"); transDestroyConnCtx(pMsg->ctx); transFreeMsg(pMsg->msg.pCont);