Merge pull request #29954 from taosdata/enh/killtrans

enh(stream): not waiting during drop stream tasks.
This commit is contained in:
Simon Guan 2025-02-28 10:40:53 +08:00 committed by GitHub
commit 1ae2bb28e8
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
22 changed files with 184 additions and 62 deletions

View File

@ -38,6 +38,7 @@ typedef enum {
STREAM_QUEUE, STREAM_QUEUE,
ARB_QUEUE, ARB_QUEUE,
STREAM_CTRL_QUEUE, STREAM_CTRL_QUEUE,
STREAM_LONG_EXEC_QUEUE,
QUEUE_MAX, QUEUE_MAX,
} EQueueType; } EQueueType;

View File

@ -180,7 +180,7 @@ void qCleanExecTaskBlockBuf(qTaskInfo_t tinfo);
*/ */
int32_t qAsyncKillTask(qTaskInfo_t tinfo, int32_t rspCode); int32_t qAsyncKillTask(qTaskInfo_t tinfo, int32_t rspCode);
int32_t qKillTask(qTaskInfo_t tinfo, int32_t rspCode); int32_t qKillTask(qTaskInfo_t tinfo, int32_t rspCode, int64_t waitDuration);
bool qTaskIsExecuting(qTaskInfo_t qinfo); bool qTaskIsExecuting(qTaskInfo_t qinfo);

View File

@ -58,6 +58,7 @@ extern "C" {
#define STREAM_EXEC_T_STOP_ALL_TASKS (-5) #define STREAM_EXEC_T_STOP_ALL_TASKS (-5)
#define STREAM_EXEC_T_RESUME_TASK (-6) #define STREAM_EXEC_T_RESUME_TASK (-6)
#define STREAM_EXEC_T_ADD_FAILED_TASK (-7) #define STREAM_EXEC_T_ADD_FAILED_TASK (-7)
#define STREAM_EXEC_T_STOP_ONE_TASK (-8)
typedef struct SStreamTask SStreamTask; typedef struct SStreamTask SStreamTask;
typedef struct SStreamQueue SStreamQueue; typedef struct SStreamQueue SStreamQueue;
@ -752,15 +753,19 @@ void streamMetaCleanup();
int32_t streamMetaOpen(const char* path, void* ahandle, FTaskBuild expandFunc, FTaskExpand expandTaskFn, int32_t vgId, int32_t streamMetaOpen(const char* path, void* ahandle, FTaskBuild expandFunc, FTaskExpand expandTaskFn, int32_t vgId,
int64_t stage, startComplete_fn_t fn, SStreamMeta** pMeta); int64_t stage, startComplete_fn_t fn, SStreamMeta** pMeta);
void streamMetaClose(SStreamMeta* streamMeta); void streamMetaClose(SStreamMeta* streamMeta);
int32_t streamMetaSaveTask(SStreamMeta* pMeta, SStreamTask* pTask); // save to stream meta store
int32_t streamMetaRemoveTask(SStreamMeta* pMeta, STaskId* pKey); int32_t streamMetaSaveTaskInMeta(SStreamMeta* pMeta, SStreamTask* pTask); // save to stream meta store
int32_t streamMetaRemoveTaskInMeta(SStreamMeta* pMeta, STaskId* pKey);
int32_t streamMetaRegisterTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTask, bool* pAdded); int32_t streamMetaRegisterTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTask, bool* pAdded);
int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId); int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId);
int32_t streamMetaGetNumOfTasks(SStreamMeta* pMeta); int32_t streamMetaGetNumOfTasks(SStreamMeta* pMeta);
int32_t streamMetaAcquireTaskNoLock(SStreamMeta* pMeta, int64_t streamId, int32_t taskId, SStreamTask** pTask); int32_t streamMetaAcquireTaskNoLock(SStreamMeta* pMeta, int64_t streamId, int32_t taskId, SStreamTask** pTask);
int32_t streamMetaAcquireTaskUnsafe(SStreamMeta* pMeta, STaskId* pId, SStreamTask** pTask); int32_t streamMetaAcquireTaskUnsafe(SStreamMeta* pMeta, STaskId* pId, SStreamTask** pTask);
int32_t streamMetaAcquireTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId, SStreamTask** pTask); int32_t streamMetaAcquireTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId, SStreamTask** pTask);
void streamMetaReleaseTask(SStreamMeta* pMeta, SStreamTask* pTask); void streamMetaReleaseTask(SStreamMeta* pMeta, SStreamTask* pTask);
void streamMetaClear(SStreamMeta* pMeta); void streamMetaClear(SStreamMeta* pMeta);
void streamMetaInitBackend(SStreamMeta* pMeta); void streamMetaInitBackend(SStreamMeta* pMeta);
int32_t streamMetaCommit(SStreamMeta* pMeta); int32_t streamMetaCommit(SStreamMeta* pMeta);
@ -794,6 +799,7 @@ void streamMetaLoadAllTasks(SStreamMeta* pMeta);
int32_t streamMetaStartAllTasks(SStreamMeta* pMeta); int32_t streamMetaStartAllTasks(SStreamMeta* pMeta);
int32_t streamMetaStopAllTasks(SStreamMeta* pMeta); int32_t streamMetaStopAllTasks(SStreamMeta* pMeta);
int32_t streamMetaStartOneTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId); int32_t streamMetaStartOneTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId);
int32_t streamMetaStopOneTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId);
bool streamMetaAllTasksReady(const SStreamMeta* pMeta); bool streamMetaAllTasksReady(const SStreamMeta* pMeta);
int32_t streamTaskSendNegotiateChkptIdMsg(SStreamTask* pTask); int32_t streamTaskSendNegotiateChkptIdMsg(SStreamTask* pTask);
int32_t streamTaskCheckIfReqConsenChkptId(SStreamTask* pTask, int64_t ts); int32_t streamTaskCheckIfReqConsenChkptId(SStreamTask* pTask, int64_t ts);

View File

@ -76,7 +76,7 @@ void tQWorkerFreeQueue(SQWorkerPool *pool, STaosQueue *queue);
int32_t tAutoQWorkerInit(SAutoQWorkerPool *pool); int32_t tAutoQWorkerInit(SAutoQWorkerPool *pool);
void tAutoQWorkerCleanup(SAutoQWorkerPool *pool); void tAutoQWorkerCleanup(SAutoQWorkerPool *pool);
STaosQueue *tAutoQWorkerAllocQueue(SAutoQWorkerPool *pool, void *ahandle, FItem fp); STaosQueue *tAutoQWorkerAllocQueue(SAutoQWorkerPool *pool, void *ahandle, FItem fp, int32_t minNum);
void tAutoQWorkerFreeQueue(SAutoQWorkerPool *pool, STaosQueue *queue); void tAutoQWorkerFreeQueue(SAutoQWorkerPool *pool, STaosQueue *queue);
int32_t tWWorkerInit(SWWorkerPool *pool); int32_t tWWorkerInit(SWWorkerPool *pool);

View File

@ -32,6 +32,7 @@ typedef struct SVnodeMgmt {
const char *name; const char *name;
SQueryAutoQWorkerPool queryPool; SQueryAutoQWorkerPool queryPool;
SAutoQWorkerPool streamPool; SAutoQWorkerPool streamPool;
SAutoQWorkerPool streamLongExecPool;
SWWorkerPool streamCtrlPool; SWWorkerPool streamCtrlPool;
SWWorkerPool fetchPool; SWWorkerPool fetchPool;
SSingleWorker mgmtWorker; SSingleWorker mgmtWorker;
@ -75,6 +76,7 @@ typedef struct {
STaosQueue *pQueryQ; STaosQueue *pQueryQ;
STaosQueue *pStreamQ; STaosQueue *pStreamQ;
STaosQueue *pStreamCtrlQ; STaosQueue *pStreamCtrlQ;
STaosQueue *pStreamLongExecQ;
STaosQueue *pFetchQ; STaosQueue *pFetchQ;
STaosQueue *pMultiMgmQ; STaosQueue *pMultiMgmQ;
} SVnodeObj; } SVnodeObj;
@ -137,6 +139,8 @@ int32_t vmPutMsgToQueryQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg);
int32_t vmPutMsgToFetchQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg); int32_t vmPutMsgToFetchQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg);
int32_t vmPutMsgToStreamQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg); int32_t vmPutMsgToStreamQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg);
int32_t vmPutMsgToStreamCtrlQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg); int32_t vmPutMsgToStreamCtrlQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg);
int32_t vmPutMsgToStreamLongExecQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg);
int32_t vmPutMsgToMergeQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg); int32_t vmPutMsgToMergeQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg);
int32_t vmPutMsgToMgmtQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg); int32_t vmPutMsgToMgmtQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg);
int32_t vmPutMsgToMultiMgmtQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg); int32_t vmPutMsgToMultiMgmtQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg);

View File

@ -1008,27 +1008,29 @@ SArray *vmGetMsgHandles() {
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_RUN, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_RUN, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DISPATCH, vmPutMsgToStreamCtrlQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DISPATCH, vmPutMsgToStreamCtrlQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DISPATCH_RSP, vmPutMsgToStreamCtrlQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DISPATCH_RSP, vmPutMsgToStreamCtrlQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_RETRIEVE, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_RETRIEVE_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_CHECK, vmPutMsgToStreamCtrlQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_CHECK, vmPutMsgToStreamCtrlQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_CHECK_RSP, vmPutMsgToStreamCtrlQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_CHECK_RSP, vmPutMsgToStreamCtrlQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_PAUSE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_RESUME, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_STOP, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_CHECK_POINT_SOURCE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_CHECKPOINT_READY, vmPutMsgToStreamCtrlQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_CHECKPOINT_READY, vmPutMsgToStreamCtrlQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_CHECKPOINT_READY_RSP, vmPutMsgToStreamCtrlQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_CHECKPOINT_READY_RSP, vmPutMsgToStreamCtrlQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_RETRIEVE_TRIGGER, vmPutMsgToStreamCtrlQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_RETRIEVE_TRIGGER, vmPutMsgToStreamCtrlQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_RETRIEVE_TRIGGER_RSP, vmPutMsgToStreamCtrlQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_RETRIEVE_TRIGGER_RSP, vmPutMsgToStreamCtrlQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_UPDATE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_RESET, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_HEARTBEAT_RSP, vmPutMsgToStreamCtrlQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_HEARTBEAT_RSP, vmPutMsgToStreamCtrlQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_REQ_CHKPT_RSP, vmPutMsgToStreamCtrlQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_REQ_CHKPT_RSP, vmPutMsgToStreamCtrlQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_CHKPT_REPORT_RSP, vmPutMsgToStreamCtrlQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_CHKPT_REPORT_RSP, vmPutMsgToStreamCtrlQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_SCAN_HISTORY, vmPutMsgToStreamLongExecQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_GET_STREAM_PROGRESS, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_GET_STREAM_PROGRESS, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_RETRIEVE, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_RETRIEVE_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_UPDATE_CHKPT, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_UPDATE_CHKPT, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_CONSEN_CHKPT, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_CONSEN_CHKPT, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_PAUSE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_RESUME, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_STOP, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_CHECK_POINT_SOURCE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_UPDATE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_RESET, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_REPLICA, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_REPLICA, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_CONFIG, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_CONFIG, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;

View File

@ -398,10 +398,14 @@ void vmCloseVnode(SVnodeMgmt *pMgmt, SVnodeObj *pVnode, bool commitAndRemoveWal,
dInfo("vgId:%d, wait for vnode stream queue:%p is empty, %d remains", pVnode->vgId, dInfo("vgId:%d, wait for vnode stream queue:%p is empty, %d remains", pVnode->vgId,
pVnode->pStreamQ, taosQueueItemSize(pVnode->pStreamQ)); pVnode->pStreamQ, taosQueueItemSize(pVnode->pStreamQ));
while (!taosQueueEmpty(pVnode->pStreamQ)) taosMsleep(10); while (!taosQueueEmpty(pVnode->pStreamQ)) taosMsleep(50);
dInfo("vgId:%d, wait for vnode stream ctrl queue:%p is empty", pVnode->vgId, pVnode->pStreamCtrlQ); dInfo("vgId:%d, wait for vnode stream ctrl queue:%p is empty", pVnode->vgId, pVnode->pStreamCtrlQ);
while (!taosQueueEmpty(pVnode->pStreamCtrlQ)) taosMsleep(10); while (!taosQueueEmpty(pVnode->pStreamCtrlQ)) taosMsleep(50);
dInfo("vgId:%d, wait for vnode stream long-exec queue:%p is empty, %d remains", pVnode->vgId,
pVnode->pStreamLongExecQ, taosQueueItemSize(pVnode->pStreamLongExecQ));
while (!taosQueueEmpty(pVnode->pStreamLongExecQ)) taosMsleep(50);
dInfo("vgId:%d, all vnode queues is empty", pVnode->vgId); dInfo("vgId:%d, all vnode queues is empty", pVnode->vgId);

View File

@ -150,7 +150,7 @@ static void vmProcessStreamCtrlQueue(SQueueInfo *pInfo, STaosQall* pQall, int32_
SRpcMsg *pMsg = pItem; SRpcMsg *pMsg = pItem;
const STraceId *trace = &pMsg->info.traceId; const STraceId *trace = &pMsg->info.traceId;
dGTrace("vgId:%d, msg:%p get from vnode-ctrl-stream queue", pVnode->vgId, pMsg); dGTrace("vgId:%d, msg:%p get from vnode-stream-ctrl queue", pVnode->vgId, pMsg);
code = vnodeProcessStreamCtrlMsg(pVnode->pImpl, pMsg, pInfo); code = vnodeProcessStreamCtrlMsg(pVnode->pImpl, pMsg, pInfo);
if (code != 0) { if (code != 0) {
terrno = code; terrno = code;
@ -165,6 +165,26 @@ static void vmProcessStreamCtrlQueue(SQueueInfo *pInfo, STaosQall* pQall, int32_
} }
} }
static void vmProcessStreamLongExecQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) {
SVnodeObj *pVnode = pInfo->ahandle;
const STraceId *trace = &pMsg->info.traceId;
int32_t code = 0;
dGTrace("vgId:%d, msg:%p get from vnode-stream long-exec queue", pVnode->vgId, pMsg);
code = vnodeProcessStreamLongExecMsg(pVnode->pImpl, pMsg, pInfo);
if (code != 0) {
terrno = code;
dGError("vgId:%d, msg:%p failed to process stream msg %s since %s", pVnode->vgId, pMsg, TMSG_INFO(pMsg->msgType),
tstrerror(code));
vmSendRsp(pMsg, code);
}
dGTrace("vgId:%d, msg:%p is freed, code:0x%x", pVnode->vgId, pMsg, code);
rpcFreeCont(pMsg->pCont);
taosFreeQitem(pMsg);
}
static void vmProcessFetchQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) { static void vmProcessFetchQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
SVnodeObj *pVnode = pInfo->ahandle; SVnodeObj *pVnode = pInfo->ahandle;
SRpcMsg *pMsg = NULL; SRpcMsg *pMsg = NULL;
@ -274,9 +294,13 @@ static int32_t vmPutMsgToQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg, EQueueType qtyp
code = taosWriteQitem(pVnode->pStreamQ, pMsg); code = taosWriteQitem(pVnode->pStreamQ, pMsg);
break; break;
case STREAM_CTRL_QUEUE: case STREAM_CTRL_QUEUE:
dGTrace("vgId:%d, msg:%p put into vnode-ctrl-stream queue", pVnode->vgId, pMsg); dGTrace("vgId:%d, msg:%p put into vnode-stream-ctrl queue", pVnode->vgId, pMsg);
code = taosWriteQitem(pVnode->pStreamCtrlQ, pMsg); code = taosWriteQitem(pVnode->pStreamCtrlQ, pMsg);
break; break;
case STREAM_LONG_EXEC_QUEUE:
dGTrace("vgId:%d, msg:%p put into vnode-stream-long-exec queue", pVnode->vgId, pMsg);
code = taosWriteQitem(pVnode->pStreamLongExecQ, pMsg);
break;
case FETCH_QUEUE: case FETCH_QUEUE:
dGTrace("vgId:%d, msg:%p put into vnode-fetch queue", pVnode->vgId, pMsg); dGTrace("vgId:%d, msg:%p put into vnode-fetch queue", pVnode->vgId, pMsg);
code = taosWriteQitem(pVnode->pFetchQ, pMsg); code = taosWriteQitem(pVnode->pFetchQ, pMsg);
@ -335,6 +359,8 @@ int32_t vmPutMsgToStreamQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { return vmPutMs
int32_t vmPutMsgToStreamCtrlQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { return vmPutMsgToQueue(pMgmt, pMsg, STREAM_CTRL_QUEUE); } int32_t vmPutMsgToStreamCtrlQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { return vmPutMsgToQueue(pMgmt, pMsg, STREAM_CTRL_QUEUE); }
int32_t vmPutMsgToStreamLongExecQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { return vmPutMsgToQueue(pMgmt, pMsg, STREAM_LONG_EXEC_QUEUE); }
int32_t vmPutMsgToMultiMgmtQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { int32_t vmPutMsgToMultiMgmtQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
const STraceId *trace = &pMsg->info.traceId; const STraceId *trace = &pMsg->info.traceId;
dGTrace("msg:%p, put into vnode-multi-mgmt queue", pMsg); dGTrace("msg:%p, put into vnode-multi-mgmt queue", pMsg);
@ -409,6 +435,10 @@ int32_t vmGetQueueSize(SVnodeMgmt *pMgmt, int32_t vgId, EQueueType qtype) {
break; break;
case STREAM_CTRL_QUEUE: case STREAM_CTRL_QUEUE:
size = taosQueueItemSize(pVnode->pStreamCtrlQ); size = taosQueueItemSize(pVnode->pStreamCtrlQ);
break;
case STREAM_LONG_EXEC_QUEUE:
size = taosQueueItemSize(pVnode->pStreamLongExecQ);
break;
default: default:
break; break;
} }
@ -451,13 +481,16 @@ int32_t vmAllocQueue(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) {
} }
pVnode->pQueryQ = tQueryAutoQWorkerAllocQueue(&pMgmt->queryPool, pVnode, (FItem)vmProcessQueryQueue); pVnode->pQueryQ = tQueryAutoQWorkerAllocQueue(&pMgmt->queryPool, pVnode, (FItem)vmProcessQueryQueue);
pVnode->pStreamQ = tAutoQWorkerAllocQueue(&pMgmt->streamPool, pVnode, (FItem)vmProcessStreamQueue);
pVnode->pFetchQ = tWWorkerAllocQueue(&pMgmt->fetchPool, pVnode, (FItems)vmProcessFetchQueue); pVnode->pFetchQ = tWWorkerAllocQueue(&pMgmt->fetchPool, pVnode, (FItems)vmProcessFetchQueue);
// init stream msg processing queue family
pVnode->pStreamQ = tAutoQWorkerAllocQueue(&pMgmt->streamPool, pVnode, (FItem)vmProcessStreamQueue, 2);
pVnode->pStreamCtrlQ = tWWorkerAllocQueue(&pMgmt->streamCtrlPool, pVnode, (FItems)vmProcessStreamCtrlQueue); pVnode->pStreamCtrlQ = tWWorkerAllocQueue(&pMgmt->streamCtrlPool, pVnode, (FItems)vmProcessStreamCtrlQueue);
pVnode->pStreamLongExecQ = tAutoQWorkerAllocQueue(&pMgmt->streamLongExecPool, pVnode, (FItem)vmProcessStreamLongExecQueue, 1);
if (pVnode->pWriteW.queue == NULL || pVnode->pSyncW.queue == NULL || pVnode->pSyncRdW.queue == NULL || if (pVnode->pWriteW.queue == NULL || pVnode->pSyncW.queue == NULL || pVnode->pSyncRdW.queue == NULL ||
pVnode->pApplyW.queue == NULL || pVnode->pQueryQ == NULL || pVnode->pStreamQ == NULL || pVnode->pFetchQ == NULL pVnode->pApplyW.queue == NULL || pVnode->pQueryQ == NULL || pVnode->pStreamQ == NULL || pVnode->pFetchQ == NULL
|| pVnode->pStreamCtrlQ == NULL) { || pVnode->pStreamCtrlQ == NULL || pVnode->pStreamLongExecQ == NULL) {
return TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY;
} }
@ -473,6 +506,7 @@ int32_t vmAllocQueue(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) {
dInfo("vgId:%d, fetch-queue:%p is alloced, thread:%08" PRId64, pVnode->vgId, pVnode->pFetchQ, dInfo("vgId:%d, fetch-queue:%p is alloced, thread:%08" PRId64, pVnode->vgId, pVnode->pFetchQ,
taosQueueGetThreadId(pVnode->pFetchQ)); taosQueueGetThreadId(pVnode->pFetchQ));
dInfo("vgId:%d, stream-queue:%p is alloced", pVnode->vgId, pVnode->pStreamQ); dInfo("vgId:%d, stream-queue:%p is alloced", pVnode->vgId, pVnode->pStreamQ);
dInfo("vgId:%d, stream-long-exec-queue:%p is alloced", pVnode->vgId, pVnode->pStreamLongExecQ);
dInfo("vgId:%d, stream-ctrl-queue:%p is alloced, thread:%08" PRId64, pVnode->vgId, pVnode->pStreamCtrlQ, dInfo("vgId:%d, stream-ctrl-queue:%p is alloced, thread:%08" PRId64, pVnode->vgId, pVnode->pStreamCtrlQ,
taosQueueGetThreadId(pVnode->pStreamCtrlQ)); taosQueueGetThreadId(pVnode->pStreamCtrlQ));
return 0; return 0;
@ -481,17 +515,22 @@ int32_t vmAllocQueue(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) {
void vmFreeQueue(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) { void vmFreeQueue(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) {
tQueryAutoQWorkerFreeQueue(&pMgmt->queryPool, pVnode->pQueryQ); tQueryAutoQWorkerFreeQueue(&pMgmt->queryPool, pVnode->pQueryQ);
tAutoQWorkerFreeQueue(&pMgmt->streamPool, pVnode->pStreamQ); tAutoQWorkerFreeQueue(&pMgmt->streamPool, pVnode->pStreamQ);
tAutoQWorkerFreeQueue(&pMgmt->streamLongExecPool, pVnode->pStreamLongExecQ);
tWWorkerFreeQueue(&pMgmt->streamCtrlPool, pVnode->pStreamCtrlQ); tWWorkerFreeQueue(&pMgmt->streamCtrlPool, pVnode->pStreamCtrlQ);
tWWorkerFreeQueue(&pMgmt->fetchPool, pVnode->pFetchQ); tWWorkerFreeQueue(&pMgmt->fetchPool, pVnode->pFetchQ);
pVnode->pQueryQ = NULL; pVnode->pQueryQ = NULL;
pVnode->pFetchQ = NULL;
pVnode->pStreamQ = NULL; pVnode->pStreamQ = NULL;
pVnode->pStreamCtrlQ = NULL; pVnode->pStreamCtrlQ = NULL;
pVnode->pFetchQ = NULL; pVnode->pStreamLongExecQ = NULL;
dDebug("vgId:%d, queue is freed", pVnode->vgId); dDebug("vgId:%d, queue is freed", pVnode->vgId);
} }
int32_t vmStartWorker(SVnodeMgmt *pMgmt) { int32_t vmStartWorker(SVnodeMgmt *pMgmt) {
int32_t code = 0; int32_t code = 0;
SQueryAutoQWorkerPool *pQPool = &pMgmt->queryPool; SQueryAutoQWorkerPool *pQPool = &pMgmt->queryPool;
pQPool->name = "vnode-query"; pQPool->name = "vnode-query";
pQPool->min = tsNumOfVnodeQueryThreads; pQPool->min = tsNumOfVnodeQueryThreads;
@ -505,8 +544,13 @@ int32_t vmStartWorker(SVnodeMgmt *pMgmt) {
pStreamPool->ratio = tsRatioOfVnodeStreamThreads; pStreamPool->ratio = tsRatioOfVnodeStreamThreads;
if ((code = tAutoQWorkerInit(pStreamPool)) != 0) return code; if ((code = tAutoQWorkerInit(pStreamPool)) != 0) return code;
SAutoQWorkerPool *pLongExecPool = &pMgmt->streamLongExecPool;
pLongExecPool->name = "vnode-stream-long-exec";
pLongExecPool->ratio = tsRatioOfVnodeStreamThreads/3;
if ((code = tAutoQWorkerInit(pLongExecPool)) != 0) return code;
SWWorkerPool *pStreamCtrlPool = &pMgmt->streamCtrlPool; SWWorkerPool *pStreamCtrlPool = &pMgmt->streamCtrlPool;
pStreamCtrlPool->name = "vnode-ctrl-stream"; pStreamCtrlPool->name = "vnode-stream-ctrl";
pStreamCtrlPool->max = 1; pStreamCtrlPool->max = 1;
if ((code = tWWorkerInit(pStreamCtrlPool)) != 0) return code; if ((code = tWWorkerInit(pStreamCtrlPool)) != 0) return code;
@ -541,6 +585,7 @@ int32_t vmStartWorker(SVnodeMgmt *pMgmt) {
void vmStopWorker(SVnodeMgmt *pMgmt) { void vmStopWorker(SVnodeMgmt *pMgmt) {
tQueryAutoQWorkerCleanup(&pMgmt->queryPool); tQueryAutoQWorkerCleanup(&pMgmt->queryPool);
tAutoQWorkerCleanup(&pMgmt->streamPool); tAutoQWorkerCleanup(&pMgmt->streamPool);
tAutoQWorkerCleanup(&pMgmt->streamLongExecPool);
tWWorkerCleanup(&pMgmt->streamCtrlPool); tWWorkerCleanup(&pMgmt->streamCtrlPool);
tWWorkerCleanup(&pMgmt->fetchPool); tWWorkerCleanup(&pMgmt->fetchPool);
dDebug("vnode workers are closed"); dDebug("vnode workers are closed");

View File

@ -113,6 +113,7 @@ int32_t vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo);
int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo); int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo);
int32_t vnodeProcessStreamMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo); int32_t vnodeProcessStreamMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo);
int32_t vnodeProcessStreamCtrlMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo); int32_t vnodeProcessStreamCtrlMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo);
int32_t vnodeProcessStreamLongExecMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo);
void vnodeProposeWriteMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs); void vnodeProposeWriteMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs);
void vnodeApplyWriteMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs); void vnodeApplyWriteMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs);
void vnodeProposeCommitOnNeed(SVnode *pVnode, bool atExit); void vnodeProposeCommitOnNeed(SVnode *pVnode, bool atExit);

View File

@ -1302,7 +1302,7 @@ _checkpoint:
} }
streamMetaWLock(pMeta); streamMetaWLock(pMeta);
if ((code = streamMetaSaveTask(pMeta, pTask)) != 0) { if ((code = streamMetaSaveTaskInMeta(pMeta, pTask)) != 0) {
streamMetaWUnLock(pMeta); streamMetaWUnLock(pMeta);
taosHashCancelIterate(pInfoHash, infoHash); taosHashCancelIterate(pInfoHash, infoHash);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);

View File

@ -260,13 +260,13 @@ int32_t tqStreamTaskProcessUpdateReq(SStreamMeta* pMeta, SMsgCb* cb, SRpcMsg* pM
// stream do update the nodeEp info, write it into stream meta. // stream do update the nodeEp info, write it into stream meta.
if (updated) { if (updated) {
tqDebug("s-task:%s vgId:%d save task after update epset, and stop task", idstr, vgId); tqDebug("s-task:%s vgId:%d save task after update epset, and stop task", idstr, vgId);
code = streamMetaSaveTask(pMeta, pTask); code = streamMetaSaveTaskInMeta(pMeta, pTask);
if (code) { if (code) {
tqError("s-task:%s vgId:%d failed to save task, code:%s", idstr, vgId, tstrerror(code)); tqError("s-task:%s vgId:%d failed to save task, code:%s", idstr, vgId, tstrerror(code));
} }
if (pHTask != NULL) { if (pHTask != NULL) {
code = streamMetaSaveTask(pMeta, pHTask); code = streamMetaSaveTaskInMeta(pMeta, pHTask);
if (code) { if (code) {
tqError("s-task:%s vgId:%d failed to save related history task, code:%s", idstr, vgId, tstrerror(code)); tqError("s-task:%s vgId:%d failed to save related history task, code:%s", idstr, vgId, tstrerror(code));
} }
@ -743,6 +743,8 @@ int32_t tqStreamTaskProcessDropReq(SStreamMeta* pMeta, char* msg, int32_t msgLen
} }
streamMetaWUnLock(pMeta); streamMetaWUnLock(pMeta);
tqDebug("vgId:%d process drop task:0x%x completed", vgId, pReq->taskId);
return 0; // always return success return 0; // always return success
} }
@ -857,6 +859,9 @@ int32_t tqStreamTaskProcessRunReq(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLead
} else if (type == STREAM_EXEC_T_ADD_FAILED_TASK) { } else if (type == STREAM_EXEC_T_ADD_FAILED_TASK) {
code = streamMetaAddFailedTask(pMeta, req.streamId, req.taskId); code = streamMetaAddFailedTask(pMeta, req.streamId, req.taskId);
return code; return code;
} else if (type == STREAM_EXEC_T_STOP_ONE_TASK) {
code = streamMetaStopOneTask(pMeta, req.streamId, req.taskId);
return code;
} else if (type == STREAM_EXEC_T_RESUME_TASK) { // task resume to run after idle for a while } else if (type == STREAM_EXEC_T_RESUME_TASK) { // task resume to run after idle for a while
SStreamTask* pTask = NULL; SStreamTask* pTask = NULL;
code = streamMetaAcquireTask(pMeta, req.streamId, req.taskId, &pTask); code = streamMetaAcquireTask(pMeta, req.streamId, req.taskId, &pTask);

View File

@ -945,8 +945,6 @@ int32_t vnodeProcessStreamMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo)
return tqProcessTaskRetrieveReq(pVnode->pTq, pMsg); return tqProcessTaskRetrieveReq(pVnode->pTq, pMsg);
case TDMT_STREAM_RETRIEVE_RSP: case TDMT_STREAM_RETRIEVE_RSP:
return tqProcessTaskRetrieveRsp(pVnode->pTq, pMsg); return tqProcessTaskRetrieveRsp(pVnode->pTq, pMsg);
case TDMT_VND_STREAM_SCAN_HISTORY:
return tqProcessTaskScanHistory(pVnode->pTq, pMsg);
case TDMT_VND_GET_STREAM_PROGRESS: case TDMT_VND_GET_STREAM_PROGRESS:
return tqStreamProgressRetrieveReq(pVnode->pTq, pMsg); return tqStreamProgressRetrieveReq(pVnode->pTq, pMsg);
default: default:
@ -993,6 +991,22 @@ int32_t vnodeProcessStreamCtrlMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pIn
} }
} }
int32_t vnodeProcessStreamLongExecMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) {
vTrace("vgId:%d, msg:%p in stream long exec queue is processing", pVnode->config.vgId, pMsg);
if (!syncIsReadyForRead(pVnode->sync)) {
vnodeRedirectRpcMsg(pVnode, pMsg, terrno);
return 0;
}
switch (pMsg->msgType) {
case TDMT_VND_STREAM_SCAN_HISTORY:
return tqProcessTaskScanHistory(pVnode->pTq, pMsg);
default:
vError("unknown msg type:%d in stream long exec queue", pMsg->msgType);
return TSDB_CODE_APP_ERROR;
}
}
void smaHandleRes(void *pVnode, int64_t smaId, const SArray *data) { void smaHandleRes(void *pVnode, int64_t smaId, const SArray *data) {
int32_t code = tdProcessTSmaInsert(((SVnode *)pVnode)->pSma, smaId, (const char *)data); int32_t code = tdProcessTSmaInsert(((SVnode *)pVnode)->pSma, smaId, (const char *)data);
if (code) { if (code) {

View File

@ -972,26 +972,43 @@ int32_t qAsyncKillTask(qTaskInfo_t qinfo, int32_t rspCode) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t qKillTask(qTaskInfo_t tinfo, int32_t rspCode) { int32_t qKillTask(qTaskInfo_t tinfo, int32_t rspCode, int64_t waitDuration) {
int64_t st = taosGetTimestampMs();
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
if (pTaskInfo == NULL) { if (pTaskInfo == NULL) {
return TSDB_CODE_QRY_INVALID_QHANDLE; return TSDB_CODE_QRY_INVALID_QHANDLE;
} }
qDebug("%s sync killed execTask", GET_TASKID(pTaskInfo)); if (waitDuration > 0) {
qDebug("%s sync killed execTask, and waiting for %.2fs", GET_TASKID(pTaskInfo), waitDuration/1000.0);
} else {
qDebug("%s async killed execTask", GET_TASKID(pTaskInfo));
}
setTaskKilled(pTaskInfo, TSDB_CODE_TSC_QUERY_KILLED); setTaskKilled(pTaskInfo, TSDB_CODE_TSC_QUERY_KILLED);
while (1) { if (waitDuration > 0) {
taosWLockLatch(&pTaskInfo->lock); while (1) {
if (qTaskIsExecuting(pTaskInfo)) { // let's wait for 100 ms and try again taosWLockLatch(&pTaskInfo->lock);
taosWUnLockLatch(&pTaskInfo->lock); if (qTaskIsExecuting(pTaskInfo)) { // let's wait for 100 ms and try again
taosMsleep(100); taosWUnLockLatch(&pTaskInfo->lock);
} else { // not running now
pTaskInfo->code = rspCode; taosMsleep(200);
taosWUnLockLatch(&pTaskInfo->lock);
return TSDB_CODE_SUCCESS; int64_t d = taosGetTimestampMs() - st;
if (d >= waitDuration && waitDuration >= 0) {
qWarn("%s waiting more than %.2fs, not wait anymore", GET_TASKID(pTaskInfo), waitDuration / 1000.0);
return TSDB_CODE_SUCCESS;
}
} else { // not running now
pTaskInfo->code = rspCode;
taosWUnLockLatch(&pTaskInfo->lock);
return TSDB_CODE_SUCCESS;
}
} }
} }
return TSDB_CODE_SUCCESS;
} }
bool qTaskIsExecuting(qTaskInfo_t qinfo) { bool qTaskIsExecuting(qTaskInfo_t qinfo) {

View File

@ -42,9 +42,7 @@ typedef struct SIndefOperatorInfo {
} SIndefOperatorInfo; } SIndefOperatorInfo;
static int32_t doGenerateSourceData(SOperatorInfo* pOperator); static int32_t doGenerateSourceData(SOperatorInfo* pOperator);
static SSDataBlock* doProjectOperation1(SOperatorInfo* pOperator);
static int32_t doProjectOperation(SOperatorInfo* pOperator, SSDataBlock** pResBlock); static int32_t doProjectOperation(SOperatorInfo* pOperator, SSDataBlock** pResBlock);
static SSDataBlock* doApplyIndefinitFunction1(SOperatorInfo* pOperator);
static int32_t doApplyIndefinitFunction(SOperatorInfo* pOperator, SSDataBlock** pResBlock); static int32_t doApplyIndefinitFunction(SOperatorInfo* pOperator, SSDataBlock** pResBlock);
static int32_t setRowTsColumnOutputInfo(SqlFunctionCtx* pCtx, int32_t numOfCols, SArray** pResList); static int32_t setRowTsColumnOutputInfo(SqlFunctionCtx* pCtx, int32_t numOfCols, SArray** pResList);
static int32_t setFunctionResultOutput(SOperatorInfo* pOperator, SOptrBasicInfo* pInfo, SAggSupporter* pSup, static int32_t setFunctionResultOutput(SOperatorInfo* pOperator, SOptrBasicInfo* pInfo, SAggSupporter* pSup,
@ -557,12 +555,6 @@ static void doHandleDataBlock(SOperatorInfo* pOperator, SSDataBlock* pBlock, SOp
} }
} }
SSDataBlock* doApplyIndefinitFunction1(SOperatorInfo* pOperator) {
SSDataBlock* pResBlock = NULL;
pOperator->pTaskInfo->code = doApplyIndefinitFunction(pOperator, &pResBlock);
return pResBlock;
}
int32_t doApplyIndefinitFunction(SOperatorInfo* pOperator, SSDataBlock** pResBlock) { int32_t doApplyIndefinitFunction(SOperatorInfo* pOperator, SSDataBlock** pResBlock) {
QRY_PARAM_CHECK(pResBlock); QRY_PARAM_CHECK(pResBlock);
SIndefOperatorInfo* pIndefInfo = pOperator->info; SIndefOperatorInfo* pIndefInfo = pOperator->info;

View File

@ -734,7 +734,7 @@ int32_t streamTaskUpdateTaskCheckpointInfo(SStreamTask* pTask, bool restored, SV
pTask->status.taskStatus = TASK_STATUS__READY; pTask->status.taskStatus = TASK_STATUS__READY;
code = streamMetaSaveTask(pMeta, pTask); code = streamMetaSaveTaskInMeta(pMeta, pTask);
streamMutexUnlock(&pTask->lock); streamMutexUnlock(&pTask->lock);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {

View File

@ -878,7 +878,7 @@ static int32_t doStreamExecTask(SStreamTask* pTask) {
} }
double el = (taosGetTimestampMs() - st) / 1000.0; double el = (taosGetTimestampMs() - st) / 1000.0;
if (el > 5.0) { // elapsed more than 5 sec, not occupy the CPU anymore if (el > 2.0) { // elapsed more than 5 sec, not occupy the CPU anymore
stDebug("s-task:%s occupy more than 5.0s, release the exec threads and idle for 500ms", id); stDebug("s-task:%s occupy more than 5.0s, release the exec threads and idle for 500ms", id);
streamTaskSetIdleInfo(pTask, 500); streamTaskSetIdleInfo(pTask, 500);
return code; return code;

View File

@ -632,7 +632,7 @@ void streamMetaCloseImpl(void* arg) {
} }
// todo let's check the status for each task // todo let's check the status for each task
int32_t streamMetaSaveTask(SStreamMeta* pMeta, SStreamTask* pTask) { int32_t streamMetaSaveTaskInMeta(SStreamMeta* pMeta, SStreamTask* pTask) {
int32_t vgId = pTask->pMeta->vgId; int32_t vgId = pTask->pMeta->vgId;
void* buf = NULL; void* buf = NULL;
int32_t len; int32_t len;
@ -682,7 +682,7 @@ int32_t streamMetaSaveTask(SStreamMeta* pMeta, SStreamTask* pTask) {
return code; return code;
} }
int32_t streamMetaRemoveTask(SStreamMeta* pMeta, STaskId* pTaskId) { int32_t streamMetaRemoveTaskInMeta(SStreamMeta* pMeta, STaskId* pTaskId) {
int64_t key[2] = {pTaskId->streamId, pTaskId->taskId}; int64_t key[2] = {pTaskId->streamId, pTaskId->taskId};
int32_t code = tdbTbDelete(pMeta->pTaskDb, key, STREAM_TASK_KEY_LEN, pMeta->txn); int32_t code = tdbTbDelete(pMeta->pTaskDb, key, STREAM_TASK_KEY_LEN, pMeta->txn);
if (code != 0) { if (code != 0) {
@ -705,7 +705,7 @@ int32_t streamMetaRegisterTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTa
void* p = taosHashGet(pMeta->pTasksMap, &id, sizeof(id)); void* p = taosHashGet(pMeta->pTasksMap, &id, sizeof(id));
if (p != NULL) { if (p != NULL) {
stDebug("s-task:%" PRIx64 " already exist in meta, no need to register", id.taskId); stDebug("s-task:0x%" PRIx64 " already exist in meta, no need to register", id.taskId);
tFreeStreamTask(pTask); tFreeStreamTask(pTask);
return code; return code;
} }
@ -735,7 +735,7 @@ int32_t streamMetaRegisterTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTa
return code; return code;
} }
if ((code = streamMetaSaveTask(pMeta, pTask)) != 0) { if ((code = streamMetaSaveTaskInMeta(pMeta, pTask)) != 0) {
int32_t unused = taosHashRemove(pMeta->pTasksMap, &id, sizeof(id)); int32_t unused = taosHashRemove(pMeta->pTasksMap, &id, sizeof(id));
void* pUnused = taosArrayPop(pMeta->pTaskList); void* pUnused = taosArrayPop(pMeta->pTaskList);
@ -885,6 +885,7 @@ static void doRemoveIdFromList(SArray* pTaskList, int32_t num, SStreamTaskId* id
static int32_t streamTaskSendTransSuccessMsg(SStreamTask* pTask, void* param) { static int32_t streamTaskSendTransSuccessMsg(SStreamTask* pTask, void* param) {
int32_t code = 0; int32_t code = 0;
if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) { if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
code = streamTaskSendCheckpointSourceRsp(pTask); code = streamTaskSendCheckpointSourceRsp(pTask);
if (code) { if (code) {
@ -895,7 +896,7 @@ static int32_t streamTaskSendTransSuccessMsg(SStreamTask* pTask, void* param) {
// let's kill the query procedure within stream, to end it ASAP. // let's kill the query procedure within stream, to end it ASAP.
if (pTask->info.taskLevel != TASK_LEVEL__SINK && pTask->exec.pExecutor != NULL) { if (pTask->info.taskLevel != TASK_LEVEL__SINK && pTask->exec.pExecutor != NULL) {
code = qKillTask(pTask->exec.pExecutor, TSDB_CODE_SUCCESS); code = qKillTask(pTask->exec.pExecutor, TSDB_CODE_SUCCESS, -1);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
stError("s-task:%s failed to kill task related query handle, code:%s", pTask->id.idStr, tstrerror(code)); stError("s-task:%s failed to kill task related query handle, code:%s", pTask->id.idStr, tstrerror(code));
} }
@ -932,7 +933,7 @@ int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int64_t streamId, int32_t t
code = taosHashRemove(pMeta->pTasksMap, &id, sizeof(id)); code = taosHashRemove(pMeta->pTasksMap, &id, sizeof(id));
doRemoveIdFromList(pMeta->pTaskList, (int32_t)taosArrayGetSize(pMeta->pTaskList), &pTask->id); doRemoveIdFromList(pMeta->pTaskList, (int32_t)taosArrayGetSize(pMeta->pTaskList), &pTask->id);
code = streamMetaRemoveTask(pMeta, &id); code = streamMetaRemoveTaskInMeta(pMeta, &id);
if (code) { if (code) {
stError("vgId:%d failed to remove task:0x%" PRIx64 ", code:%s", pMeta->vgId, id.taskId, tstrerror(code)); stError("vgId:%d failed to remove task:0x%" PRIx64 ", code:%s", pMeta->vgId, id.taskId, tstrerror(code));
} }
@ -963,6 +964,32 @@ int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int64_t streamId, int32_t t
return 0; return 0;
} }
int32_t streamMetaStopOneTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId) {
SStreamTask* pTask = NULL;
int32_t code = 0;
int32_t vgId = pMeta->vgId;
int32_t numOfTasks = 0;
streamMetaWLock(pMeta);
// code = streamMetaUnregisterTask(pMeta, streamId, taskId);
// numOfTasks = streamMetaGetNumOfTasks(pMeta);
// if (code) {
// stError("vgId:%d failed to drop task:0x%x, code:%s", vgId, taskId, tstrerror(code));
// }
//
// code = streamMetaCommit(pMeta);
// if (code) {
// stError("vgId:%d failed to commit after drop task:0x%x, code:%s", vgId, taskId, tstrerror(code));
// } else {
// stDebug("s-task:0x%"PRIx64"-0x%x vgId:%d dropped, remain tasks:%d", streamId, taskId, pMeta->vgId, numOfTasks);
// }
streamMetaWUnLock(pMeta);
return code;
}
int32_t streamMetaBegin(SStreamMeta* pMeta) { int32_t streamMetaBegin(SStreamMeta* pMeta) {
streamMetaWLock(pMeta); streamMetaWLock(pMeta);
int32_t code = tdbBegin(pMeta->db, &pMeta->txn, tdbDefaultMalloc, tdbDefaultFree, NULL, int32_t code = tdbBegin(pMeta->db, &pMeta->txn, tdbDefaultMalloc, tdbDefaultFree, NULL,
@ -1185,7 +1212,7 @@ void streamMetaLoadAllTasks(SStreamMeta* pMeta) {
if (taosArrayGetSize(pRecycleList) > 0) { if (taosArrayGetSize(pRecycleList) > 0) {
for (int32_t i = 0; i < taosArrayGetSize(pRecycleList); ++i) { for (int32_t i = 0; i < taosArrayGetSize(pRecycleList); ++i) {
STaskId* pId = taosArrayGet(pRecycleList, i); STaskId* pId = taosArrayGet(pRecycleList, i);
code = streamMetaRemoveTask(pMeta, pId); code = streamMetaRemoveTaskInMeta(pMeta, pId);
if (code) { if (code) {
stError("s-task:0x%" PRIx64 " failed to remove task, code:%s", pId->taskId, tstrerror(code)); stError("s-task:0x%" PRIx64 " failed to remove task, code:%s", pId->taskId, tstrerror(code));
} }

View File

@ -76,7 +76,7 @@ int32_t streamStartScanHistoryAsync(SStreamTask* pTask, int8_t igUntreated) {
memcpy(serializedReq, &req, len); memcpy(serializedReq, &req, len);
SRpcMsg rpcMsg = {.contLen = len, .pCont = serializedReq, .msgType = TDMT_VND_STREAM_SCAN_HISTORY}; SRpcMsg rpcMsg = {.contLen = len, .pCont = serializedReq, .msgType = TDMT_VND_STREAM_SCAN_HISTORY};
return tmsgPutToQueue(pTask->pMsgCb, STREAM_QUEUE, &rpcMsg); return tmsgPutToQueue(pTask->pMsgCb, STREAM_LONG_EXEC_QUEUE, &rpcMsg);
} }
void streamExecScanHistoryInFuture(SStreamTask* pTask, int32_t idleDuration) { void streamExecScanHistoryInFuture(SStreamTask* pTask, int32_t idleDuration) {

View File

@ -451,7 +451,6 @@ int32_t streamMetaStopAllTasks(SStreamMeta* pMeta) {
continue; continue;
} }
int64_t refId = pTask->id.refId;
int32_t ret = streamTaskStop(pTask); int32_t ret = streamTaskStop(pTask);
if (ret) { if (ret) {
stError("s-task:0x%x failed to stop task, code:%s", pTaskId->taskId, tstrerror(ret)); stError("s-task:0x%x failed to stop task, code:%s", pTaskId->taskId, tstrerror(ret));

View File

@ -703,7 +703,7 @@ int32_t streamTaskStop(SStreamTask* pTask) {
} }
if (pTask->info.taskLevel != TASK_LEVEL__SINK && pTask->exec.pExecutor != NULL) { if (pTask->info.taskLevel != TASK_LEVEL__SINK && pTask->exec.pExecutor != NULL) {
code = qKillTask(pTask->exec.pExecutor, TSDB_CODE_SUCCESS); code = qKillTask(pTask->exec.pExecutor, TSDB_CODE_SUCCESS, 5000);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
stError("s-task:%s failed to kill task related query handle, code:%s", id, tstrerror(code)); stError("s-task:%s failed to kill task related query handle, code:%s", id, tstrerror(code));
} }
@ -862,7 +862,7 @@ int32_t streamTaskClearHTaskAttr(SStreamTask* pTask, int32_t resetRelHalt) {
pStreamTask->status.taskStatus = TASK_STATUS__READY; pStreamTask->status.taskStatus = TASK_STATUS__READY;
} }
code = streamMetaSaveTask(pMeta, pStreamTask); code = streamMetaSaveTaskInMeta(pMeta, pStreamTask);
streamMutexUnlock(&(pStreamTask->lock)); streamMutexUnlock(&(pStreamTask->lock));
streamMetaReleaseTask(pMeta, pStreamTask); streamMetaReleaseTask(pMeta, pStreamTask);
@ -1025,7 +1025,7 @@ static int32_t taskPauseCallback(SStreamTask* pTask, void* param) {
// in case of fill-history task, stop the tsdb file scan operation. // in case of fill-history task, stop the tsdb file scan operation.
if (pTask->info.fillHistory == 1) { if (pTask->info.fillHistory == 1) {
void* pExecutor = pTask->exec.pExecutor; void* pExecutor = pTask->exec.pExecutor;
code = qKillTask(pExecutor, TSDB_CODE_SUCCESS); code = qKillTask(pExecutor, TSDB_CODE_SUCCESS, 10000);
} }
stDebug("vgId:%d s-task:%s set pause flag and pause task", pMeta->vgId, pTask->id.idStr); stDebug("vgId:%d s-task:%s set pause flag and pause task", pMeta->vgId, pTask->id.idStr);
@ -1287,6 +1287,8 @@ const char* streamTaskGetExecType(int32_t type) {
return "resume-task-from-idle"; return "resume-task-from-idle";
case STREAM_EXEC_T_ADD_FAILED_TASK: case STREAM_EXEC_T_ADD_FAILED_TASK:
return "record-start-failed-task"; return "record-start-failed-task";
case STREAM_EXEC_T_STOP_ONE_TASK:
return "stop-one-task";
case 0: case 0:
return "exec-all-tasks"; return "exec-all-tasks";
default: default:

View File

@ -256,7 +256,7 @@ static void *tAutoQWorkerThreadFp(SQueueWorker *worker) {
return NULL; return NULL;
} }
STaosQueue *tAutoQWorkerAllocQueue(SAutoQWorkerPool *pool, void *ahandle, FItem fp) { STaosQueue *tAutoQWorkerAllocQueue(SAutoQWorkerPool *pool, void *ahandle, FItem fp, int32_t minNum) {
int32_t code; int32_t code;
STaosQueue *queue; STaosQueue *queue;
@ -280,7 +280,10 @@ STaosQueue *tAutoQWorkerAllocQueue(SAutoQWorkerPool *pool, void *ahandle, FItem
int32_t queueNum = taosGetQueueNumber(pool->qset); int32_t queueNum = taosGetQueueNumber(pool->qset);
int32_t curWorkerNum = taosArrayGetSize(pool->workers); int32_t curWorkerNum = taosArrayGetSize(pool->workers);
int32_t dstWorkerNum = ceilf(queueNum * pool->ratio); int32_t dstWorkerNum = ceilf(queueNum * pool->ratio);
if (dstWorkerNum < 2) dstWorkerNum = 2;
if (dstWorkerNum < minNum) {
dstWorkerNum = minNum;
}
// spawn a thread to process queue // spawn a thread to process queue
while (curWorkerNum < dstWorkerNum) { while (curWorkerNum < dstWorkerNum) {

View File

@ -79,7 +79,7 @@
(void)streamMetaAddFailedTask (void)streamMetaAddFailedTask
(void)streamMetaAddTaskLaunchResult (void)streamMetaAddTaskLaunchResult
(void)streamMetaCommit (void)streamMetaCommit
(void)streamMetaRemoveTask (void)streamMetaRemoveTaskInMeta
(void)streamMetaSendHbHelper (void)streamMetaSendHbHelper
(void)streamMetaStartAllTasks (void)streamMetaStartAllTasks
(void)streamMetaStartOneTask (void)streamMetaStartOneTask