Merge branch '3.0' of https://github.com/taosdata/TDengine into 3.0
This commit is contained in:
commit
65b1531aca
|
@ -56,7 +56,6 @@ extern "C" {
|
|||
#define STREAM_EXEC_T_RESTART_ALL_TASKS (-4)
|
||||
#define STREAM_EXEC_T_STOP_ALL_TASKS (-5)
|
||||
#define STREAM_EXEC_T_RESUME_TASK (-6)
|
||||
#define STREAM_EXEC_T_UPDATE_TASK_EPSET (-7)
|
||||
|
||||
typedef struct SStreamTask SStreamTask;
|
||||
typedef struct SStreamQueue SStreamQueue;
|
||||
|
@ -783,7 +782,7 @@ bool streamTaskIsAllUpstreamClosed(SStreamTask* pTask);
|
|||
bool streamTaskSetSchedStatusWait(SStreamTask* pTask);
|
||||
int8_t streamTaskSetSchedStatusActive(SStreamTask* pTask);
|
||||
int8_t streamTaskSetSchedStatusInactive(SStreamTask* pTask);
|
||||
int32_t streamTaskClearHTaskAttr(SStreamTask* pTask, int32_t clearRelHalt, bool metaLock);
|
||||
int32_t streamTaskClearHTaskAttr(SStreamTask* pTask, int32_t clearRelHalt);
|
||||
|
||||
int32_t streamTaskHandleEvent(SStreamTaskSM* pSM, EStreamTaskEvent event);
|
||||
|
||||
|
|
|
@ -58,7 +58,7 @@ int32_t tsNumOfMnodeQueryThreads = 4;
|
|||
int32_t tsNumOfMnodeFetchThreads = 1;
|
||||
int32_t tsNumOfMnodeReadThreads = 1;
|
||||
int32_t tsNumOfVnodeQueryThreads = 4;
|
||||
float tsRatioOfVnodeStreamThreads = 1.5F;
|
||||
float tsRatioOfVnodeStreamThreads = 0.5F;
|
||||
int32_t tsNumOfVnodeFetchThreads = 4;
|
||||
int32_t tsNumOfVnodeRsmaThreads = 2;
|
||||
int32_t tsNumOfQnodeQueryThreads = 4;
|
||||
|
|
|
@ -142,8 +142,10 @@ int32_t tqStreamTaskProcessUpdateReq(SStreamMeta* pMeta, SMsgCb* cb, SRpcMsg* pM
|
|||
if (HAS_RELATED_FILLHISTORY_TASK(pTask)) {
|
||||
ppHTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, &pTask->hTaskInfo.id, sizeof(pTask->hTaskInfo.id));
|
||||
if (ppHTask == NULL || *ppHTask == NULL) {
|
||||
tqError("vgId:%d failed to acquire fill-history task:0x%x when handling update, it may have been dropped already",
|
||||
vgId, req.taskId);
|
||||
tqError(
|
||||
"vgId:%d failed to acquire fill-history task:0x%x when handling update, may have been dropped already, rel "
|
||||
"stream task:0x%x",
|
||||
vgId, (uint32_t)pTask->hTaskInfo.id.taskId, req.taskId);
|
||||
CLEAR_RELATED_FILLHISTORY_TASK(pTask);
|
||||
} else {
|
||||
tqDebug("s-task:%s fill-history task update nodeEp along with stream task", (*ppHTask)->id.idStr);
|
||||
|
@ -612,23 +614,35 @@ int32_t tqStreamTaskProcessDeployReq(SStreamMeta* pMeta, SMsgCb* cb, int64_t sve
|
|||
|
||||
int32_t tqStreamTaskProcessDropReq(SStreamMeta* pMeta, char* msg, int32_t msgLen) {
|
||||
SVDropStreamTaskReq* pReq = (SVDropStreamTaskReq*)msg;
|
||||
|
||||
int32_t vgId = pMeta->vgId;
|
||||
STaskId hTaskId = {0};
|
||||
|
||||
tqDebug("vgId:%d receive msg to drop s-task:0x%x", vgId, pReq->taskId);
|
||||
|
||||
SStreamTask* pTask = streamMetaAcquireTask(pMeta, pReq->streamId, pReq->taskId);
|
||||
if (pTask != NULL) {
|
||||
// drop the related fill-history task firstly
|
||||
streamMetaWLock(pMeta);
|
||||
|
||||
STaskId id = {.streamId = pReq->streamId, .taskId = pReq->taskId};
|
||||
SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, &id, sizeof(id));
|
||||
if ((ppTask != NULL) && ((*ppTask) != NULL)) {
|
||||
streamMetaAcquireOneTask(*ppTask);
|
||||
SStreamTask* pTask = *ppTask;
|
||||
|
||||
if (HAS_RELATED_FILLHISTORY_TASK(pTask)) {
|
||||
STaskId* pHTaskId = &pTask->hTaskInfo.id;
|
||||
streamMetaUnregisterTask(pMeta, pHTaskId->streamId, pHTaskId->taskId);
|
||||
tqDebug("s-task:0x%x vgId:%d drop fill-history task:0x%x firstly", pReq->taskId, vgId,
|
||||
(int32_t)pHTaskId->taskId);
|
||||
hTaskId.streamId = pTask->hTaskInfo.id.streamId;
|
||||
hTaskId.taskId = pTask->hTaskInfo.id.taskId;
|
||||
}
|
||||
|
||||
streamTaskClearHTaskAttr(pTask, pReq->resetRelHalt);
|
||||
streamMetaReleaseTask(pMeta, pTask);
|
||||
}
|
||||
|
||||
streamTaskClearHTaskAttr(pTask, pReq->resetRelHalt, true);
|
||||
streamMetaWUnLock(pMeta);
|
||||
|
||||
// drop the related fill-history task firstly
|
||||
if (hTaskId.taskId != 0 && hTaskId.streamId != 0) {
|
||||
streamMetaUnregisterTask(pMeta, hTaskId.streamId, hTaskId.taskId);
|
||||
tqDebug("s-task:0x%x vgId:%d drop rel fill-history task:0x%x firstly", pReq->taskId, vgId, (int32_t)hTaskId.taskId);
|
||||
}
|
||||
|
||||
// drop the stream task now
|
||||
streamMetaUnregisterTask(pMeta, pReq->streamId, pReq->taskId);
|
||||
|
|
|
@ -4009,6 +4009,26 @@ static int32_t translateEventWindow(STranslateContext* pCxt, SSelectStmt* pSelec
|
|||
}
|
||||
|
||||
static int32_t translateCountWindow(STranslateContext* pCxt, SSelectStmt* pSelect) {
|
||||
SCountWindowNode* pCountWin = (SCountWindowNode*)pSelect->pWindow;
|
||||
if (pCountWin->windowCount <= 1) {
|
||||
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_STREAM_QUERY,
|
||||
"Size of Count window must exceed 1.");
|
||||
}
|
||||
|
||||
if (pCountWin->windowSliding <= 0) {
|
||||
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_STREAM_QUERY,
|
||||
"Size of Count window must exceed 0.");
|
||||
}
|
||||
|
||||
if (pCountWin->windowSliding > pCountWin->windowCount) {
|
||||
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_STREAM_QUERY,
|
||||
"sliding value no larger than the count value.");
|
||||
}
|
||||
|
||||
if (pCountWin->windowCount > INT32_MAX) {
|
||||
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_STREAM_QUERY,
|
||||
"Size of Count window must less than 2147483647(INT32_MAX).");
|
||||
}
|
||||
if (QUERY_NODE_TEMP_TABLE == nodeType(pSelect->pFromTable) &&
|
||||
!isGlobalTimeLineQuery(((STempTableNode*)pSelect->pFromTable)->pSubquery)) {
|
||||
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_TIMELINE_QUERY,
|
||||
|
@ -7829,28 +7849,6 @@ static int32_t checkStreamQuery(STranslateContext* pCxt, SCreateStreamStmt* pStm
|
|||
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_STREAM_QUERY,
|
||||
"Ignore expired data of Count window must be 1.");
|
||||
}
|
||||
|
||||
SCountWindowNode* pCountWin = (SCountWindowNode*)pSelect->pWindow;
|
||||
if (pCountWin->windowCount <= 1) {
|
||||
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_STREAM_QUERY,
|
||||
"Size of Count window must exceed 1.");
|
||||
}
|
||||
|
||||
if (pCountWin->windowSliding <= 0) {
|
||||
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_STREAM_QUERY,
|
||||
"Size of Count window must exceed 0.");
|
||||
}
|
||||
|
||||
if (pCountWin->windowSliding > pCountWin->windowCount) {
|
||||
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_STREAM_QUERY,
|
||||
"sliding value no larger than the count value.");
|
||||
}
|
||||
|
||||
if (pCountWin->windowCount > INT32_MAX) {
|
||||
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_STREAM_QUERY,
|
||||
"Size of Count window must less than 2147483647(INT32_MAX).");
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
|
|
|
@ -274,7 +274,7 @@ static bool stbSplNeedSplitWindow(bool streamQuery, SLogicNode* pNode) {
|
|||
}
|
||||
}
|
||||
|
||||
if (WINDOW_TYPE_STATE == pWindow->winType) {
|
||||
if (WINDOW_TYPE_STATE == pWindow->winType || WINDOW_TYPE_COUNT == pWindow->winType) {
|
||||
if (!streamQuery) {
|
||||
return stbSplHasMultiTbScan(streamQuery, pNode);
|
||||
} else {
|
||||
|
|
|
@ -99,6 +99,7 @@ void streamRetryDispatchData(SStreamTask* pTask, int64_t waitDuration);
|
|||
int32_t streamDispatchStreamBlock(SStreamTask* pTask);
|
||||
void destroyDispatchMsg(SStreamDispatchReq* pReq, int32_t numOfVgroups);
|
||||
int32_t getNumOfDispatchBranch(SStreamTask* pTask);
|
||||
void clearBufferedDispatchMsg(SStreamTask* pTask);
|
||||
|
||||
int32_t streamProcessCheckpointBlock(SStreamTask* pTask, SStreamDataBlock* pBlock);
|
||||
SStreamDataBlock* createStreamBlockFromDispatchMsg(const SStreamDispatchReq* pReq, int32_t blockType, int32_t srcVg);
|
||||
|
|
|
@ -315,6 +315,16 @@ int32_t getNumOfDispatchBranch(SStreamTask* pTask) {
|
|||
: taosArrayGetSize(pTask->outputInfo.shuffleDispatcher.dbInfo.pVgroupInfos);
|
||||
}
|
||||
|
||||
void clearBufferedDispatchMsg(SStreamTask* pTask) {
|
||||
SDispatchMsgInfo* pMsgInfo = &pTask->msgInfo;
|
||||
if (pMsgInfo->pData != NULL) {
|
||||
destroyDispatchMsg(pMsgInfo->pData, getNumOfDispatchBranch(pTask));
|
||||
}
|
||||
|
||||
pMsgInfo->pData = NULL;
|
||||
pMsgInfo->dispatchMsgType = 0;
|
||||
}
|
||||
|
||||
static int32_t doBuildDispatchMsg(SStreamTask* pTask, const SStreamDataBlock* pData) {
|
||||
int32_t code = 0;
|
||||
int32_t numOfBlocks = taosArrayGetSize(pData->blocks);
|
||||
|
@ -678,8 +688,7 @@ int32_t streamDispatchStreamBlock(SStreamTask* pTask) {
|
|||
// todo deal with only partially success dispatch case
|
||||
atomic_store_32(&pTask->outputInfo.shuffleDispatcher.waitingRspCnt, 0);
|
||||
if (terrno == TSDB_CODE_APP_IS_STOPPING) { // in case of this error, do not retry anymore
|
||||
destroyDispatchMsg(pTask->msgInfo.pData, getNumOfDispatchBranch(pTask));
|
||||
pTask->msgInfo.pData = NULL;
|
||||
clearBufferedDispatchMsg(pTask);
|
||||
return code;
|
||||
}
|
||||
|
||||
|
@ -740,6 +749,8 @@ int32_t streamTaskSendCheckpointSourceRsp(SStreamTask* pTask) {
|
|||
|
||||
int32_t streamAddBlockIntoDispatchMsg(const SSDataBlock* pBlock, SStreamDispatchReq* pReq) {
|
||||
int32_t dataStrLen = sizeof(SRetrieveTableRsp) + blockGetEncodeSize(pBlock);
|
||||
ASSERT(dataStrLen > 0);
|
||||
|
||||
void* buf = taosMemoryCalloc(1, dataStrLen);
|
||||
if (buf == NULL) return -1;
|
||||
|
||||
|
@ -936,15 +947,12 @@ void streamClearChkptReadyMsg(SStreamTask* pTask) {
|
|||
// this message has been sent successfully, let's try next one.
|
||||
static int32_t handleDispatchSuccessRsp(SStreamTask* pTask, int32_t downstreamId) {
|
||||
stDebug("s-task:%s destroy dispatch msg:%p", pTask->id.idStr, pTask->msgInfo.pData);
|
||||
destroyDispatchMsg(pTask->msgInfo.pData, getNumOfDispatchBranch(pTask));
|
||||
|
||||
bool delayDispatch = (pTask->msgInfo.dispatchMsgType == STREAM_INPUT__CHECKPOINT_TRIGGER);
|
||||
if (delayDispatch) {
|
||||
pTask->chkInfo.dispatchCheckpointTrigger = true;
|
||||
}
|
||||
|
||||
pTask->msgInfo.pData = NULL;
|
||||
pTask->msgInfo.dispatchMsgType = 0;
|
||||
clearBufferedDispatchMsg(pTask);
|
||||
|
||||
int64_t el = taosGetTimestampMs() - pTask->msgInfo.startTs;
|
||||
|
||||
|
@ -1084,7 +1092,7 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i
|
|||
} else { // this message has been sent successfully, let's try next one.
|
||||
pTask->msgInfo.retryCount = 0;
|
||||
|
||||
// transtate msg has been sent to downstream successfully. let's transfer the fill-history task state
|
||||
// trans-state msg has been sent to downstream successfully. let's transfer the fill-history task state
|
||||
if (pTask->msgInfo.dispatchMsgType == STREAM_INPUT__TRANS_STATE) {
|
||||
stDebug("s-task:%s dispatch transtate msgId:%d to downstream successfully, start to transfer state", id, msgId);
|
||||
ASSERT(pTask->info.fillHistory == 1);
|
||||
|
@ -1093,6 +1101,8 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i
|
|||
if (code != TSDB_CODE_SUCCESS) { // todo: do nothing if error happens
|
||||
}
|
||||
|
||||
clearBufferedDispatchMsg(pTask);
|
||||
|
||||
// now ready for next data output
|
||||
atomic_store_8(&pTask->outputq.status, TASK_OUTPUT_STATUS__NORMAL);
|
||||
} else {
|
||||
|
|
|
@ -412,9 +412,7 @@ void tFreeStreamTask(SStreamTask* pTask) {
|
|||
pTask->pReadyMsgList = taosArrayDestroy(pTask->pReadyMsgList);
|
||||
|
||||
if (pTask->msgInfo.pData != NULL) {
|
||||
destroyDispatchMsg(pTask->msgInfo.pData, getNumOfDispatchBranch(pTask));
|
||||
pTask->msgInfo.pData = NULL;
|
||||
pTask->msgInfo.dispatchMsgType = 0;
|
||||
clearBufferedDispatchMsg(pTask);
|
||||
}
|
||||
|
||||
if (pTask->outputInfo.type == TASK_OUTPUT__TABLE) {
|
||||
|
@ -764,21 +762,13 @@ int8_t streamTaskSetSchedStatusInactive(SStreamTask* pTask) {
|
|||
return status;
|
||||
}
|
||||
|
||||
int32_t streamTaskClearHTaskAttr(SStreamTask* pTask, int32_t resetRelHalt, bool metaLock) {
|
||||
if (pTask == NULL) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t streamTaskClearHTaskAttr(SStreamTask* pTask, int32_t resetRelHalt) {
|
||||
SStreamMeta* pMeta = pTask->pMeta;
|
||||
STaskId sTaskId = {.streamId = pTask->streamTaskId.streamId, .taskId = pTask->streamTaskId.taskId};
|
||||
if (pTask->info.fillHistory == 0) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
if (metaLock) {
|
||||
streamMetaWLock(pMeta);
|
||||
}
|
||||
|
||||
SStreamTask** ppStreamTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, &sTaskId, sizeof(sTaskId));
|
||||
if (ppStreamTask != NULL) {
|
||||
stDebug("s-task:%s clear the related stream task:0x%x attr to fill-history task", pTask->id.idStr,
|
||||
|
@ -796,10 +786,6 @@ int32_t streamTaskClearHTaskAttr(SStreamTask* pTask, int32_t resetRelHalt, bool
|
|||
taosThreadMutexUnlock(&(*ppStreamTask)->lock);
|
||||
}
|
||||
|
||||
if (metaLock) {
|
||||
streamMetaWUnLock(pMeta);
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
|
|
@ -79,5 +79,27 @@ if $data22 != 4 then
|
|||
goto loop3
|
||||
endi
|
||||
|
||||
|
||||
print step2
|
||||
print =============== create database
|
||||
sql create database test1 vgroups 1;
|
||||
sql use test1;
|
||||
|
||||
sql create stable st(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int);
|
||||
sql create table t1 using st tags(1,1,1);
|
||||
sql create table t2 using st tags(2,2,2);
|
||||
|
||||
#2~INT32_MAX
|
||||
sql_error select _wstart as s, count(*) c1, sum(b), max(c) from t1 count_window(-1);
|
||||
sql_error select _wstart as s, count(*) c1, sum(b), max(c) from t1 count_window(0);
|
||||
sql_error select _wstart as s, count(*) c1, sum(b), max(c) from t1 count_window(1);
|
||||
sql_error select _wstart as s, count(*) c1, sum(b), max(c) from t1 count_window(2147483648);
|
||||
sql_error select _wstart as s, count(*) c1, sum(b), max(c) from t1 count_window(10, 0);
|
||||
sql_error select _wstart as s, count(*) c1, sum(b), max(c) from t1 count_window(10, -1);
|
||||
sql_error select _wstart as s, count(*) c1, sum(b), max(c) from t1 count_window(10, 11);
|
||||
|
||||
sql select _wstart as s, count(*) c1, sum(b), max(c) from t1 count_window(2);
|
||||
sql select _wstart as s, count(*) c1, sum(b), max(c) from t1 count_window(2147483647);
|
||||
|
||||
print query_count0 end
|
||||
system sh/exec.sh -n dnode1 -s stop -x SIGINT
|
||||
|
|
Loading…
Reference in New Issue