Merge branch '3.0' of https://github.com/taosdata/TDengine into 3.0
This commit is contained in:
commit
33f873b8ec
|
@ -127,7 +127,7 @@ static void generateWriteSlowLog(STscObj *pTscObj, SRequestObj *pRequest, int32_
|
||||||
cJSON_AddItemToObject(json, "error_info", cJSON_CreateString(tstrerror(pRequest->code)));
|
cJSON_AddItemToObject(json, "error_info", cJSON_CreateString(tstrerror(pRequest->code)));
|
||||||
cJSON_AddItemToObject(json, "type", cJSON_CreateNumber(reqType));
|
cJSON_AddItemToObject(json, "type", cJSON_CreateNumber(reqType));
|
||||||
cJSON_AddItemToObject(json, "rows_num", cJSON_CreateNumber(pRequest->body.resInfo.numOfRows + pRequest->body.resInfo.totalRows));
|
cJSON_AddItemToObject(json, "rows_num", cJSON_CreateNumber(pRequest->body.resInfo.numOfRows + pRequest->body.resInfo.totalRows));
|
||||||
if(strlen(pRequest->sqlstr) > pTscObj->pAppInfo->monitorParas.tsSlowLogMaxLen){
|
if(pRequest->sqlstr != NULL && strlen(pRequest->sqlstr) > pTscObj->pAppInfo->monitorParas.tsSlowLogMaxLen){
|
||||||
char tmp = pRequest->sqlstr[pTscObj->pAppInfo->monitorParas.tsSlowLogMaxLen];
|
char tmp = pRequest->sqlstr[pTscObj->pAppInfo->monitorParas.tsSlowLogMaxLen];
|
||||||
pRequest->sqlstr[pTscObj->pAppInfo->monitorParas.tsSlowLogMaxLen] = '\0';
|
pRequest->sqlstr[pTscObj->pAppInfo->monitorParas.tsSlowLogMaxLen] = '\0';
|
||||||
cJSON_AddItemToObject(json, "sql", cJSON_CreateString(pRequest->sqlstr));
|
cJSON_AddItemToObject(json, "sql", cJSON_CreateString(pRequest->sqlstr));
|
||||||
|
|
|
@ -34,12 +34,14 @@ typedef struct SVnodeMgmt {
|
||||||
SAutoQWorkerPool streamPool;
|
SAutoQWorkerPool streamPool;
|
||||||
SWWorkerPool fetchPool;
|
SWWorkerPool fetchPool;
|
||||||
SSingleWorker mgmtWorker;
|
SSingleWorker mgmtWorker;
|
||||||
|
SSingleWorker mgmtMultiWorker;
|
||||||
SHashObj *hash;
|
SHashObj *hash;
|
||||||
TdThreadRwlock lock;
|
TdThreadRwlock lock;
|
||||||
SVnodesStat state;
|
SVnodesStat state;
|
||||||
STfs *pTfs;
|
STfs *pTfs;
|
||||||
TdThread thread;
|
TdThread thread;
|
||||||
bool stop;
|
bool stop;
|
||||||
|
TdThreadMutex createLock;
|
||||||
} SVnodeMgmt;
|
} SVnodeMgmt;
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
|
@ -69,6 +71,7 @@ typedef struct {
|
||||||
STaosQueue *pQueryQ;
|
STaosQueue *pQueryQ;
|
||||||
STaosQueue *pStreamQ;
|
STaosQueue *pStreamQ;
|
||||||
STaosQueue *pFetchQ;
|
STaosQueue *pFetchQ;
|
||||||
|
STaosQueue *pMultiMgmQ;
|
||||||
} SVnodeObj;
|
} SVnodeObj;
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
|
@ -125,6 +128,7 @@ int32_t vmPutMsgToFetchQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg);
|
||||||
int32_t vmPutMsgToStreamQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg);
|
int32_t vmPutMsgToStreamQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg);
|
||||||
int32_t vmPutMsgToMergeQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg);
|
int32_t vmPutMsgToMergeQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg);
|
||||||
int32_t vmPutMsgToMgmtQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg);
|
int32_t vmPutMsgToMgmtQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg);
|
||||||
|
int32_t vmPutMsgToMultiMgmtQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg);
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
|
|
|
@ -377,11 +377,14 @@ int32_t vmProcessCreateVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
|
||||||
goto _OVER;
|
goto _OVER;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
taosThreadMutexLock(&pMgmt->createLock);
|
||||||
code = vmWriteVnodeListToFile(pMgmt);
|
code = vmWriteVnodeListToFile(pMgmt);
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
code = terrno != 0 ? terrno : code;
|
code = terrno != 0 ? terrno : code;
|
||||||
|
taosThreadMutexUnlock(&pMgmt->createLock);
|
||||||
goto _OVER;
|
goto _OVER;
|
||||||
}
|
}
|
||||||
|
taosThreadMutexUnlock(&pMgmt->createLock);
|
||||||
|
|
||||||
_OVER:
|
_OVER:
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
|
@ -985,7 +988,7 @@ SArray *vmGetMsgHandles() {
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_VND_COMPACT, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_VND_COMPACT, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_VND_TRIM, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_VND_TRIM, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_VND_S3MIGRATE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_VND_S3MIGRATE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_DND_CREATE_VNODE, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_DND_CREATE_VNODE, vmPutMsgToMultiMgmtQueue, 0) == NULL) goto _OVER;
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_DND_DROP_VNODE, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_DND_DROP_VNODE, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER;
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_DND_ALTER_VNODE_TYPE, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_DND_ALTER_VNODE_TYPE, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER;
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_DND_CHECK_VNODE_LEARNER_CATCHUP, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_DND_CHECK_VNODE_LEARNER_CATCHUP, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER;
|
||||||
|
|
|
@ -439,6 +439,8 @@ static void vmCloseVnodes(SVnodeMgmt *pMgmt) {
|
||||||
dInfo("start to close all vnodes");
|
dInfo("start to close all vnodes");
|
||||||
tSingleWorkerCleanup(&pMgmt->mgmtWorker);
|
tSingleWorkerCleanup(&pMgmt->mgmtWorker);
|
||||||
dInfo("vnodes mgmt worker is stopped");
|
dInfo("vnodes mgmt worker is stopped");
|
||||||
|
tSingleWorkerCleanup(&pMgmt->mgmtMultiWorker);
|
||||||
|
dInfo("vnodes multiple mgmt worker is stopped");
|
||||||
|
|
||||||
int32_t numOfVnodes = 0;
|
int32_t numOfVnodes = 0;
|
||||||
SVnodeObj **ppVnodes = vmGetVnodeListFromHash(pMgmt, &numOfVnodes);
|
SVnodeObj **ppVnodes = vmGetVnodeListFromHash(pMgmt, &numOfVnodes);
|
||||||
|
@ -506,6 +508,7 @@ static void vmCleanup(SVnodeMgmt *pMgmt) {
|
||||||
vmStopWorker(pMgmt);
|
vmStopWorker(pMgmt);
|
||||||
vnodeCleanup();
|
vnodeCleanup();
|
||||||
taosThreadRwlockDestroy(&pMgmt->lock);
|
taosThreadRwlockDestroy(&pMgmt->lock);
|
||||||
|
taosThreadMutexDestroy(&pMgmt->createLock);
|
||||||
taosMemoryFree(pMgmt);
|
taosMemoryFree(pMgmt);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -580,6 +583,7 @@ static int32_t vmInit(SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) {
|
||||||
pMgmt->msgCb.qsizeFp = (GetQueueSizeFp)vmGetQueueSize;
|
pMgmt->msgCb.qsizeFp = (GetQueueSizeFp)vmGetQueueSize;
|
||||||
pMgmt->msgCb.mgmt = pMgmt;
|
pMgmt->msgCb.mgmt = pMgmt;
|
||||||
taosThreadRwlockInit(&pMgmt->lock, NULL);
|
taosThreadRwlockInit(&pMgmt->lock, NULL);
|
||||||
|
taosThreadMutexInit(&pMgmt->createLock, NULL);
|
||||||
|
|
||||||
pMgmt->pTfs = pInput->pTfs;
|
pMgmt->pTfs = pInput->pTfs;
|
||||||
if (pMgmt->pTfs == NULL) {
|
if (pMgmt->pTfs == NULL) {
|
||||||
|
|
|
@ -28,6 +28,31 @@ static inline void vmSendRsp(SRpcMsg *pMsg, int32_t code) {
|
||||||
tmsgSendRsp(&rsp);
|
tmsgSendRsp(&rsp);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static void vmProcessMultiMgmtQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) {
|
||||||
|
SVnodeMgmt *pMgmt = pInfo->ahandle;
|
||||||
|
int32_t code = -1;
|
||||||
|
const STraceId *trace = &pMsg->info.traceId;
|
||||||
|
|
||||||
|
dGTrace("msg:%p, get from vnode-multi-mgmt queue", pMsg);
|
||||||
|
switch (pMsg->msgType) {
|
||||||
|
case TDMT_DND_CREATE_VNODE:
|
||||||
|
code = vmProcessCreateVnodeReq(pMgmt, pMsg);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (IsReq(pMsg)) {
|
||||||
|
if (code != 0) {
|
||||||
|
if (terrno != 0) code = terrno;
|
||||||
|
dGError("msg:%p, failed to process since %s, type:%s", pMsg, tstrerror(code), TMSG_INFO(pMsg->msgType));
|
||||||
|
}
|
||||||
|
vmSendRsp(pMsg, code);
|
||||||
|
}
|
||||||
|
|
||||||
|
dGTrace("msg:%p, is freed, code:0x%x", pMsg, code);
|
||||||
|
rpcFreeCont(pMsg->pCont);
|
||||||
|
taosFreeQitem(pMsg);
|
||||||
|
}
|
||||||
|
|
||||||
static void vmProcessMgmtQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) {
|
static void vmProcessMgmtQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) {
|
||||||
SVnodeMgmt *pMgmt = pInfo->ahandle;
|
SVnodeMgmt *pMgmt = pInfo->ahandle;
|
||||||
int32_t code = -1;
|
int32_t code = -1;
|
||||||
|
@ -271,6 +296,13 @@ int32_t vmPutMsgToFetchQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { return vmPutMsg
|
||||||
|
|
||||||
int32_t vmPutMsgToStreamQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { return vmPutMsgToQueue(pMgmt, pMsg, STREAM_QUEUE); }
|
int32_t vmPutMsgToStreamQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { return vmPutMsgToQueue(pMgmt, pMsg, STREAM_QUEUE); }
|
||||||
|
|
||||||
|
int32_t vmPutMsgToMultiMgmtQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
|
||||||
|
const STraceId *trace = &pMsg->info.traceId;
|
||||||
|
dGTrace("msg:%p, put into vnode-multi-mgmt queue", pMsg);
|
||||||
|
taosWriteQitem(pMgmt->mgmtMultiWorker.queue, pMsg);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
int32_t vmPutMsgToMgmtQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
|
int32_t vmPutMsgToMgmtQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
|
||||||
const STraceId *trace = &pMsg->info.traceId;
|
const STraceId *trace = &pMsg->info.traceId;
|
||||||
dGTrace("msg:%p, put into vnode-mgmt queue", pMsg);
|
dGTrace("msg:%p, put into vnode-mgmt queue", pMsg);
|
||||||
|
@ -415,6 +447,20 @@ int32_t vmStartWorker(SVnodeMgmt *pMgmt) {
|
||||||
|
|
||||||
if (tSingleWorkerInit(&pMgmt->mgmtWorker, &mgmtCfg) != 0) return -1;
|
if (tSingleWorkerInit(&pMgmt->mgmtWorker, &mgmtCfg) != 0) return -1;
|
||||||
|
|
||||||
|
int32_t threadNum = 0;
|
||||||
|
if (tsNumOfCores == 1) {
|
||||||
|
threadNum = 2;
|
||||||
|
} else {
|
||||||
|
threadNum = tsNumOfCores;
|
||||||
|
}
|
||||||
|
SSingleWorkerCfg multiMgmtCfg = {.min = threadNum,
|
||||||
|
.max = threadNum,
|
||||||
|
.name = "vnode-multi-mgmt",
|
||||||
|
.fp = (FItem)vmProcessMultiMgmtQueue,
|
||||||
|
.param = pMgmt};
|
||||||
|
|
||||||
|
if (tSingleWorkerInit(&pMgmt->mgmtMultiWorker, &multiMgmtCfg) != 0) return -1;
|
||||||
|
|
||||||
dDebug("vnode workers are initialized");
|
dDebug("vnode workers are initialized");
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
|
@ -310,7 +310,7 @@ static int32_t dmProcessAlterNodeTypeReq(EDndNodeType ntype, SRpcMsg *pMsg) {
|
||||||
|
|
||||||
pWrapper = &pDnode->wrappers[ntype];
|
pWrapper = &pDnode->wrappers[ntype];
|
||||||
if (taosMkDir(pWrapper->path) != 0) {
|
if (taosMkDir(pWrapper->path) != 0) {
|
||||||
dmReleaseWrapper(pWrapper);
|
taosThreadMutexUnlock(&pDnode->mutex);
|
||||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||||
dError("failed to create dir:%s since %s", pWrapper->path, terrstr());
|
dError("failed to create dir:%s since %s", pWrapper->path, terrstr());
|
||||||
return -1;
|
return -1;
|
||||||
|
|
|
@ -59,7 +59,6 @@ static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg);
|
||||||
static int32_t extractNodeListFromStream(SMnode *pMnode, SArray *pNodeList);
|
static int32_t extractNodeListFromStream(SMnode *pMnode, SArray *pNodeList);
|
||||||
static int32_t mndProcessStreamReqCheckpoint(SRpcMsg *pReq);
|
static int32_t mndProcessStreamReqCheckpoint(SRpcMsg *pReq);
|
||||||
static int32_t mndProcessCheckpointReport(SRpcMsg *pReq);
|
static int32_t mndProcessCheckpointReport(SRpcMsg *pReq);
|
||||||
//static int32_t mndProcessConsensusCheckpointId(SRpcMsg *pMsg);
|
|
||||||
static int32_t mndProcessConsensusInTmr(SRpcMsg *pMsg);
|
static int32_t mndProcessConsensusInTmr(SRpcMsg *pMsg);
|
||||||
static void doSendQuickRsp(SRpcHandleInfo *pInfo, int32_t msgSize, int32_t vgId, int32_t code);
|
static void doSendQuickRsp(SRpcHandleInfo *pInfo, int32_t msgSize, int32_t vgId, int32_t code);
|
||||||
|
|
||||||
|
@ -966,8 +965,8 @@ static int32_t doSetCheckpointAction(SMnode *pMnode, STrans *pTrans, SStreamTask
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
code =
|
code = setTransAction(pTrans, buf, tlen, TDMT_VND_STREAM_CHECK_POINT_SOURCE, &epset, TSDB_CODE_SYN_PROPOSE_NOT_READY,
|
||||||
setTransAction(pTrans, buf, tlen, TDMT_VND_STREAM_CHECK_POINT_SOURCE, &epset, TSDB_CODE_SYN_PROPOSE_NOT_READY, 0);
|
TSDB_CODE_VND_INVALID_VGROUP_ID);
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
taosMemoryFree(buf);
|
taosMemoryFree(buf);
|
||||||
}
|
}
|
||||||
|
@ -2317,6 +2316,8 @@ static int32_t extractNodeListFromStream(SMnode *pMnode, SArray *pNodeList) {
|
||||||
}
|
}
|
||||||
|
|
||||||
taosHashCleanup(pHash);
|
taosHashCleanup(pHash);
|
||||||
|
|
||||||
|
mDebug("numOfNodes:%d for stream after extract nodeInfo from stream", (int32_t)taosArrayGetSize(pNodeList));
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2905,7 +2906,6 @@ void mndInitStreamExecInfo(SMnode *pMnode, SStreamExecInfo *pExecInfo) {
|
||||||
}
|
}
|
||||||
|
|
||||||
addAllStreamTasksIntoBuf(pMnode, pExecInfo);
|
addAllStreamTasksIntoBuf(pMnode, pExecInfo);
|
||||||
extractNodeListFromStream(pMnode, pExecInfo->pNodeList);
|
|
||||||
pExecInfo->initTaskList = true;
|
pExecInfo->initTaskList = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -255,7 +255,7 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) {
|
||||||
|
|
||||||
mndInitStreamExecInfo(pMnode, &execInfo);
|
mndInitStreamExecInfo(pMnode, &execInfo);
|
||||||
if (!validateHbMsg(execInfo.pNodeList, req.vgId)) {
|
if (!validateHbMsg(execInfo.pNodeList, req.vgId)) {
|
||||||
mError("invalid hbMsg from vgId:%d, discarded", req.vgId);
|
mError("vgId:%d not exists in nodeList buf, discarded", req.vgId);
|
||||||
|
|
||||||
terrno = TSDB_CODE_INVALID_MSG;
|
terrno = TSDB_CODE_INVALID_MSG;
|
||||||
doSendHbMsgRsp(terrno, &pReq->info, req.vgId, req.msgId);
|
doSendHbMsgRsp(terrno, &pReq->info, req.vgId, req.msgId);
|
||||||
|
|
|
@ -182,6 +182,8 @@ int32_t streamProcessCheckpointTriggerBlock(SStreamTask* pTask, SStreamDataBlock
|
||||||
" recv expired checkpoint-trigger block, checkpointId:%" PRId64 " transId:%d, discard",
|
" recv expired checkpoint-trigger block, checkpointId:%" PRId64 " transId:%d, discard",
|
||||||
id, vgId, pTask->chkInfo.checkpointId, checkpointId, transId);
|
id, vgId, pTask->chkInfo.checkpointId, checkpointId, transId);
|
||||||
taosThreadMutexUnlock(&pTask->lock);
|
taosThreadMutexUnlock(&pTask->lock);
|
||||||
|
|
||||||
|
streamFreeQitem((SStreamQueueItem*)pBlock);
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -201,6 +203,8 @@ int32_t streamProcessCheckpointTriggerBlock(SStreamTask* pTask, SStreamDataBlock
|
||||||
|
|
||||||
streamTaskOpenUpstreamInput(pTask, pBlock->srcTaskId);
|
streamTaskOpenUpstreamInput(pTask, pBlock->srcTaskId);
|
||||||
taosThreadMutexUnlock(&pTask->lock);
|
taosThreadMutexUnlock(&pTask->lock);
|
||||||
|
|
||||||
|
streamFreeQitem((SStreamQueueItem*)pBlock);
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -210,6 +214,8 @@ int32_t streamProcessCheckpointTriggerBlock(SStreamTask* pTask, SStreamDataBlock
|
||||||
" discard",
|
" discard",
|
||||||
id, vgId, pActiveInfo->activeId, checkpointId);
|
id, vgId, pActiveInfo->activeId, checkpointId);
|
||||||
taosThreadMutexUnlock(&pTask->lock);
|
taosThreadMutexUnlock(&pTask->lock);
|
||||||
|
|
||||||
|
streamFreeQitem((SStreamQueueItem*)pBlock);
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
} else { // checkpointId == pActiveInfo->activeId
|
} else { // checkpointId == pActiveInfo->activeId
|
||||||
if (pActiveInfo->allUpstreamTriggerRecv == 1) {
|
if (pActiveInfo->allUpstreamTriggerRecv == 1) {
|
||||||
|
@ -218,6 +224,7 @@ int32_t streamProcessCheckpointTriggerBlock(SStreamTask* pTask, SStreamDataBlock
|
||||||
"checkpointId:%" PRId64 " transId:%d",
|
"checkpointId:%" PRId64 " transId:%d",
|
||||||
id, vgId, checkpointId, transId);
|
id, vgId, checkpointId, transId);
|
||||||
taosThreadMutexUnlock(&pTask->lock);
|
taosThreadMutexUnlock(&pTask->lock);
|
||||||
|
streamFreeQitem((SStreamQueueItem*)pBlock);
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -232,6 +239,7 @@ int32_t streamProcessCheckpointTriggerBlock(SStreamTask* pTask, SStreamDataBlock
|
||||||
pTask->id.idStr, p->upstreamTaskId, p->upstreamNodeId, p->checkpointId, p->recvTs);
|
pTask->id.idStr, p->upstreamTaskId, p->upstreamNodeId, p->checkpointId, p->recvTs);
|
||||||
|
|
||||||
taosThreadMutexUnlock(&pTask->lock);
|
taosThreadMutexUnlock(&pTask->lock);
|
||||||
|
streamFreeQitem((SStreamQueueItem*)pBlock);
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -455,6 +463,9 @@ int32_t streamTaskUpdateTaskCheckpointInfo(SStreamTask* pTask, bool restored, SV
|
||||||
id, vgId, pReq->taskId, numOfTasks);
|
id, vgId, pReq->taskId, numOfTasks);
|
||||||
}
|
}
|
||||||
streamMetaWLock(pMeta);
|
streamMetaWLock(pMeta);
|
||||||
|
if (streamMetaCommit(pMeta) < 0) {
|
||||||
|
// persist to disk
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
|
|
|
@ -431,6 +431,7 @@ static bool uvHandleReq(SSvrConn* pConn) {
|
||||||
transMsg.info.traceId = pHead->traceId;
|
transMsg.info.traceId = pHead->traceId;
|
||||||
transMsg.info.cliVer = htonl(pHead->compatibilityVer);
|
transMsg.info.cliVer = htonl(pHead->compatibilityVer);
|
||||||
transMsg.info.forbiddenIp = forbiddenIp;
|
transMsg.info.forbiddenIp = forbiddenIp;
|
||||||
|
transMsg.info.noResp = pHead->noResp == 1 ? 1 : 0;
|
||||||
|
|
||||||
tGTrace("%s handle %p conn:%p translated to app, refId:%" PRIu64, transLabel(pTransInst), transMsg.info.handle, pConn,
|
tGTrace("%s handle %p conn:%p translated to app, refId:%" PRIu64, transLabel(pTransInst), transMsg.info.handle, pConn,
|
||||||
pConn->refId);
|
pConn->refId);
|
||||||
|
|
Loading…
Reference in New Issue