Merge branch 'main' into merge/mainto3.0
This commit is contained in:
commit
2bee60a41a
|
@ -1441,7 +1441,7 @@ charset 的有效值是 UTF-8。
|
|||
- 取值范围:float/double/none
|
||||
- 默认值:none,表示关闭无损压缩
|
||||
- 动态修改:不支持
|
||||
- 支持版本:从 v3.3.0.0 前支持
|
||||
- 支持版本:从v3.1.0.0 版本引入,v3.3.0.0 以后废弃
|
||||
|
||||
#### ifAdtFse
|
||||
- 说明:在启用 TSZ 有损压缩时,使用 FSE 算法替换 HUFFMAN 算法,FSE 算法压缩速度更快,但解压稍慢,追求压缩速度可选用此算法
|
||||
|
@ -1450,22 +1450,22 @@ charset 的有效值是 UTF-8。
|
|||
- 最小值:0
|
||||
- 最大值:1
|
||||
- 动态修改:支持通过 SQL 修改,重启生效
|
||||
- 支持版本:从 v3.1.0.0 版本开始引入
|
||||
- 支持版本:从v3.1.0.0 版本引入,v3.3.0.0 以后废弃
|
||||
|
||||
#### maxRange
|
||||
- 说明:用于有损压缩设置 `内部参数`
|
||||
- 动态修改:支持通过 SQL 修改,重启生效
|
||||
- 支持版本:从 v3.1.0.0 版本开始引入
|
||||
- 支持版本:从v3.1.0.0 版本引入,v3.3.0.0 以后废弃
|
||||
|
||||
#### curRange
|
||||
- 说明:用于有损压缩设置 `内部参数`
|
||||
- 动态修改:支持通过 SQL 修改,重启生效
|
||||
- 支持版本:从 v3.1.0.0 版本开始引入
|
||||
- 支持版本:从v3.1.0.0 版本引入,v3.3.0.0 以后废弃
|
||||
|
||||
#### compressor
|
||||
- 说明:用于有损压缩设置 `内部参数`
|
||||
- 动态修改:支持通过 SQL 修改,重启生效
|
||||
- 支持版本:从 v3.1.0.0 版本开始引入
|
||||
- 支持版本:从v3.1.0.0 版本引入,v3.3.0.0 以后废弃
|
||||
|
||||
**补充说明**
|
||||
1. 在 3.3.5.0 之后,所有配置参数都将被持久化到本地存储,重启数据库服务后,将默认使用持久化的配置参数列表;如果您希望继续使用 config 文件中配置的参数,需设置 forceReadConfig 为 1。
|
||||
|
|
|
@ -38,6 +38,7 @@ typedef enum {
|
|||
STREAM_QUEUE,
|
||||
ARB_QUEUE,
|
||||
STREAM_CTRL_QUEUE,
|
||||
STREAM_LONG_EXEC_QUEUE,
|
||||
QUEUE_MAX,
|
||||
} EQueueType;
|
||||
|
||||
|
|
|
@ -183,7 +183,7 @@ void qCleanExecTaskBlockBuf(qTaskInfo_t tinfo);
|
|||
*/
|
||||
int32_t qAsyncKillTask(qTaskInfo_t tinfo, int32_t rspCode);
|
||||
|
||||
int32_t qKillTask(qTaskInfo_t tinfo, int32_t rspCode);
|
||||
int32_t qKillTask(qTaskInfo_t tinfo, int32_t rspCode, int64_t waitDuration);
|
||||
|
||||
bool qTaskIsExecuting(qTaskInfo_t qinfo);
|
||||
|
||||
|
|
|
@ -361,6 +361,7 @@ typedef struct SStateStore {
|
|||
bool (*streamStateCheck)(SStreamState* pState, const SWinKey* key);
|
||||
int32_t (*streamStateGetByPos)(SStreamState* pState, void* pos, void** pVal);
|
||||
void (*streamStateDel)(SStreamState* pState, const SWinKey* key);
|
||||
void (*streamStateDelByGroupId)(SStreamState* pState, uint64_t groupId);
|
||||
void (*streamStateClear)(SStreamState* pState);
|
||||
void (*streamStateSetNumber)(SStreamState* pState, int32_t number, int32_t tsIdex);
|
||||
void (*streamStateSaveInfo)(SStreamState* pState, void* pKey, int32_t keyLen, void* pVal, int32_t vLen);
|
||||
|
|
|
@ -44,6 +44,7 @@ int32_t streamStateGet(SStreamState* pState, const SWinKey* key, void** pVal, in
|
|||
bool streamStateCheck(SStreamState* pState, const SWinKey* key);
|
||||
int32_t streamStateGetByPos(SStreamState* pState, void* pos, void** pVal);
|
||||
void streamStateDel(SStreamState* pState, const SWinKey* key);
|
||||
void streamStateDelByGroupId(SStreamState* pState, uint64_t groupId);
|
||||
void streamStateClear(SStreamState* pState);
|
||||
void streamStateSetNumber(SStreamState* pState, int32_t number, int32_t tsIdex);
|
||||
void streamStateSaveInfo(SStreamState* pState, void* pKey, int32_t keyLen, void* pVal, int32_t vLen);
|
||||
|
|
|
@ -58,6 +58,7 @@ extern "C" {
|
|||
#define STREAM_EXEC_T_STOP_ALL_TASKS (-5)
|
||||
#define STREAM_EXEC_T_RESUME_TASK (-6)
|
||||
#define STREAM_EXEC_T_ADD_FAILED_TASK (-7)
|
||||
#define STREAM_EXEC_T_STOP_ONE_TASK (-8)
|
||||
|
||||
typedef struct SStreamTask SStreamTask;
|
||||
typedef struct SStreamQueue SStreamQueue;
|
||||
|
@ -768,15 +769,19 @@ void streamMetaCleanup();
|
|||
int32_t streamMetaOpen(const char* path, void* ahandle, FTaskBuild expandFunc, FTaskExpand expandTaskFn, int32_t vgId,
|
||||
int64_t stage, startComplete_fn_t fn, SStreamMeta** pMeta);
|
||||
void streamMetaClose(SStreamMeta* streamMeta);
|
||||
int32_t streamMetaSaveTask(SStreamMeta* pMeta, SStreamTask* pTask); // save to stream meta store
|
||||
int32_t streamMetaRemoveTask(SStreamMeta* pMeta, STaskId* pKey);
|
||||
|
||||
int32_t streamMetaSaveTaskInMeta(SStreamMeta* pMeta, SStreamTask* pTask); // save to stream meta store
|
||||
int32_t streamMetaRemoveTaskInMeta(SStreamMeta* pMeta, STaskId* pKey);
|
||||
|
||||
int32_t streamMetaRegisterTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTask, bool* pAdded);
|
||||
int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId);
|
||||
int32_t streamMetaGetNumOfTasks(SStreamMeta* pMeta);
|
||||
|
||||
int32_t streamMetaAcquireTaskNoLock(SStreamMeta* pMeta, int64_t streamId, int32_t taskId, SStreamTask** pTask);
|
||||
int32_t streamMetaAcquireTaskUnsafe(SStreamMeta* pMeta, STaskId* pId, SStreamTask** pTask);
|
||||
int32_t streamMetaAcquireTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId, SStreamTask** pTask);
|
||||
void streamMetaReleaseTask(SStreamMeta* pMeta, SStreamTask* pTask);
|
||||
|
||||
void streamMetaClear(SStreamMeta* pMeta);
|
||||
void streamMetaInitBackend(SStreamMeta* pMeta);
|
||||
int32_t streamMetaCommit(SStreamMeta* pMeta);
|
||||
|
@ -810,6 +815,7 @@ void streamMetaLoadAllTasks(SStreamMeta* pMeta);
|
|||
int32_t streamMetaStartAllTasks(SStreamMeta* pMeta);
|
||||
int32_t streamMetaStopAllTasks(SStreamMeta* pMeta);
|
||||
int32_t streamMetaStartOneTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId);
|
||||
int32_t streamMetaStopOneTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId);
|
||||
bool streamMetaAllTasksReady(const SStreamMeta* pMeta);
|
||||
int32_t streamTaskSendNegotiateChkptIdMsg(SStreamTask* pTask);
|
||||
int32_t streamTaskCheckIfReqConsenChkptId(SStreamTask* pTask, int64_t ts);
|
||||
|
|
|
@ -59,6 +59,7 @@ int32_t addRowBuffIfNotExist(SStreamFileState* pFileState, void* pKey, int32_t k
|
|||
int32_t getRowBuff(SStreamFileState* pFileState, void* pKey, int32_t keyLen, void** pVal, int32_t* pVLen,
|
||||
int32_t* pWinCode);
|
||||
void deleteRowBuff(SStreamFileState* pFileState, const void* pKey, int32_t keyLen);
|
||||
void deleteRowBuffByGroupId(SStreamFileState* pFileState, uint64_t groupId);
|
||||
int32_t getRowBuffByPos(SStreamFileState* pFileState, SRowBuffPos* pPos, void** pVal);
|
||||
bool hasRowBuff(SStreamFileState* pFileState, void* pKey, int32_t keyLen);
|
||||
int32_t putFreeBuff(SStreamFileState* pFileState, SRowBuffPos* pPos);
|
||||
|
|
|
@ -76,7 +76,7 @@ void tQWorkerFreeQueue(SQWorkerPool *pool, STaosQueue *queue);
|
|||
|
||||
int32_t tAutoQWorkerInit(SAutoQWorkerPool *pool);
|
||||
void tAutoQWorkerCleanup(SAutoQWorkerPool *pool);
|
||||
STaosQueue *tAutoQWorkerAllocQueue(SAutoQWorkerPool *pool, void *ahandle, FItem fp);
|
||||
STaosQueue *tAutoQWorkerAllocQueue(SAutoQWorkerPool *pool, void *ahandle, FItem fp, int32_t minNum);
|
||||
void tAutoQWorkerFreeQueue(SAutoQWorkerPool *pool, STaosQueue *queue);
|
||||
|
||||
int32_t tWWorkerInit(SWWorkerPool *pool);
|
||||
|
|
|
@ -32,6 +32,7 @@ typedef struct SVnodeMgmt {
|
|||
const char *name;
|
||||
SQueryAutoQWorkerPool queryPool;
|
||||
SAutoQWorkerPool streamPool;
|
||||
SAutoQWorkerPool streamLongExecPool;
|
||||
SWWorkerPool streamCtrlPool;
|
||||
SWWorkerPool fetchPool;
|
||||
SSingleWorker mgmtWorker;
|
||||
|
@ -75,6 +76,7 @@ typedef struct {
|
|||
STaosQueue *pQueryQ;
|
||||
STaosQueue *pStreamQ;
|
||||
STaosQueue *pStreamCtrlQ;
|
||||
STaosQueue *pStreamLongExecQ;
|
||||
STaosQueue *pFetchQ;
|
||||
STaosQueue *pMultiMgmQ;
|
||||
} SVnodeObj;
|
||||
|
@ -137,6 +139,8 @@ int32_t vmPutMsgToQueryQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg);
|
|||
int32_t vmPutMsgToFetchQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg);
|
||||
int32_t vmPutMsgToStreamQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg);
|
||||
int32_t vmPutMsgToStreamCtrlQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg);
|
||||
int32_t vmPutMsgToStreamLongExecQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg);
|
||||
|
||||
int32_t vmPutMsgToMergeQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg);
|
||||
int32_t vmPutMsgToMgmtQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg);
|
||||
int32_t vmPutMsgToMultiMgmtQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg);
|
||||
|
|
|
@ -1008,27 +1008,29 @@ SArray *vmGetMsgHandles() {
|
|||
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_RUN, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DISPATCH, vmPutMsgToStreamCtrlQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DISPATCH_RSP, vmPutMsgToStreamCtrlQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_STREAM_RETRIEVE, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_STREAM_RETRIEVE_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_CHECK, vmPutMsgToStreamCtrlQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_CHECK_RSP, vmPutMsgToStreamCtrlQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_PAUSE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_RESUME, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_STOP, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_CHECK_POINT_SOURCE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_CHECKPOINT_READY, vmPutMsgToStreamCtrlQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_CHECKPOINT_READY_RSP, vmPutMsgToStreamCtrlQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_STREAM_RETRIEVE_TRIGGER, vmPutMsgToStreamCtrlQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_STREAM_RETRIEVE_TRIGGER_RSP, vmPutMsgToStreamCtrlQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_UPDATE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_RESET, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_HEARTBEAT_RSP, vmPutMsgToStreamCtrlQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_REQ_CHKPT_RSP, vmPutMsgToStreamCtrlQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_CHKPT_REPORT_RSP, vmPutMsgToStreamCtrlQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_SCAN_HISTORY, vmPutMsgToStreamLongExecQueue, 0) == NULL) goto _OVER;
|
||||
|
||||
if (dmSetMgmtHandle(pArray, TDMT_VND_GET_STREAM_PROGRESS, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_STREAM_RETRIEVE, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_STREAM_RETRIEVE_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
|
||||
|
||||
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_UPDATE_CHKPT, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_STREAM_CONSEN_CHKPT, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_PAUSE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_RESUME, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_STOP, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_CHECK_POINT_SOURCE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_UPDATE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_RESET, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||
|
||||
if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_REPLICA, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_CONFIG, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||
|
|
|
@ -398,10 +398,14 @@ void vmCloseVnode(SVnodeMgmt *pMgmt, SVnodeObj *pVnode, bool commitAndRemoveWal,
|
|||
|
||||
dInfo("vgId:%d, wait for vnode stream queue:%p is empty, %d remains", pVnode->vgId,
|
||||
pVnode->pStreamQ, taosQueueItemSize(pVnode->pStreamQ));
|
||||
while (!taosQueueEmpty(pVnode->pStreamQ)) taosMsleep(10);
|
||||
while (!taosQueueEmpty(pVnode->pStreamQ)) taosMsleep(50);
|
||||
|
||||
dInfo("vgId:%d, wait for vnode stream ctrl queue:%p is empty", pVnode->vgId, pVnode->pStreamCtrlQ);
|
||||
while (!taosQueueEmpty(pVnode->pStreamCtrlQ)) taosMsleep(10);
|
||||
while (!taosQueueEmpty(pVnode->pStreamCtrlQ)) taosMsleep(50);
|
||||
|
||||
dInfo("vgId:%d, wait for vnode stream long-exec queue:%p is empty, %d remains", pVnode->vgId,
|
||||
pVnode->pStreamLongExecQ, taosQueueItemSize(pVnode->pStreamLongExecQ));
|
||||
while (!taosQueueEmpty(pVnode->pStreamLongExecQ)) taosMsleep(50);
|
||||
|
||||
dInfo("vgId:%d, all vnode queues is empty", pVnode->vgId);
|
||||
|
||||
|
|
|
@ -150,7 +150,7 @@ static void vmProcessStreamCtrlQueue(SQueueInfo *pInfo, STaosQall* pQall, int32_
|
|||
SRpcMsg *pMsg = pItem;
|
||||
const STraceId *trace = &pMsg->info.traceId;
|
||||
|
||||
dGTrace("vgId:%d, msg:%p get from vnode-ctrl-stream queue", pVnode->vgId, pMsg);
|
||||
dGTrace("vgId:%d, msg:%p get from vnode-stream-ctrl queue", pVnode->vgId, pMsg);
|
||||
code = vnodeProcessStreamCtrlMsg(pVnode->pImpl, pMsg, pInfo);
|
||||
if (code != 0) {
|
||||
terrno = code;
|
||||
|
@ -165,6 +165,26 @@ static void vmProcessStreamCtrlQueue(SQueueInfo *pInfo, STaosQall* pQall, int32_
|
|||
}
|
||||
}
|
||||
|
||||
static void vmProcessStreamLongExecQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) {
|
||||
SVnodeObj *pVnode = pInfo->ahandle;
|
||||
const STraceId *trace = &pMsg->info.traceId;
|
||||
int32_t code = 0;
|
||||
|
||||
dGTrace("vgId:%d, msg:%p get from vnode-stream long-exec queue", pVnode->vgId, pMsg);
|
||||
|
||||
code = vnodeProcessStreamLongExecMsg(pVnode->pImpl, pMsg, pInfo);
|
||||
if (code != 0) {
|
||||
terrno = code;
|
||||
dGError("vgId:%d, msg:%p failed to process stream msg %s since %s", pVnode->vgId, pMsg, TMSG_INFO(pMsg->msgType),
|
||||
tstrerror(code));
|
||||
vmSendRsp(pMsg, code);
|
||||
}
|
||||
|
||||
dGTrace("vgId:%d, msg:%p is freed, code:0x%x", pVnode->vgId, pMsg, code);
|
||||
rpcFreeCont(pMsg->pCont);
|
||||
taosFreeQitem(pMsg);
|
||||
}
|
||||
|
||||
static void vmProcessFetchQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
|
||||
SVnodeObj *pVnode = pInfo->ahandle;
|
||||
SRpcMsg *pMsg = NULL;
|
||||
|
@ -274,9 +294,13 @@ static int32_t vmPutMsgToQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg, EQueueType qtyp
|
|||
code = taosWriteQitem(pVnode->pStreamQ, pMsg);
|
||||
break;
|
||||
case STREAM_CTRL_QUEUE:
|
||||
dGTrace("vgId:%d, msg:%p put into vnode-ctrl-stream queue", pVnode->vgId, pMsg);
|
||||
dGTrace("vgId:%d, msg:%p put into vnode-stream-ctrl queue", pVnode->vgId, pMsg);
|
||||
code = taosWriteQitem(pVnode->pStreamCtrlQ, pMsg);
|
||||
break;
|
||||
case STREAM_LONG_EXEC_QUEUE:
|
||||
dGTrace("vgId:%d, msg:%p put into vnode-stream-long-exec queue", pVnode->vgId, pMsg);
|
||||
code = taosWriteQitem(pVnode->pStreamLongExecQ, pMsg);
|
||||
break;
|
||||
case FETCH_QUEUE:
|
||||
dGTrace("vgId:%d, msg:%p put into vnode-fetch queue", pVnode->vgId, pMsg);
|
||||
code = taosWriteQitem(pVnode->pFetchQ, pMsg);
|
||||
|
@ -335,6 +359,8 @@ int32_t vmPutMsgToStreamQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { return vmPutMs
|
|||
|
||||
int32_t vmPutMsgToStreamCtrlQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { return vmPutMsgToQueue(pMgmt, pMsg, STREAM_CTRL_QUEUE); }
|
||||
|
||||
int32_t vmPutMsgToStreamLongExecQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { return vmPutMsgToQueue(pMgmt, pMsg, STREAM_LONG_EXEC_QUEUE); }
|
||||
|
||||
int32_t vmPutMsgToMultiMgmtQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
|
||||
const STraceId *trace = &pMsg->info.traceId;
|
||||
dGTrace("msg:%p, put into vnode-multi-mgmt queue", pMsg);
|
||||
|
@ -409,6 +435,10 @@ int32_t vmGetQueueSize(SVnodeMgmt *pMgmt, int32_t vgId, EQueueType qtype) {
|
|||
break;
|
||||
case STREAM_CTRL_QUEUE:
|
||||
size = taosQueueItemSize(pVnode->pStreamCtrlQ);
|
||||
break;
|
||||
case STREAM_LONG_EXEC_QUEUE:
|
||||
size = taosQueueItemSize(pVnode->pStreamLongExecQ);
|
||||
break;
|
||||
default:
|
||||
break;
|
||||
}
|
||||
|
@ -451,13 +481,16 @@ int32_t vmAllocQueue(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) {
|
|||
}
|
||||
|
||||
pVnode->pQueryQ = tQueryAutoQWorkerAllocQueue(&pMgmt->queryPool, pVnode, (FItem)vmProcessQueryQueue);
|
||||
pVnode->pStreamQ = tAutoQWorkerAllocQueue(&pMgmt->streamPool, pVnode, (FItem)vmProcessStreamQueue);
|
||||
pVnode->pFetchQ = tWWorkerAllocQueue(&pMgmt->fetchPool, pVnode, (FItems)vmProcessFetchQueue);
|
||||
|
||||
// init stream msg processing queue family
|
||||
pVnode->pStreamQ = tAutoQWorkerAllocQueue(&pMgmt->streamPool, pVnode, (FItem)vmProcessStreamQueue, 2);
|
||||
pVnode->pStreamCtrlQ = tWWorkerAllocQueue(&pMgmt->streamCtrlPool, pVnode, (FItems)vmProcessStreamCtrlQueue);
|
||||
pVnode->pStreamLongExecQ = tAutoQWorkerAllocQueue(&pMgmt->streamLongExecPool, pVnode, (FItem)vmProcessStreamLongExecQueue, 1);
|
||||
|
||||
if (pVnode->pWriteW.queue == NULL || pVnode->pSyncW.queue == NULL || pVnode->pSyncRdW.queue == NULL ||
|
||||
pVnode->pApplyW.queue == NULL || pVnode->pQueryQ == NULL || pVnode->pStreamQ == NULL || pVnode->pFetchQ == NULL
|
||||
|| pVnode->pStreamCtrlQ == NULL) {
|
||||
|| pVnode->pStreamCtrlQ == NULL || pVnode->pStreamLongExecQ == NULL) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
|
@ -473,6 +506,7 @@ int32_t vmAllocQueue(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) {
|
|||
dInfo("vgId:%d, fetch-queue:%p is alloced, thread:%08" PRId64, pVnode->vgId, pVnode->pFetchQ,
|
||||
taosQueueGetThreadId(pVnode->pFetchQ));
|
||||
dInfo("vgId:%d, stream-queue:%p is alloced", pVnode->vgId, pVnode->pStreamQ);
|
||||
dInfo("vgId:%d, stream-long-exec-queue:%p is alloced", pVnode->vgId, pVnode->pStreamLongExecQ);
|
||||
dInfo("vgId:%d, stream-ctrl-queue:%p is alloced, thread:%08" PRId64, pVnode->vgId, pVnode->pStreamCtrlQ,
|
||||
taosQueueGetThreadId(pVnode->pStreamCtrlQ));
|
||||
return 0;
|
||||
|
@ -481,17 +515,22 @@ int32_t vmAllocQueue(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) {
|
|||
void vmFreeQueue(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) {
|
||||
tQueryAutoQWorkerFreeQueue(&pMgmt->queryPool, pVnode->pQueryQ);
|
||||
tAutoQWorkerFreeQueue(&pMgmt->streamPool, pVnode->pStreamQ);
|
||||
tAutoQWorkerFreeQueue(&pMgmt->streamLongExecPool, pVnode->pStreamLongExecQ);
|
||||
tWWorkerFreeQueue(&pMgmt->streamCtrlPool, pVnode->pStreamCtrlQ);
|
||||
tWWorkerFreeQueue(&pMgmt->fetchPool, pVnode->pFetchQ);
|
||||
pVnode->pQueryQ = NULL;
|
||||
pVnode->pFetchQ = NULL;
|
||||
|
||||
pVnode->pStreamQ = NULL;
|
||||
pVnode->pStreamCtrlQ = NULL;
|
||||
pVnode->pFetchQ = NULL;
|
||||
pVnode->pStreamLongExecQ = NULL;
|
||||
|
||||
dDebug("vgId:%d, queue is freed", pVnode->vgId);
|
||||
}
|
||||
|
||||
int32_t vmStartWorker(SVnodeMgmt *pMgmt) {
|
||||
int32_t code = 0;
|
||||
|
||||
SQueryAutoQWorkerPool *pQPool = &pMgmt->queryPool;
|
||||
pQPool->name = "vnode-query";
|
||||
pQPool->min = tsNumOfVnodeQueryThreads;
|
||||
|
@ -505,8 +544,13 @@ int32_t vmStartWorker(SVnodeMgmt *pMgmt) {
|
|||
pStreamPool->ratio = tsRatioOfVnodeStreamThreads;
|
||||
if ((code = tAutoQWorkerInit(pStreamPool)) != 0) return code;
|
||||
|
||||
SAutoQWorkerPool *pLongExecPool = &pMgmt->streamLongExecPool;
|
||||
pLongExecPool->name = "vnode-stream-long-exec";
|
||||
pLongExecPool->ratio = tsRatioOfVnodeStreamThreads/3;
|
||||
if ((code = tAutoQWorkerInit(pLongExecPool)) != 0) return code;
|
||||
|
||||
SWWorkerPool *pStreamCtrlPool = &pMgmt->streamCtrlPool;
|
||||
pStreamCtrlPool->name = "vnode-ctrl-stream";
|
||||
pStreamCtrlPool->name = "vnode-stream-ctrl";
|
||||
pStreamCtrlPool->max = 1;
|
||||
if ((code = tWWorkerInit(pStreamCtrlPool)) != 0) return code;
|
||||
|
||||
|
@ -541,6 +585,7 @@ int32_t vmStartWorker(SVnodeMgmt *pMgmt) {
|
|||
void vmStopWorker(SVnodeMgmt *pMgmt) {
|
||||
tQueryAutoQWorkerCleanup(&pMgmt->queryPool);
|
||||
tAutoQWorkerCleanup(&pMgmt->streamPool);
|
||||
tAutoQWorkerCleanup(&pMgmt->streamLongExecPool);
|
||||
tWWorkerCleanup(&pMgmt->streamCtrlPool);
|
||||
tWWorkerCleanup(&pMgmt->fetchPool);
|
||||
dDebug("vnode workers are closed");
|
||||
|
|
|
@ -43,6 +43,7 @@ void initStateStoreAPI(SStateStore* pStore) {
|
|||
pStore->streamStateCheck = streamStateCheck;
|
||||
pStore->streamStateGetByPos = streamStateGetByPos;
|
||||
pStore->streamStateDel = streamStateDel;
|
||||
pStore->streamStateDelByGroupId = streamStateDelByGroupId;
|
||||
pStore->streamStateClear = streamStateClear;
|
||||
pStore->streamStateSaveInfo = streamStateSaveInfo;
|
||||
pStore->streamStateGetInfo = streamStateGetInfo;
|
||||
|
|
|
@ -113,6 +113,7 @@ int32_t vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo);
|
|||
int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo);
|
||||
int32_t vnodeProcessStreamMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo);
|
||||
int32_t vnodeProcessStreamCtrlMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo);
|
||||
int32_t vnodeProcessStreamLongExecMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo);
|
||||
void vnodeProposeWriteMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs);
|
||||
void vnodeApplyWriteMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs);
|
||||
void vnodeProposeCommitOnNeed(SVnode *pVnode, bool atExit);
|
||||
|
|
|
@ -1302,7 +1302,7 @@ _checkpoint:
|
|||
}
|
||||
|
||||
streamMetaWLock(pMeta);
|
||||
if ((code = streamMetaSaveTask(pMeta, pTask)) != 0) {
|
||||
if ((code = streamMetaSaveTaskInMeta(pMeta, pTask)) != 0) {
|
||||
streamMetaWUnLock(pMeta);
|
||||
taosHashCancelIterate(pInfoHash, infoHash);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
|
|
@ -268,13 +268,13 @@ int32_t tqStreamTaskProcessUpdateReq(SStreamMeta* pMeta, SMsgCb* cb, SRpcMsg* pM
|
|||
// stream do update the nodeEp info, write it into stream meta.
|
||||
if (updated) {
|
||||
tqDebug("s-task:%s vgId:%d save task after update epset, and stop task", idstr, vgId);
|
||||
code = streamMetaSaveTask(pMeta, pTask);
|
||||
code = streamMetaSaveTaskInMeta(pMeta, pTask);
|
||||
if (code) {
|
||||
tqError("s-task:%s vgId:%d failed to save task, code:%s", idstr, vgId, tstrerror(code));
|
||||
}
|
||||
|
||||
if (pHTask != NULL) {
|
||||
code = streamMetaSaveTask(pMeta, pHTask);
|
||||
code = streamMetaSaveTaskInMeta(pMeta, pHTask);
|
||||
if (code) {
|
||||
tqError("s-task:%s vgId:%d failed to save related history task, code:%s", idstr, vgId, tstrerror(code));
|
||||
}
|
||||
|
@ -751,6 +751,8 @@ int32_t tqStreamTaskProcessDropReq(SStreamMeta* pMeta, char* msg, int32_t msgLen
|
|||
}
|
||||
|
||||
streamMetaWUnLock(pMeta);
|
||||
tqDebug("vgId:%d process drop task:0x%x completed", vgId, pReq->taskId);
|
||||
|
||||
return 0; // always return success
|
||||
}
|
||||
|
||||
|
@ -865,6 +867,9 @@ int32_t tqStreamTaskProcessRunReq(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLead
|
|||
} else if (type == STREAM_EXEC_T_ADD_FAILED_TASK) {
|
||||
code = streamMetaAddFailedTask(pMeta, req.streamId, req.taskId);
|
||||
return code;
|
||||
} else if (type == STREAM_EXEC_T_STOP_ONE_TASK) {
|
||||
code = streamMetaStopOneTask(pMeta, req.streamId, req.taskId);
|
||||
return code;
|
||||
} else if (type == STREAM_EXEC_T_RESUME_TASK) { // task resume to run after idle for a while
|
||||
SStreamTask* pTask = NULL;
|
||||
code = streamMetaAcquireTask(pMeta, req.streamId, req.taskId, &pTask);
|
||||
|
|
|
@ -166,6 +166,7 @@ void initStateStoreAPI(SStateStore* pStore) {
|
|||
pStore->streamStateCheck = streamStateCheck;
|
||||
pStore->streamStateGetByPos = streamStateGetByPos;
|
||||
pStore->streamStateDel = streamStateDel;
|
||||
pStore->streamStateDelByGroupId = streamStateDelByGroupId;
|
||||
pStore->streamStateClear = streamStateClear;
|
||||
pStore->streamStateSaveInfo = streamStateSaveInfo;
|
||||
pStore->streamStateGetInfo = streamStateGetInfo;
|
||||
|
|
|
@ -934,9 +934,7 @@ int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) {
|
|||
|
||||
int32_t vnodeProcessStreamMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) {
|
||||
vTrace("vgId:%d, msg:%p in stream queue is processing", pVnode->config.vgId, pMsg);
|
||||
if ((pMsg->msgType == TDMT_SCH_FETCH || pMsg->msgType == TDMT_VND_TABLE_META || pMsg->msgType == TDMT_VND_TABLE_CFG ||
|
||||
pMsg->msgType == TDMT_VND_BATCH_META) &&
|
||||
!syncIsReadyForRead(pVnode->sync)) {
|
||||
if (!syncIsReadyForRead(pVnode->sync)) {
|
||||
vnodeRedirectRpcMsg(pVnode, pMsg, terrno);
|
||||
return 0;
|
||||
}
|
||||
|
@ -948,8 +946,6 @@ int32_t vnodeProcessStreamMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo)
|
|||
return tqProcessTaskRetrieveReq(pVnode->pTq, pMsg);
|
||||
case TDMT_STREAM_RETRIEVE_RSP:
|
||||
return tqProcessTaskRetrieveRsp(pVnode->pTq, pMsg);
|
||||
case TDMT_VND_STREAM_SCAN_HISTORY:
|
||||
return tqProcessTaskScanHistory(pVnode->pTq, pMsg);
|
||||
case TDMT_VND_GET_STREAM_PROGRESS:
|
||||
return tqStreamProgressRetrieveReq(pVnode->pTq, pMsg);
|
||||
default:
|
||||
|
@ -996,6 +992,22 @@ int32_t vnodeProcessStreamCtrlMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pIn
|
|||
}
|
||||
}
|
||||
|
||||
int32_t vnodeProcessStreamLongExecMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) {
|
||||
vTrace("vgId:%d, msg:%p in stream long exec queue is processing", pVnode->config.vgId, pMsg);
|
||||
if (!syncIsReadyForRead(pVnode->sync)) {
|
||||
vnodeRedirectRpcMsg(pVnode, pMsg, terrno);
|
||||
return 0;
|
||||
}
|
||||
|
||||
switch (pMsg->msgType) {
|
||||
case TDMT_VND_STREAM_SCAN_HISTORY:
|
||||
return tqProcessTaskScanHistory(pVnode->pTq, pMsg);
|
||||
default:
|
||||
vError("unknown msg type:%d in stream long exec queue", pMsg->msgType);
|
||||
return TSDB_CODE_APP_ERROR;
|
||||
}
|
||||
}
|
||||
|
||||
void smaHandleRes(void *pVnode, int64_t smaId, const SArray *data) {
|
||||
int32_t code = tdProcessTSmaInsert(((SVnode *)pVnode)->pSma, smaId, (const char *)data);
|
||||
if (code) {
|
||||
|
|
|
@ -995,20 +995,34 @@ int32_t qAsyncKillTask(qTaskInfo_t qinfo, int32_t rspCode) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t qKillTask(qTaskInfo_t tinfo, int32_t rspCode) {
|
||||
int32_t qKillTask(qTaskInfo_t tinfo, int32_t rspCode, int64_t waitDuration) {
|
||||
int64_t st = taosGetTimestampMs();
|
||||
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
|
||||
if (pTaskInfo == NULL) {
|
||||
return TSDB_CODE_QRY_INVALID_QHANDLE;
|
||||
}
|
||||
|
||||
qDebug("%s sync killed execTask", GET_TASKID(pTaskInfo));
|
||||
if (waitDuration > 0) {
|
||||
qDebug("%s sync killed execTask, and waiting for %.2fs", GET_TASKID(pTaskInfo), waitDuration/1000.0);
|
||||
} else {
|
||||
qDebug("%s async killed execTask", GET_TASKID(pTaskInfo));
|
||||
}
|
||||
|
||||
setTaskKilled(pTaskInfo, TSDB_CODE_TSC_QUERY_KILLED);
|
||||
|
||||
if (waitDuration > 0) {
|
||||
while (1) {
|
||||
taosWLockLatch(&pTaskInfo->lock);
|
||||
if (qTaskIsExecuting(pTaskInfo)) { // let's wait for 100 ms and try again
|
||||
taosWUnLockLatch(&pTaskInfo->lock);
|
||||
taosMsleep(100);
|
||||
|
||||
taosMsleep(200);
|
||||
|
||||
int64_t d = taosGetTimestampMs() - st;
|
||||
if (d >= waitDuration && waitDuration >= 0) {
|
||||
qWarn("%s waiting more than %.2fs, not wait anymore", GET_TASKID(pTaskInfo), waitDuration / 1000.0);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
} else { // not running now
|
||||
pTaskInfo->code = rspCode;
|
||||
taosWUnLockLatch(&pTaskInfo->lock);
|
||||
|
@ -1017,6 +1031,9 @@ int32_t qKillTask(qTaskInfo_t tinfo, int32_t rspCode) {
|
|||
}
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
bool qTaskIsExecuting(qTaskInfo_t qinfo) {
|
||||
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)qinfo;
|
||||
if (NULL == pTaskInfo) {
|
||||
|
|
|
@ -42,9 +42,7 @@ typedef struct SIndefOperatorInfo {
|
|||
} SIndefOperatorInfo;
|
||||
|
||||
static int32_t doGenerateSourceData(SOperatorInfo* pOperator);
|
||||
static SSDataBlock* doProjectOperation1(SOperatorInfo* pOperator);
|
||||
static int32_t doProjectOperation(SOperatorInfo* pOperator, SSDataBlock** pResBlock);
|
||||
static SSDataBlock* doApplyIndefinitFunction1(SOperatorInfo* pOperator);
|
||||
static int32_t doApplyIndefinitFunction(SOperatorInfo* pOperator, SSDataBlock** pResBlock);
|
||||
static int32_t setRowTsColumnOutputInfo(SqlFunctionCtx* pCtx, int32_t numOfCols, SArray** pResList);
|
||||
static int32_t setFunctionResultOutput(SOperatorInfo* pOperator, SOptrBasicInfo* pInfo, SAggSupporter* pSup,
|
||||
|
@ -557,12 +555,6 @@ static void doHandleDataBlock(SOperatorInfo* pOperator, SSDataBlock* pBlock, SOp
|
|||
}
|
||||
}
|
||||
|
||||
SSDataBlock* doApplyIndefinitFunction1(SOperatorInfo* pOperator) {
|
||||
SSDataBlock* pResBlock = NULL;
|
||||
pOperator->pTaskInfo->code = doApplyIndefinitFunction(pOperator, &pResBlock);
|
||||
return pResBlock;
|
||||
}
|
||||
|
||||
int32_t doApplyIndefinitFunction(SOperatorInfo* pOperator, SSDataBlock** pResBlock) {
|
||||
QRY_PARAM_CHECK(pResBlock);
|
||||
SIndefOperatorInfo* pIndefInfo = pOperator->info;
|
||||
|
|
|
@ -3818,7 +3818,11 @@ FETCH_NEXT_BLOCK:
|
|||
int32_t deleteNum = 0;
|
||||
code = deletePartName(pInfo, pBlock, &deleteNum);
|
||||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
if (deleteNum == 0) goto FETCH_NEXT_BLOCK;
|
||||
if (deleteNum == 0) {
|
||||
printSpecDataBlock(pBlock, getStreamOpName(pOperator->operatorType), "block recv", GET_TASKID(pTaskInfo));
|
||||
qDebug("===stream=== ignore block type 18, delete num is 0");
|
||||
goto FETCH_NEXT_BLOCK;
|
||||
}
|
||||
} break;
|
||||
case STREAM_CHECKPOINT: {
|
||||
qError("stream check point error. msg type: STREAM_INPUT__DATA_BLOCK");
|
||||
|
|
|
@ -232,6 +232,29 @@ static void doDeleteWindow(SOperatorInfo* pOperator, TSKEY ts, uint64_t groupId)
|
|||
|
||||
static int32_t getChildIndex(SSDataBlock* pBlock) { return pBlock->info.childId; }
|
||||
|
||||
static void doDeleteWindowByGroupId(SOperatorInfo* pOperator, SSDataBlock* pBlock) {
|
||||
SStorageAPI* pAPI = &pOperator->pTaskInfo->storageAPI;
|
||||
SStreamIntervalOperatorInfo* pInfo = pOperator->info;
|
||||
|
||||
SColumnInfoData* pGpIdCol = taosArrayGet(pBlock->pDataBlock, UID_COLUMN_INDEX);
|
||||
uint64_t* pGroupIdData = (uint64_t*)pGpIdCol->pData;
|
||||
for (int32_t i = 0; i < pBlock->info.rows; i++) {
|
||||
uint64_t groupId = pGroupIdData[i];
|
||||
void* pIte = NULL;
|
||||
int32_t iter = 0;
|
||||
while ((pIte = tSimpleHashIterate(pInfo->aggSup.pResultRowHashTable, pIte, &iter)) != NULL) {
|
||||
size_t keyLen = 0;
|
||||
SWinKey* pKey = tSimpleHashGetKey(pIte, &keyLen);
|
||||
if (pKey->groupId == groupId) {
|
||||
int32_t tmpRes = tSimpleHashIterateRemove(pInfo->aggSup.pResultRowHashTable, pKey, keyLen, &pIte, &iter);
|
||||
qTrace("%s at line %d res:%d", __func__, __LINE__, tmpRes);
|
||||
}
|
||||
}
|
||||
|
||||
pAPI->stateStore.streamStateDelByGroupId(pInfo->pState, groupId);
|
||||
}
|
||||
}
|
||||
|
||||
static int32_t doDeleteWindows(SOperatorInfo* pOperator, SInterval* pInterval, SSDataBlock* pBlock, SArray* pUpWins,
|
||||
SSHashObj* pUpdatedMap, SHashObj* pInvalidWins) {
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
|
@ -5443,7 +5466,12 @@ static int32_t doStreamIntervalAggNext(SOperatorInfo* pOperator, SSDataBlock** p
|
|||
code = getAllIntervalWindow(pInfo->aggSup.pResultRowHashTable, pInfo->pUpdatedMap);
|
||||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
continue;
|
||||
} else if (pBlock->info.type == STREAM_CREATE_CHILD_TABLE || pBlock->info.type == STREAM_DROP_CHILD_TABLE) {
|
||||
} else if (pBlock->info.type == STREAM_CREATE_CHILD_TABLE) {
|
||||
printDataBlock(pBlock, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo));
|
||||
(*ppRes) = pBlock;
|
||||
return code;
|
||||
} else if (pBlock->info.type == STREAM_DROP_CHILD_TABLE) {
|
||||
doDeleteWindowByGroupId(pOperator, pBlock);
|
||||
printDataBlock(pBlock, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo));
|
||||
(*ppRes) = pBlock;
|
||||
return code;
|
||||
|
|
|
@ -697,7 +697,7 @@ int32_t streamTaskUpdateTaskCheckpointInfo(SStreamTask* pTask, bool restored, SV
|
|||
|
||||
pTask->status.taskStatus = TASK_STATUS__READY;
|
||||
|
||||
code = streamMetaSaveTask(pMeta, pTask);
|
||||
code = streamMetaSaveTaskInMeta(pMeta, pTask);
|
||||
streamMutexUnlock(&pTask->lock);
|
||||
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
|
|
|
@ -875,7 +875,7 @@ static int32_t doStreamExecTask(SStreamTask* pTask) {
|
|||
}
|
||||
|
||||
double el = (taosGetTimestampMs() - st) / 1000.0;
|
||||
if (el > 5.0) { // elapsed more than 5 sec, not occupy the CPU anymore
|
||||
if (el > 2.0) { // elapsed more than 5 sec, not occupy the CPU anymore
|
||||
stDebug("s-task:%s occupy more than 5.0s, release the exec threads and idle for 500ms", id);
|
||||
streamTaskSetIdleInfo(pTask, 500);
|
||||
return code;
|
||||
|
|
|
@ -633,7 +633,7 @@ void streamMetaCloseImpl(void* arg) {
|
|||
}
|
||||
|
||||
// todo let's check the status for each task
|
||||
int32_t streamMetaSaveTask(SStreamMeta* pMeta, SStreamTask* pTask) {
|
||||
int32_t streamMetaSaveTaskInMeta(SStreamMeta* pMeta, SStreamTask* pTask) {
|
||||
int32_t vgId = pTask->pMeta->vgId;
|
||||
void* buf = NULL;
|
||||
int32_t len;
|
||||
|
@ -683,7 +683,7 @@ int32_t streamMetaSaveTask(SStreamMeta* pMeta, SStreamTask* pTask) {
|
|||
return code;
|
||||
}
|
||||
|
||||
int32_t streamMetaRemoveTask(SStreamMeta* pMeta, STaskId* pTaskId) {
|
||||
int32_t streamMetaRemoveTaskInMeta(SStreamMeta* pMeta, STaskId* pTaskId) {
|
||||
int64_t key[2] = {pTaskId->streamId, pTaskId->taskId};
|
||||
int32_t code = tdbTbDelete(pMeta->pTaskDb, key, STREAM_TASK_KEY_LEN, pMeta->txn);
|
||||
if (code != 0) {
|
||||
|
@ -706,7 +706,7 @@ int32_t streamMetaRegisterTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTa
|
|||
void* p = taosHashGet(pMeta->pTasksMap, &id, sizeof(id));
|
||||
|
||||
if (p != NULL) {
|
||||
stDebug("s-task:%" PRIx64 " already exist in meta, no need to register", id.taskId);
|
||||
stDebug("s-task:0x%" PRIx64 " already exist in meta, no need to register", id.taskId);
|
||||
tFreeStreamTask(pTask);
|
||||
return code;
|
||||
}
|
||||
|
@ -736,7 +736,7 @@ int32_t streamMetaRegisterTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTa
|
|||
return code;
|
||||
}
|
||||
|
||||
if ((code = streamMetaSaveTask(pMeta, pTask)) != 0) {
|
||||
if ((code = streamMetaSaveTaskInMeta(pMeta, pTask)) != 0) {
|
||||
int32_t unused = taosHashRemove(pMeta->pTasksMap, &id, sizeof(id));
|
||||
void* pUnused = taosArrayPop(pMeta->pTaskList);
|
||||
|
||||
|
@ -886,6 +886,8 @@ static void doRemoveIdFromList(SArray* pTaskList, int32_t num, SStreamTaskId* id
|
|||
|
||||
static int32_t streamTaskSendTransSuccessMsg(SStreamTask* pTask, void* param) {
|
||||
int32_t code = 0;
|
||||
int32_t waitingDuration = 5000;
|
||||
|
||||
if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
|
||||
code = streamTaskSendCheckpointSourceRsp(pTask);
|
||||
if (code) {
|
||||
|
@ -896,7 +898,7 @@ static int32_t streamTaskSendTransSuccessMsg(SStreamTask* pTask, void* param) {
|
|||
|
||||
// let's kill the query procedure within stream, to end it ASAP.
|
||||
if (pTask->info.taskLevel != TASK_LEVEL__SINK && pTask->exec.pExecutor != NULL) {
|
||||
code = qKillTask(pTask->exec.pExecutor, TSDB_CODE_SUCCESS);
|
||||
code = qKillTask(pTask->exec.pExecutor, TSDB_CODE_SUCCESS, -1);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
stError("s-task:%s failed to kill task related query handle, code:%s", pTask->id.idStr, tstrerror(code));
|
||||
}
|
||||
|
@ -933,7 +935,7 @@ int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int64_t streamId, int32_t t
|
|||
|
||||
code = taosHashRemove(pMeta->pTasksMap, &id, sizeof(id));
|
||||
doRemoveIdFromList(pMeta->pTaskList, (int32_t)taosArrayGetSize(pMeta->pTaskList), &pTask->id);
|
||||
code = streamMetaRemoveTask(pMeta, &id);
|
||||
code = streamMetaRemoveTaskInMeta(pMeta, &id);
|
||||
if (code) {
|
||||
stError("vgId:%d failed to remove task:0x%" PRIx64 ", code:%s", pMeta->vgId, id.taskId, tstrerror(code));
|
||||
}
|
||||
|
@ -964,6 +966,32 @@ int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int64_t streamId, int32_t t
|
|||
return 0;
|
||||
}
|
||||
|
||||
int32_t streamMetaStopOneTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId) {
|
||||
SStreamTask* pTask = NULL;
|
||||
int32_t code = 0;
|
||||
int32_t vgId = pMeta->vgId;
|
||||
int32_t numOfTasks = 0;
|
||||
|
||||
streamMetaWLock(pMeta);
|
||||
|
||||
// code = streamMetaUnregisterTask(pMeta, streamId, taskId);
|
||||
// numOfTasks = streamMetaGetNumOfTasks(pMeta);
|
||||
// if (code) {
|
||||
// stError("vgId:%d failed to drop task:0x%x, code:%s", vgId, taskId, tstrerror(code));
|
||||
// }
|
||||
//
|
||||
// code = streamMetaCommit(pMeta);
|
||||
// if (code) {
|
||||
// stError("vgId:%d failed to commit after drop task:0x%x, code:%s", vgId, taskId, tstrerror(code));
|
||||
// } else {
|
||||
// stDebug("s-task:0x%"PRIx64"-0x%x vgId:%d dropped, remain tasks:%d", streamId, taskId, pMeta->vgId, numOfTasks);
|
||||
// }
|
||||
|
||||
streamMetaWUnLock(pMeta);
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
int32_t streamMetaBegin(SStreamMeta* pMeta) {
|
||||
streamMetaWLock(pMeta);
|
||||
int32_t code = tdbBegin(pMeta->db, &pMeta->txn, tdbDefaultMalloc, tdbDefaultFree, NULL,
|
||||
|
@ -1187,7 +1215,7 @@ void streamMetaLoadAllTasks(SStreamMeta* pMeta) {
|
|||
if (taosArrayGetSize(pRecycleList) > 0) {
|
||||
for (int32_t i = 0; i < taosArrayGetSize(pRecycleList); ++i) {
|
||||
STaskId* pId = taosArrayGet(pRecycleList, i);
|
||||
code = streamMetaRemoveTask(pMeta, pId);
|
||||
code = streamMetaRemoveTaskInMeta(pMeta, pId);
|
||||
if (code) {
|
||||
stError("s-task:0x%" PRIx64 " failed to remove task, code:%s", pId->taskId, tstrerror(code));
|
||||
}
|
||||
|
|
|
@ -76,7 +76,7 @@ int32_t streamStartScanHistoryAsync(SStreamTask* pTask, int8_t igUntreated) {
|
|||
memcpy(serializedReq, &req, len);
|
||||
|
||||
SRpcMsg rpcMsg = {.contLen = len, .pCont = serializedReq, .msgType = TDMT_VND_STREAM_SCAN_HISTORY};
|
||||
return tmsgPutToQueue(pTask->pMsgCb, STREAM_QUEUE, &rpcMsg);
|
||||
return tmsgPutToQueue(pTask->pMsgCb, STREAM_LONG_EXEC_QUEUE, &rpcMsg);
|
||||
}
|
||||
|
||||
void streamExecScanHistoryInFuture(SStreamTask* pTask, int32_t idleDuration) {
|
||||
|
|
|
@ -447,7 +447,6 @@ int32_t streamMetaStopAllTasks(SStreamMeta* pMeta) {
|
|||
continue;
|
||||
}
|
||||
|
||||
int64_t refId = pTask->id.refId;
|
||||
int32_t ret = streamTaskStop(pTask);
|
||||
if (ret) {
|
||||
stError("s-task:0x%x failed to stop task, code:%s", pTaskId->taskId, tstrerror(ret));
|
||||
|
|
|
@ -224,6 +224,10 @@ void streamStateDel(SStreamState* pState, const SWinKey* key) {
|
|||
deleteRowBuff(pState->pFileState, key, sizeof(SWinKey));
|
||||
}
|
||||
|
||||
void streamStateDelByGroupId(SStreamState* pState, uint64_t groupId) {
|
||||
deleteRowBuffByGroupId(pState->pFileState, groupId);
|
||||
}
|
||||
|
||||
int32_t streamStateFillPut(SStreamState* pState, const SWinKey* key, const void* value, int32_t vLen) {
|
||||
return streamStateFillPut_rocksdb(pState, key, value, vLen);
|
||||
}
|
||||
|
|
|
@ -710,7 +710,7 @@ int32_t streamTaskStop(SStreamTask* pTask) {
|
|||
}
|
||||
|
||||
if (pTask->info.taskLevel != TASK_LEVEL__SINK && pTask->exec.pExecutor != NULL) {
|
||||
code = qKillTask(pTask->exec.pExecutor, TSDB_CODE_SUCCESS);
|
||||
code = qKillTask(pTask->exec.pExecutor, TSDB_CODE_SUCCESS, 5000);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
stError("s-task:%s failed to kill task related query handle, code:%s", id, tstrerror(code));
|
||||
}
|
||||
|
@ -869,7 +869,7 @@ int32_t streamTaskClearHTaskAttr(SStreamTask* pTask, int32_t resetRelHalt) {
|
|||
pStreamTask->status.taskStatus = TASK_STATUS__READY;
|
||||
}
|
||||
|
||||
code = streamMetaSaveTask(pMeta, pStreamTask);
|
||||
code = streamMetaSaveTaskInMeta(pMeta, pStreamTask);
|
||||
streamMutexUnlock(&(pStreamTask->lock));
|
||||
|
||||
streamMetaReleaseTask(pMeta, pStreamTask);
|
||||
|
@ -1034,7 +1034,7 @@ static int32_t taskPauseCallback(SStreamTask* pTask, void* param) {
|
|||
// in case of fill-history task, stop the tsdb file scan operation.
|
||||
if (pTask->info.fillHistory == 1) {
|
||||
void* pExecutor = pTask->exec.pExecutor;
|
||||
code = qKillTask(pExecutor, TSDB_CODE_SUCCESS);
|
||||
code = qKillTask(pExecutor, TSDB_CODE_SUCCESS, 10000);
|
||||
}
|
||||
|
||||
stDebug("vgId:%d s-task:%s set pause flag and pause task", pMeta->vgId, pTask->id.idStr);
|
||||
|
@ -1296,6 +1296,8 @@ const char* streamTaskGetExecType(int32_t type) {
|
|||
return "resume-task-from-idle";
|
||||
case STREAM_EXEC_T_ADD_FAILED_TASK:
|
||||
return "record-start-failed-task";
|
||||
case STREAM_EXEC_T_STOP_ONE_TASK:
|
||||
return "stop-one-task";
|
||||
case 0:
|
||||
return "exec-all-tasks";
|
||||
default:
|
||||
|
|
|
@ -667,6 +667,32 @@ void deleteRowBuff(SStreamFileState* pFileState, const void* pKey, int32_t keyLe
|
|||
}
|
||||
}
|
||||
|
||||
void deleteRowBuffByGroupId(SStreamFileState* pFileState, uint64_t groupId) {
|
||||
SSHashObj* pRowMap = pFileState->rowStateBuff;
|
||||
void* pIte = NULL;
|
||||
int32_t iter = 0;
|
||||
while ((pIte = tSimpleHashIterate(pRowMap, pIte, &iter)) != NULL) {
|
||||
size_t keyLen = 0;
|
||||
SWinKey* pKey = tSimpleHashGetKey(pIte, &keyLen);
|
||||
if (pKey->groupId == groupId) {
|
||||
int32_t tmpRes = tSimpleHashIterateRemove(pRowMap, pKey, keyLen, &pIte, &iter);
|
||||
qTrace("%s at line %d res:%d", __func__, __LINE__, tmpRes);
|
||||
}
|
||||
}
|
||||
|
||||
while (1) {
|
||||
SWinKey tmp = {.ts = INT64_MIN, .groupId = groupId};
|
||||
SStreamStateCur* pCur = streamStateSeekKeyNext_rocksdb(pFileState->pFileStore, &tmp);
|
||||
SWinKey delKey = {.groupId = groupId};
|
||||
int32_t code = streamStateGetGroupKVByCur_rocksdb(pFileState->pFileStore, pCur, &delKey, NULL, 0);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
break;
|
||||
}
|
||||
code = streamStateDel_rocksdb(pFileState->pFileStore, &delKey);
|
||||
qTrace("%s at line %d res:%d", __func__, __LINE__, code);
|
||||
}
|
||||
}
|
||||
|
||||
static int32_t recoverSessionRowBuff(SStreamFileState* pFileState, SRowBuffPos* pPos) {
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
int32_t lino = 0;
|
||||
|
|
|
@ -256,7 +256,7 @@ static void *tAutoQWorkerThreadFp(SQueueWorker *worker) {
|
|||
return NULL;
|
||||
}
|
||||
|
||||
STaosQueue *tAutoQWorkerAllocQueue(SAutoQWorkerPool *pool, void *ahandle, FItem fp) {
|
||||
STaosQueue *tAutoQWorkerAllocQueue(SAutoQWorkerPool *pool, void *ahandle, FItem fp, int32_t minNum) {
|
||||
int32_t code;
|
||||
STaosQueue *queue;
|
||||
|
||||
|
@ -280,7 +280,10 @@ STaosQueue *tAutoQWorkerAllocQueue(SAutoQWorkerPool *pool, void *ahandle, FItem
|
|||
int32_t queueNum = taosGetQueueNumber(pool->qset);
|
||||
int32_t curWorkerNum = taosArrayGetSize(pool->workers);
|
||||
int32_t dstWorkerNum = ceilf(queueNum * pool->ratio);
|
||||
if (dstWorkerNum < 2) dstWorkerNum = 2;
|
||||
|
||||
if (dstWorkerNum < minNum) {
|
||||
dstWorkerNum = minNum;
|
||||
}
|
||||
|
||||
// spawn a thread to process queue
|
||||
while (curWorkerNum < dstWorkerNum) {
|
||||
|
|
|
@ -79,7 +79,7 @@
|
|||
(void)streamMetaAddFailedTask
|
||||
(void)streamMetaAddTaskLaunchResult
|
||||
(void)streamMetaCommit
|
||||
(void)streamMetaRemoveTask
|
||||
(void)streamMetaRemoveTaskInMeta
|
||||
(void)streamMetaSendHbHelper
|
||||
(void)streamMetaStartAllTasks
|
||||
(void)streamMetaStartOneTask
|
||||
|
|
Loading…
Reference in New Issue