refactor(stream): add long exec stream queue for history tasks in step1 and re-calculate task execution.
This commit is contained in:
parent
73ec4af4a7
commit
25cdfa5ee9
|
@ -38,6 +38,7 @@ typedef enum {
|
|||
STREAM_QUEUE,
|
||||
ARB_QUEUE,
|
||||
STREAM_CTRL_QUEUE,
|
||||
STREAM_LONG_EXEC_QUEUE,
|
||||
QUEUE_MAX,
|
||||
} EQueueType;
|
||||
|
||||
|
|
|
@ -76,7 +76,7 @@ void tQWorkerFreeQueue(SQWorkerPool *pool, STaosQueue *queue);
|
|||
|
||||
int32_t tAutoQWorkerInit(SAutoQWorkerPool *pool);
|
||||
void tAutoQWorkerCleanup(SAutoQWorkerPool *pool);
|
||||
STaosQueue *tAutoQWorkerAllocQueue(SAutoQWorkerPool *pool, void *ahandle, FItem fp);
|
||||
STaosQueue *tAutoQWorkerAllocQueue(SAutoQWorkerPool *pool, void *ahandle, FItem fp, int32_t minNum);
|
||||
void tAutoQWorkerFreeQueue(SAutoQWorkerPool *pool, STaosQueue *queue);
|
||||
|
||||
int32_t tWWorkerInit(SWWorkerPool *pool);
|
||||
|
|
|
@ -32,6 +32,7 @@ typedef struct SVnodeMgmt {
|
|||
const char *name;
|
||||
SQueryAutoQWorkerPool queryPool;
|
||||
SAutoQWorkerPool streamPool;
|
||||
SAutoQWorkerPool streamLongExecPool;
|
||||
SWWorkerPool streamCtrlPool;
|
||||
SWWorkerPool fetchPool;
|
||||
SSingleWorker mgmtWorker;
|
||||
|
@ -75,6 +76,7 @@ typedef struct {
|
|||
STaosQueue *pQueryQ;
|
||||
STaosQueue *pStreamQ;
|
||||
STaosQueue *pStreamCtrlQ;
|
||||
STaosQueue *pStreamLongExecQ;
|
||||
STaosQueue *pFetchQ;
|
||||
STaosQueue *pMultiMgmQ;
|
||||
} SVnodeObj;
|
||||
|
@ -137,6 +139,8 @@ int32_t vmPutMsgToQueryQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg);
|
|||
int32_t vmPutMsgToFetchQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg);
|
||||
int32_t vmPutMsgToStreamQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg);
|
||||
int32_t vmPutMsgToStreamCtrlQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg);
|
||||
int32_t vmPutMsgToStreamLongExecQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg);
|
||||
|
||||
int32_t vmPutMsgToMergeQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg);
|
||||
int32_t vmPutMsgToMgmtQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg);
|
||||
int32_t vmPutMsgToMultiMgmtQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg);
|
||||
|
|
|
@ -1008,27 +1008,29 @@ SArray *vmGetMsgHandles() {
|
|||
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_RUN, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DISPATCH, vmPutMsgToStreamCtrlQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DISPATCH_RSP, vmPutMsgToStreamCtrlQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_STREAM_RETRIEVE, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_STREAM_RETRIEVE_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_CHECK, vmPutMsgToStreamCtrlQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_CHECK_RSP, vmPutMsgToStreamCtrlQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_PAUSE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_RESUME, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_STOP, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_CHECK_POINT_SOURCE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_CHECKPOINT_READY, vmPutMsgToStreamCtrlQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_CHECKPOINT_READY_RSP, vmPutMsgToStreamCtrlQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_STREAM_RETRIEVE_TRIGGER, vmPutMsgToStreamCtrlQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_STREAM_RETRIEVE_TRIGGER_RSP, vmPutMsgToStreamCtrlQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_UPDATE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_RESET, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_HEARTBEAT_RSP, vmPutMsgToStreamCtrlQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_REQ_CHKPT_RSP, vmPutMsgToStreamCtrlQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_CHKPT_REPORT_RSP, vmPutMsgToStreamCtrlQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_SCAN_HISTORY, vmPutMsgToStreamLongExecQueue, 0) == NULL) goto _OVER;
|
||||
|
||||
if (dmSetMgmtHandle(pArray, TDMT_VND_GET_STREAM_PROGRESS, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_STREAM_RETRIEVE, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_STREAM_RETRIEVE_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
|
||||
|
||||
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_UPDATE_CHKPT, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_STREAM_CONSEN_CHKPT, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_PAUSE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_RESUME, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_STOP, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_CHECK_POINT_SOURCE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_UPDATE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_RESET, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||
|
||||
if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_REPLICA, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_CONFIG, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||
|
|
|
@ -398,10 +398,14 @@ void vmCloseVnode(SVnodeMgmt *pMgmt, SVnodeObj *pVnode, bool commitAndRemoveWal,
|
|||
|
||||
dInfo("vgId:%d, wait for vnode stream queue:%p is empty, %d remains", pVnode->vgId,
|
||||
pVnode->pStreamQ, taosQueueItemSize(pVnode->pStreamQ));
|
||||
while (!taosQueueEmpty(pVnode->pStreamQ)) taosMsleep(10);
|
||||
while (!taosQueueEmpty(pVnode->pStreamQ)) taosMsleep(50);
|
||||
|
||||
dInfo("vgId:%d, wait for vnode stream ctrl queue:%p is empty", pVnode->vgId, pVnode->pStreamCtrlQ);
|
||||
while (!taosQueueEmpty(pVnode->pStreamCtrlQ)) taosMsleep(10);
|
||||
while (!taosQueueEmpty(pVnode->pStreamCtrlQ)) taosMsleep(50);
|
||||
|
||||
dInfo("vgId:%d, wait for vnode stream long-exec queue:%p is empty, %d remains", pVnode->vgId,
|
||||
pVnode->pStreamLongExecQ, taosQueueItemSize(pVnode->pStreamLongExecQ));
|
||||
while (!taosQueueEmpty(pVnode->pStreamLongExecQ)) taosMsleep(50);
|
||||
|
||||
dInfo("vgId:%d, all vnode queues is empty", pVnode->vgId);
|
||||
|
||||
|
|
|
@ -150,7 +150,7 @@ static void vmProcessStreamCtrlQueue(SQueueInfo *pInfo, STaosQall* pQall, int32_
|
|||
SRpcMsg *pMsg = pItem;
|
||||
const STraceId *trace = &pMsg->info.traceId;
|
||||
|
||||
dGTrace("vgId:%d, msg:%p get from vnode-ctrl-stream queue", pVnode->vgId, pMsg);
|
||||
dGTrace("vgId:%d, msg:%p get from vnode-stream-ctrl queue", pVnode->vgId, pMsg);
|
||||
code = vnodeProcessStreamCtrlMsg(pVnode->pImpl, pMsg, pInfo);
|
||||
if (code != 0) {
|
||||
terrno = code;
|
||||
|
@ -165,6 +165,26 @@ static void vmProcessStreamCtrlQueue(SQueueInfo *pInfo, STaosQall* pQall, int32_
|
|||
}
|
||||
}
|
||||
|
||||
static void vmProcessStreamLongExecQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) {
|
||||
SVnodeObj *pVnode = pInfo->ahandle;
|
||||
const STraceId *trace = &pMsg->info.traceId;
|
||||
int32_t code = 0;
|
||||
|
||||
dGTrace("vgId:%d, msg:%p get from vnode-stream long-exec queue", pVnode->vgId, pMsg);
|
||||
|
||||
code = vnodeProcessStreamLongExecMsg(pVnode->pImpl, pMsg, pInfo);
|
||||
if (code != 0) {
|
||||
terrno = code;
|
||||
dGError("vgId:%d, msg:%p failed to process stream msg %s since %s", pVnode->vgId, pMsg, TMSG_INFO(pMsg->msgType),
|
||||
tstrerror(code));
|
||||
vmSendRsp(pMsg, code);
|
||||
}
|
||||
|
||||
dGTrace("vgId:%d, msg:%p is freed, code:0x%x", pVnode->vgId, pMsg, code);
|
||||
rpcFreeCont(pMsg->pCont);
|
||||
taosFreeQitem(pMsg);
|
||||
}
|
||||
|
||||
static void vmProcessFetchQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
|
||||
SVnodeObj *pVnode = pInfo->ahandle;
|
||||
SRpcMsg *pMsg = NULL;
|
||||
|
@ -274,9 +294,13 @@ static int32_t vmPutMsgToQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg, EQueueType qtyp
|
|||
code = taosWriteQitem(pVnode->pStreamQ, pMsg);
|
||||
break;
|
||||
case STREAM_CTRL_QUEUE:
|
||||
dGTrace("vgId:%d, msg:%p put into vnode-ctrl-stream queue", pVnode->vgId, pMsg);
|
||||
dGTrace("vgId:%d, msg:%p put into vnode-stream-ctrl queue", pVnode->vgId, pMsg);
|
||||
code = taosWriteQitem(pVnode->pStreamCtrlQ, pMsg);
|
||||
break;
|
||||
case STREAM_LONG_EXEC_QUEUE:
|
||||
dGTrace("vgId:%d, msg:%p put into vnode-stream-long-exec queue", pVnode->vgId, pMsg);
|
||||
code = taosWriteQitem(pVnode->pStreamLongExecQ, pMsg);
|
||||
break;
|
||||
case FETCH_QUEUE:
|
||||
dGTrace("vgId:%d, msg:%p put into vnode-fetch queue", pVnode->vgId, pMsg);
|
||||
code = taosWriteQitem(pVnode->pFetchQ, pMsg);
|
||||
|
@ -335,6 +359,8 @@ int32_t vmPutMsgToStreamQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { return vmPutMs
|
|||
|
||||
int32_t vmPutMsgToStreamCtrlQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { return vmPutMsgToQueue(pMgmt, pMsg, STREAM_CTRL_QUEUE); }
|
||||
|
||||
int32_t vmPutMsgToStreamLongExecQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { return vmPutMsgToQueue(pMgmt, pMsg, STREAM_LONG_EXEC_QUEUE); }
|
||||
|
||||
int32_t vmPutMsgToMultiMgmtQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
|
||||
const STraceId *trace = &pMsg->info.traceId;
|
||||
dGTrace("msg:%p, put into vnode-multi-mgmt queue", pMsg);
|
||||
|
@ -409,6 +435,10 @@ int32_t vmGetQueueSize(SVnodeMgmt *pMgmt, int32_t vgId, EQueueType qtype) {
|
|||
break;
|
||||
case STREAM_CTRL_QUEUE:
|
||||
size = taosQueueItemSize(pVnode->pStreamCtrlQ);
|
||||
break;
|
||||
case STREAM_LONG_EXEC_QUEUE:
|
||||
size = taosQueueItemSize(pVnode->pStreamLongExecQ);
|
||||
break;
|
||||
default:
|
||||
break;
|
||||
}
|
||||
|
@ -451,13 +481,16 @@ int32_t vmAllocQueue(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) {
|
|||
}
|
||||
|
||||
pVnode->pQueryQ = tQueryAutoQWorkerAllocQueue(&pMgmt->queryPool, pVnode, (FItem)vmProcessQueryQueue);
|
||||
pVnode->pStreamQ = tAutoQWorkerAllocQueue(&pMgmt->streamPool, pVnode, (FItem)vmProcessStreamQueue);
|
||||
pVnode->pFetchQ = tWWorkerAllocQueue(&pMgmt->fetchPool, pVnode, (FItems)vmProcessFetchQueue);
|
||||
|
||||
// init stream msg processing queue family
|
||||
pVnode->pStreamQ = tAutoQWorkerAllocQueue(&pMgmt->streamPool, pVnode, (FItem)vmProcessStreamQueue, 2);
|
||||
pVnode->pStreamCtrlQ = tWWorkerAllocQueue(&pMgmt->streamCtrlPool, pVnode, (FItems)vmProcessStreamCtrlQueue);
|
||||
pVnode->pStreamLongExecQ = tAutoQWorkerAllocQueue(&pMgmt->streamLongExecPool, pVnode, (FItem)vmProcessStreamLongExecQueue, 1);
|
||||
|
||||
if (pVnode->pWriteW.queue == NULL || pVnode->pSyncW.queue == NULL || pVnode->pSyncRdW.queue == NULL ||
|
||||
pVnode->pApplyW.queue == NULL || pVnode->pQueryQ == NULL || pVnode->pStreamQ == NULL || pVnode->pFetchQ == NULL
|
||||
|| pVnode->pStreamCtrlQ == NULL) {
|
||||
|| pVnode->pStreamCtrlQ == NULL || pVnode->pStreamLongExecQ == NULL) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
|
@ -473,6 +506,7 @@ int32_t vmAllocQueue(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) {
|
|||
dInfo("vgId:%d, fetch-queue:%p is alloced, thread:%08" PRId64, pVnode->vgId, pVnode->pFetchQ,
|
||||
taosQueueGetThreadId(pVnode->pFetchQ));
|
||||
dInfo("vgId:%d, stream-queue:%p is alloced", pVnode->vgId, pVnode->pStreamQ);
|
||||
dInfo("vgId:%d, stream-long-exec-queue:%p is alloced", pVnode->vgId, pVnode->pStreamLongExecQ);
|
||||
dInfo("vgId:%d, stream-ctrl-queue:%p is alloced, thread:%08" PRId64, pVnode->vgId, pVnode->pStreamCtrlQ,
|
||||
taosQueueGetThreadId(pVnode->pStreamCtrlQ));
|
||||
return 0;
|
||||
|
@ -481,17 +515,22 @@ int32_t vmAllocQueue(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) {
|
|||
void vmFreeQueue(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) {
|
||||
tQueryAutoQWorkerFreeQueue(&pMgmt->queryPool, pVnode->pQueryQ);
|
||||
tAutoQWorkerFreeQueue(&pMgmt->streamPool, pVnode->pStreamQ);
|
||||
tAutoQWorkerFreeQueue(&pMgmt->streamLongExecPool, pVnode->pStreamLongExecQ);
|
||||
tWWorkerFreeQueue(&pMgmt->streamCtrlPool, pVnode->pStreamCtrlQ);
|
||||
tWWorkerFreeQueue(&pMgmt->fetchPool, pVnode->pFetchQ);
|
||||
pVnode->pQueryQ = NULL;
|
||||
pVnode->pFetchQ = NULL;
|
||||
|
||||
pVnode->pStreamQ = NULL;
|
||||
pVnode->pStreamCtrlQ = NULL;
|
||||
pVnode->pFetchQ = NULL;
|
||||
pVnode->pStreamLongExecQ = NULL;
|
||||
|
||||
dDebug("vgId:%d, queue is freed", pVnode->vgId);
|
||||
}
|
||||
|
||||
int32_t vmStartWorker(SVnodeMgmt *pMgmt) {
|
||||
int32_t code = 0;
|
||||
int32_t code = 0;
|
||||
|
||||
SQueryAutoQWorkerPool *pQPool = &pMgmt->queryPool;
|
||||
pQPool->name = "vnode-query";
|
||||
pQPool->min = tsNumOfVnodeQueryThreads;
|
||||
|
@ -505,8 +544,13 @@ int32_t vmStartWorker(SVnodeMgmt *pMgmt) {
|
|||
pStreamPool->ratio = tsRatioOfVnodeStreamThreads;
|
||||
if ((code = tAutoQWorkerInit(pStreamPool)) != 0) return code;
|
||||
|
||||
SAutoQWorkerPool *pLongExecPool = &pMgmt->streamLongExecPool;
|
||||
pLongExecPool->name = "vnode-stream-long-exec";
|
||||
pLongExecPool->ratio = tsRatioOfVnodeStreamThreads/3;
|
||||
if ((code = tAutoQWorkerInit(pLongExecPool)) != 0) return code;
|
||||
|
||||
SWWorkerPool *pStreamCtrlPool = &pMgmt->streamCtrlPool;
|
||||
pStreamCtrlPool->name = "vnode-ctrl-stream";
|
||||
pStreamCtrlPool->name = "vnode-stream-ctrl";
|
||||
pStreamCtrlPool->max = 1;
|
||||
if ((code = tWWorkerInit(pStreamCtrlPool)) != 0) return code;
|
||||
|
||||
|
@ -541,6 +585,7 @@ int32_t vmStartWorker(SVnodeMgmt *pMgmt) {
|
|||
void vmStopWorker(SVnodeMgmt *pMgmt) {
|
||||
tQueryAutoQWorkerCleanup(&pMgmt->queryPool);
|
||||
tAutoQWorkerCleanup(&pMgmt->streamPool);
|
||||
tAutoQWorkerCleanup(&pMgmt->streamLongExecPool);
|
||||
tWWorkerCleanup(&pMgmt->streamCtrlPool);
|
||||
tWWorkerCleanup(&pMgmt->fetchPool);
|
||||
dDebug("vnode workers are closed");
|
||||
|
|
|
@ -113,6 +113,7 @@ int32_t vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo);
|
|||
int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo);
|
||||
int32_t vnodeProcessStreamMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo);
|
||||
int32_t vnodeProcessStreamCtrlMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo);
|
||||
int32_t vnodeProcessStreamLongExecMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo);
|
||||
void vnodeProposeWriteMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs);
|
||||
void vnodeApplyWriteMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs);
|
||||
void vnodeProposeCommitOnNeed(SVnode *pVnode, bool atExit);
|
||||
|
|
|
@ -931,9 +931,7 @@ int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) {
|
|||
|
||||
int32_t vnodeProcessStreamMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) {
|
||||
vTrace("vgId:%d, msg:%p in stream queue is processing", pVnode->config.vgId, pMsg);
|
||||
if ((pMsg->msgType == TDMT_SCH_FETCH || pMsg->msgType == TDMT_VND_TABLE_META || pMsg->msgType == TDMT_VND_TABLE_CFG ||
|
||||
pMsg->msgType == TDMT_VND_BATCH_META) &&
|
||||
!syncIsReadyForRead(pVnode->sync)) {
|
||||
if (!syncIsReadyForRead(pVnode->sync)) {
|
||||
vnodeRedirectRpcMsg(pVnode, pMsg, terrno);
|
||||
return 0;
|
||||
}
|
||||
|
@ -945,8 +943,6 @@ int32_t vnodeProcessStreamMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo)
|
|||
return tqProcessTaskRetrieveReq(pVnode->pTq, pMsg);
|
||||
case TDMT_STREAM_RETRIEVE_RSP:
|
||||
return tqProcessTaskRetrieveRsp(pVnode->pTq, pMsg);
|
||||
case TDMT_VND_STREAM_SCAN_HISTORY:
|
||||
return tqProcessTaskScanHistory(pVnode->pTq, pMsg);
|
||||
case TDMT_VND_GET_STREAM_PROGRESS:
|
||||
return tqStreamProgressRetrieveReq(pVnode->pTq, pMsg);
|
||||
default:
|
||||
|
@ -993,6 +989,22 @@ int32_t vnodeProcessStreamCtrlMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pIn
|
|||
}
|
||||
}
|
||||
|
||||
int32_t vnodeProcessStreamLongExecMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) {
|
||||
vTrace("vgId:%d, msg:%p in stream long exec queue is processing", pVnode->config.vgId, pMsg);
|
||||
if (!syncIsReadyForRead(pVnode->sync)) {
|
||||
vnodeRedirectRpcMsg(pVnode, pMsg, terrno);
|
||||
return 0;
|
||||
}
|
||||
|
||||
switch (pMsg->msgType) {
|
||||
case TDMT_VND_STREAM_SCAN_HISTORY:
|
||||
return tqProcessTaskScanHistory(pVnode->pTq, pMsg);
|
||||
default:
|
||||
vError("unknown msg type:%d in stream long exec queue", pMsg->msgType);
|
||||
return TSDB_CODE_APP_ERROR;
|
||||
}
|
||||
}
|
||||
|
||||
void smaHandleRes(void *pVnode, int64_t smaId, const SArray *data) {
|
||||
int32_t code = tdProcessTSmaInsert(((SVnode *)pVnode)->pSma, smaId, (const char *)data);
|
||||
if (code) {
|
||||
|
|
|
@ -42,9 +42,7 @@ typedef struct SIndefOperatorInfo {
|
|||
} SIndefOperatorInfo;
|
||||
|
||||
static int32_t doGenerateSourceData(SOperatorInfo* pOperator);
|
||||
static SSDataBlock* doProjectOperation1(SOperatorInfo* pOperator);
|
||||
static int32_t doProjectOperation(SOperatorInfo* pOperator, SSDataBlock** pResBlock);
|
||||
static SSDataBlock* doApplyIndefinitFunction1(SOperatorInfo* pOperator);
|
||||
static int32_t doApplyIndefinitFunction(SOperatorInfo* pOperator, SSDataBlock** pResBlock);
|
||||
static int32_t setRowTsColumnOutputInfo(SqlFunctionCtx* pCtx, int32_t numOfCols, SArray** pResList);
|
||||
static int32_t setFunctionResultOutput(SOperatorInfo* pOperator, SOptrBasicInfo* pInfo, SAggSupporter* pSup,
|
||||
|
@ -557,12 +555,6 @@ static void doHandleDataBlock(SOperatorInfo* pOperator, SSDataBlock* pBlock, SOp
|
|||
}
|
||||
}
|
||||
|
||||
SSDataBlock* doApplyIndefinitFunction1(SOperatorInfo* pOperator) {
|
||||
SSDataBlock* pResBlock = NULL;
|
||||
pOperator->pTaskInfo->code = doApplyIndefinitFunction(pOperator, &pResBlock);
|
||||
return pResBlock;
|
||||
}
|
||||
|
||||
int32_t doApplyIndefinitFunction(SOperatorInfo* pOperator, SSDataBlock** pResBlock) {
|
||||
QRY_PARAM_CHECK(pResBlock);
|
||||
SIndefOperatorInfo* pIndefInfo = pOperator->info;
|
||||
|
|
|
@ -875,7 +875,7 @@ static int32_t doStreamExecTask(SStreamTask* pTask) {
|
|||
}
|
||||
|
||||
double el = (taosGetTimestampMs() - st) / 1000.0;
|
||||
if (el > 5.0) { // elapsed more than 5 sec, not occupy the CPU anymore
|
||||
if (el > 2.0) { // elapsed more than 5 sec, not occupy the CPU anymore
|
||||
stDebug("s-task:%s occupy more than 5.0s, release the exec threads and idle for 500ms", id);
|
||||
streamTaskSetIdleInfo(pTask, 500);
|
||||
return code;
|
||||
|
|
|
@ -76,7 +76,7 @@ int32_t streamStartScanHistoryAsync(SStreamTask* pTask, int8_t igUntreated) {
|
|||
memcpy(serializedReq, &req, len);
|
||||
|
||||
SRpcMsg rpcMsg = {.contLen = len, .pCont = serializedReq, .msgType = TDMT_VND_STREAM_SCAN_HISTORY};
|
||||
return tmsgPutToQueue(pTask->pMsgCb, STREAM_QUEUE, &rpcMsg);
|
||||
return tmsgPutToQueue(pTask->pMsgCb, STREAM_LONG_EXEC_QUEUE, &rpcMsg);
|
||||
}
|
||||
|
||||
void streamExecScanHistoryInFuture(SStreamTask* pTask, int32_t idleDuration) {
|
||||
|
|
|
@ -256,7 +256,7 @@ static void *tAutoQWorkerThreadFp(SQueueWorker *worker) {
|
|||
return NULL;
|
||||
}
|
||||
|
||||
STaosQueue *tAutoQWorkerAllocQueue(SAutoQWorkerPool *pool, void *ahandle, FItem fp) {
|
||||
STaosQueue *tAutoQWorkerAllocQueue(SAutoQWorkerPool *pool, void *ahandle, FItem fp, int32_t minNum) {
|
||||
int32_t code;
|
||||
STaosQueue *queue;
|
||||
|
||||
|
@ -280,7 +280,10 @@ STaosQueue *tAutoQWorkerAllocQueue(SAutoQWorkerPool *pool, void *ahandle, FItem
|
|||
int32_t queueNum = taosGetQueueNumber(pool->qset);
|
||||
int32_t curWorkerNum = taosArrayGetSize(pool->workers);
|
||||
int32_t dstWorkerNum = ceilf(queueNum * pool->ratio);
|
||||
if (dstWorkerNum < 2) dstWorkerNum = 2;
|
||||
|
||||
if (dstWorkerNum < minNum) {
|
||||
dstWorkerNum = minNum;
|
||||
}
|
||||
|
||||
// spawn a thread to process queue
|
||||
while (curWorkerNum < dstWorkerNum) {
|
||||
|
|
Loading…
Reference in New Issue