chore: merge 3.0
This commit is contained in:
commit
64e32e5c85
|
@ -399,7 +399,7 @@ typedef enum ELogicConditionType {
|
||||||
#define TSDB_ENCRYPT_ALGO_SM4 4
|
#define TSDB_ENCRYPT_ALGO_SM4 4
|
||||||
#define TSDB_DEFAULT_ENCRYPT_ALGO TSDB_ENCRYPT_ALGO_NONE
|
#define TSDB_DEFAULT_ENCRYPT_ALGO TSDB_ENCRYPT_ALGO_NONE
|
||||||
#define TSDB_MIN_ENCRYPT_ALGO TSDB_ENCRYPT_ALGO_NONE
|
#define TSDB_MIN_ENCRYPT_ALGO TSDB_ENCRYPT_ALGO_NONE
|
||||||
#define TSDB_MAX_ENCRYPT_ALGO TSDB_ENCRYPT_ALGO_SM4
|
#define TSDB_MAX_ENCRYPT_ALGO TSDB_ENCRYPT_ALGO_SM4
|
||||||
#define TSDB_DEFAULT_CACHE_MODEL TSDB_CACHE_MODEL_NONE
|
#define TSDB_DEFAULT_CACHE_MODEL TSDB_CACHE_MODEL_NONE
|
||||||
#define TSDB_MIN_DB_CACHE_SIZE 1 // MB
|
#define TSDB_MIN_DB_CACHE_SIZE 1 // MB
|
||||||
#define TSDB_MAX_DB_CACHE_SIZE 65536
|
#define TSDB_MAX_DB_CACHE_SIZE 65536
|
||||||
|
|
|
@ -51,8 +51,8 @@
|
||||||
|
|
||||||
#define ENCODESQL() \
|
#define ENCODESQL() \
|
||||||
do { \
|
do { \
|
||||||
|
if (tEncodeI32(&encoder, pReq->sqlLen) < 0) return -1; \
|
||||||
if (pReq->sqlLen > 0 && pReq->sql != NULL) { \
|
if (pReq->sqlLen > 0 && pReq->sql != NULL) { \
|
||||||
if (tEncodeI32(&encoder, pReq->sqlLen) < 0) return -1; \
|
|
||||||
if (tEncodeBinary(&encoder, pReq->sql, pReq->sqlLen) < 0) return -1; \
|
if (tEncodeBinary(&encoder, pReq->sql, pReq->sqlLen) < 0) return -1; \
|
||||||
} \
|
} \
|
||||||
} while (0)
|
} while (0)
|
||||||
|
@ -3145,7 +3145,7 @@ int32_t tSerializeSAlterDbReq(void *buf, int32_t bufLen, SAlterDbReq *pReq) {
|
||||||
if (tEncodeI32(&encoder, pReq->walRetentionSize) < 0) return -1;
|
if (tEncodeI32(&encoder, pReq->walRetentionSize) < 0) return -1;
|
||||||
if (tEncodeI32(&encoder, pReq->keepTimeOffset) < 0) return -1;
|
if (tEncodeI32(&encoder, pReq->keepTimeOffset) < 0) return -1;
|
||||||
ENCODESQL();
|
ENCODESQL();
|
||||||
if (tEncodeI32(&encoder, pReq->withArbitrator) < 0) return -1;
|
if (tEncodeI8(&encoder, pReq->withArbitrator) < 0) return -1;
|
||||||
tEndEncode(&encoder);
|
tEndEncode(&encoder);
|
||||||
|
|
||||||
int32_t tlen = encoder.pos;
|
int32_t tlen = encoder.pos;
|
||||||
|
|
|
@ -2153,41 +2153,60 @@ int32_t mndProcessStreamReqCheckpoint(SRpcMsg *pReq) {
|
||||||
|
|
||||||
SStreamObj *pStream = mndGetStreamObj(pMnode, req.streamId);
|
SStreamObj *pStream = mndGetStreamObj(pMnode, req.streamId);
|
||||||
if (pStream == NULL) {
|
if (pStream == NULL) {
|
||||||
mError("failed to find the stream:0x%" PRIx64 " not handle the checkpoint req", req.streamId);
|
mWarn("failed to find the stream:0x%" PRIx64 ", not handle the checkpoint req, try to acquire in buf", req.streamId);
|
||||||
terrno = TSDB_CODE_MND_STREAM_NOT_EXIST;
|
|
||||||
taosThreadMutexUnlock(&execInfo.lock);
|
|
||||||
|
|
||||||
return -1;
|
// not in meta-store yet, try to acquire the task in exec buffer
|
||||||
|
// the checkpoint req arrives too soon before the completion of the create stream trans.
|
||||||
|
STaskId id = {.streamId = req.streamId, .taskId = req.taskId};
|
||||||
|
void* p = taosHashGet(execInfo.pTaskMap, &id, sizeof(id));
|
||||||
|
if (p == NULL) {
|
||||||
|
mError("failed to find the stream:0x%" PRIx64 " in buf, not handle the checkpoint req", req.streamId);
|
||||||
|
terrno = TSDB_CODE_MND_STREAM_NOT_EXIST;
|
||||||
|
taosThreadMutexUnlock(&execInfo.lock);
|
||||||
|
return -1;
|
||||||
|
} else {
|
||||||
|
mDebug("s-task:0x%" PRIx64 "-0x%x in buf not in mnode/meta, create stream trans may not complete yet",
|
||||||
|
req.streamId, req.taskId);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t numOfTasks = mndGetNumOfStreamTasks(pStream);
|
int32_t numOfTasks = (pStream == NULL)? 0: mndGetNumOfStreamTasks(pStream);
|
||||||
|
|
||||||
SArray **pReqTaskList = (SArray **)taosHashGet(execInfo.pTransferStateStreams, &req.streamId, sizeof(req.streamId));
|
SArray **pReqTaskList = (SArray **)taosHashGet(execInfo.pTransferStateStreams, &req.streamId, sizeof(req.streamId));
|
||||||
if (pReqTaskList == NULL) {
|
if (pReqTaskList == NULL) {
|
||||||
SArray *pList = taosArrayInit(4, sizeof(int32_t));
|
SArray *pList = taosArrayInit(4, sizeof(int32_t));
|
||||||
doAddTaskId(pList, req.taskId, pStream->uid, numOfTasks);
|
doAddTaskId(pList, req.taskId, req.streamId, numOfTasks);
|
||||||
taosHashPut(execInfo.pTransferStateStreams, &req.streamId, sizeof(int64_t), &pList, sizeof(void *));
|
taosHashPut(execInfo.pTransferStateStreams, &req.streamId, sizeof(int64_t), &pList, sizeof(void *));
|
||||||
|
|
||||||
pReqTaskList = (SArray **)taosHashGet(execInfo.pTransferStateStreams, &req.streamId, sizeof(req.streamId));
|
pReqTaskList = (SArray **)taosHashGet(execInfo.pTransferStateStreams, &req.streamId, sizeof(req.streamId));
|
||||||
} else {
|
} else {
|
||||||
doAddTaskId(*pReqTaskList, req.taskId, pStream->uid, numOfTasks);
|
doAddTaskId(*pReqTaskList, req.taskId, req.streamId, numOfTasks);
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t total = taosArrayGetSize(*pReqTaskList);
|
int32_t total = taosArrayGetSize(*pReqTaskList);
|
||||||
if (total == numOfTasks) { // all tasks has send the reqs
|
if (total == numOfTasks) { // all tasks has send the reqs
|
||||||
int64_t checkpointId = mndStreamGenChkpId(pMnode);
|
int64_t checkpointId = mndStreamGenChkpId(pMnode);
|
||||||
mDebug("stream:0x%" PRIx64 " all tasks req, start checkpointId:%" PRId64, pStream->uid, checkpointId);
|
mInfo("stream:0x%" PRIx64 " all tasks req checkpoint, start checkpointId:%" PRId64, req.streamId, checkpointId);
|
||||||
|
|
||||||
// TODO:handle error
|
if (pStream != NULL) { // TODO:handle error
|
||||||
int32_t code = mndProcessStreamCheckpointTrans(pMnode, pStream, checkpointId, 0, false);
|
int32_t code = mndProcessStreamCheckpointTrans(pMnode, pStream, checkpointId, 0, false);
|
||||||
|
} else {
|
||||||
|
// todo: wait for the create stream trans completed, and launch the checkpoint trans
|
||||||
|
// SStreamObj *pStream = mndGetStreamObj(pMnode, req.streamId);
|
||||||
|
// sleep(500ms)
|
||||||
|
}
|
||||||
|
|
||||||
// remove this entry
|
// remove this entry
|
||||||
taosHashRemove(execInfo.pTransferStateStreams, &req.streamId, sizeof(int64_t));
|
taosHashRemove(execInfo.pTransferStateStreams, &req.streamId, sizeof(int64_t));
|
||||||
|
|
||||||
int32_t numOfStreams = taosHashGetSize(execInfo.pTransferStateStreams);
|
int32_t numOfStreams = taosHashGetSize(execInfo.pTransferStateStreams);
|
||||||
mDebug("stream:0x%" PRIx64 " removed, remain streams:%d fill-history not completed", pStream->uid, numOfStreams);
|
mDebug("stream:0x%" PRIx64 " removed, remain streams:%d fill-history not completed", req.streamId, numOfStreams);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (pStream != NULL) {
|
||||||
|
mndReleaseStream(pMnode, pStream);
|
||||||
}
|
}
|
||||||
|
|
||||||
mndReleaseStream(pMnode, pStream);
|
|
||||||
taosThreadMutexUnlock(&execInfo.lock);
|
taosThreadMutexUnlock(&execInfo.lock);
|
||||||
|
|
||||||
{
|
{
|
||||||
|
|
|
@ -1944,8 +1944,9 @@ static FORCE_INLINE void destroyCmsgWrapper(void* arg, void* param) {
|
||||||
if (pMsg == NULL) {
|
if (pMsg == NULL) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
if (param != NULL) {
|
|
||||||
SCliThrd* pThrd = param;
|
SCliThrd* pThrd = param;
|
||||||
|
if (pMsg->msg.info.notFreeAhandle == 0 && pThrd != NULL) {
|
||||||
if (pThrd->destroyAhandleFp) (*pThrd->destroyAhandleFp)(pMsg->msg.info.ahandle);
|
if (pThrd->destroyAhandleFp) (*pThrd->destroyAhandleFp)(pMsg->msg.info.ahandle);
|
||||||
}
|
}
|
||||||
destroyCmsg(pMsg);
|
destroyCmsg(pMsg);
|
||||||
|
@ -1957,12 +1958,9 @@ static FORCE_INLINE void destroyCmsgAndAhandle(void* param) {
|
||||||
SCliMsg* pMsg = arg->param1;
|
SCliMsg* pMsg = arg->param1;
|
||||||
SCliThrd* pThrd = arg->param2;
|
SCliThrd* pThrd = arg->param2;
|
||||||
|
|
||||||
tDebug("destroy Ahandle A");
|
if (pMsg->msg.info.notFreeAhandle == 0 && pThrd != NULL && pThrd->destroyAhandleFp != NULL) {
|
||||||
if (pThrd != NULL && pThrd->destroyAhandleFp != NULL) {
|
|
||||||
tDebug("destroy Ahandle B");
|
|
||||||
pThrd->destroyAhandleFp(pMsg->ctx->ahandle);
|
pThrd->destroyAhandleFp(pMsg->ctx->ahandle);
|
||||||
}
|
}
|
||||||
tDebug("destroy Ahandle C");
|
|
||||||
|
|
||||||
transDestroyConnCtx(pMsg->ctx);
|
transDestroyConnCtx(pMsg->ctx);
|
||||||
transFreeMsg(pMsg->msg.pCont);
|
transFreeMsg(pMsg->msg.pCont);
|
||||||
|
|
Loading…
Reference in New Issue