refactor stream session window

This commit is contained in:
liuyao 2023-08-14 17:51:20 +08:00
parent 4b435e2cbf
commit d1c289823f
5 changed files with 149 additions and 162 deletions

View File

@ -183,9 +183,10 @@ void cleanupQueryTableDataCond(SQueryTableDataCond* pCond);
int32_t convertFillType(int32_t mode);
int32_t resultrowComparAsc(const void* p1, const void* p2);
int32_t isQualifiedTable(STableKeyInfo* info, SNode* pTagCond, void* metaHandle, bool* pQualified, SStorageAPI *pAPI);
void printDataBlock(SSDataBlock* pBlock, const char* flag, const char* taskIdStr);
int32_t isQualifiedTable(STableKeyInfo* info, SNode* pTagCond, void* metaHandle, bool* pQualified, SStorageAPI* pAPI);
char* getStreamOpName(uint16_t opType);
void printDataBlock(SSDataBlock* pBlock, const char* flag, const char* taskIdStr);
void printSpecDataBlock(SSDataBlock* pBlock, const char* flag, const char* opStr, const char* taskIdStr);
void getNextTimeWindow(const SInterval* pInterval, STimeWindow* tw, int32_t order);
void getInitialStartTimeWindow(SInterval* pInterval, TSKEY ts, STimeWindow* w, bool ascQuery);

View File

@ -2178,6 +2178,45 @@ int32_t createScanTableListInfo(SScanPhysiNode* pScanNode, SNodeList* pGroupTags
return TSDB_CODE_SUCCESS;
}
char* getStreamOpName(uint16_t opType) {
switch (opType) {
case QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN: {
return "stream scan";
};
case QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL: {
return "interval single";
};
case QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL: {
return "interval final";
};
case QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL: {
return "interval semi";
};
case QUERY_NODE_PHYSICAL_PLAN_STREAM_FILL: {
return "stream fill";
}
case QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION: {
return "session single";
};
case QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_SESSION: {
return "session semi";
};
case QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION: {
return "session final";
};
case QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE: {
return "state single";
};
case QUERY_NODE_PHYSICAL_PLAN_STREAM_PARTITION: {
return "stream partitionby";
};
case QUERY_NODE_PHYSICAL_PLAN_STREAM_EVENT: {
return "stream event";
};
}
return "";
}
void printDataBlock(SSDataBlock* pBlock, const char* flag, const char* taskIdStr) {
if (!pBlock || pBlock->info.rows == 0) {
qDebug("%s===stream===%s: Block is Null or Empty", taskIdStr, flag);
@ -2188,6 +2227,20 @@ void printDataBlock(SSDataBlock* pBlock, const char* flag, const char* taskIdStr
taosMemoryFree(pBuf);
}
void printSpecDataBlock(SSDataBlock* pBlock, const char* flag, const char* opStr, const char* taskIdStr) {
if (!pBlock || pBlock->info.rows == 0) {
qDebug("%s===stream===%s: Block is Null or Empty", taskIdStr, flag);
return;
}
if (qDebugFlag & DEBUG_DEBUG) {
char* pBuf = NULL;
char flagBuf[64];
snprintf(flagBuf, sizeof(flagBuf), "%s %s", flag, opStr);
qDebug("%s", dumpBlockData(pBlock, flagBuf, &pBuf, taskIdStr));
taosMemoryFree(pBuf);
}
}
TSKEY getStartTsKey(STimeWindow* win, const TSKEY* tsCols) { return tsCols == NULL ? win->skey : tsCols[0]; }
void updateTimeWindowInfo(SColumnInfoData* pColData, STimeWindow* pWin, int64_t delta) {

View File

@ -1292,14 +1292,14 @@ static SSDataBlock* doStreamFill(SOperatorInfo* pOperator) {
(pInfo->pFillInfo->pos != FILL_POS_INVALID && pInfo->pFillInfo->needFill == true)) {
doStreamFillRange(pInfo->pFillInfo, pInfo->pFillSup, pInfo->pRes);
if (pInfo->pRes->info.rows > 0) {
printDataBlock(pInfo->pRes, "stream fill", GET_TASKID(pTaskInfo));
printDataBlock(pInfo->pRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo));
return pInfo->pRes;
}
}
if (pOperator->status == OP_RES_TO_RETURN) {
doDeleteFillFinalize(pOperator);
if (pInfo->pRes->info.rows > 0) {
printDataBlock(pInfo->pRes, "stream fill", GET_TASKID(pTaskInfo));
printDataBlock(pInfo->pRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo));
return pInfo->pRes;
}
setOperatorCompleted(pOperator);
@ -1317,12 +1317,12 @@ static SSDataBlock* doStreamFill(SOperatorInfo* pOperator) {
pOperator->status = OP_RES_TO_RETURN;
pInfo->pFillInfo->preRowKey = INT64_MIN;
if (pInfo->pRes->info.rows > 0) {
printDataBlock(pInfo->pRes, "stream fill", GET_TASKID(pTaskInfo));
printDataBlock(pInfo->pRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo));
return pInfo->pRes;
}
break;
}
printDataBlock(pBlock, "stream fill recv", GET_TASKID(pTaskInfo));
printSpecDataBlock(pBlock, getStreamOpName(pOperator->operatorType), "recv", GET_TASKID(pTaskInfo));
if (pInfo->pFillInfo->curGroupId != pBlock->info.id.groupId) {
pInfo->pFillInfo->curGroupId = pBlock->info.id.groupId;
@ -1339,7 +1339,7 @@ static SSDataBlock* doStreamFill(SOperatorInfo* pOperator) {
pInfo->pFillSup->hasDelete = true;
doDeleteFillResult(pOperator);
if (pInfo->pDelRes->info.rows > 0) {
printDataBlock(pInfo->pDelRes, "stream fill delete", GET_TASKID(pTaskInfo));
printDataBlock(pInfo->pDelRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo));
return pInfo->pDelRes;
}
continue;
@ -1378,7 +1378,7 @@ static SSDataBlock* doStreamFill(SOperatorInfo* pOperator) {
}
pOperator->resultInfo.totalRows += pInfo->pRes->info.rows;
printDataBlock(pInfo->pRes, "stream fill", GET_TASKID(pTaskInfo));
printDataBlock(pInfo->pRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo));
return pInfo->pRes;
}

View File

@ -995,7 +995,7 @@ static SSDataBlock* buildStreamPartitionResult(SOperatorInfo* pOperator) {
pOperator->resultInfo.totalRows += pDest->info.rows;
pInfo->parIte = taosHashIterate(pInfo->pPartitions, pInfo->parIte);
ASSERT(pDest->info.rows > 0);
printDataBlock(pDest, "stream partitionby", GET_TASKID(pTaskInfo));
printDataBlock(pDest, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo));
return pDest;
}
@ -1116,7 +1116,7 @@ static SSDataBlock* doStreamHashPartition(SOperatorInfo* pOperator) {
setOperatorCompleted(pOperator);
return NULL;
}
printDataBlock(pBlock, "stream partitionby recv", GET_TASKID(pTaskInfo));
printSpecDataBlock(pBlock, getStreamOpName(pOperator->operatorType), "recv", GET_TASKID(pTaskInfo));
switch (pBlock->info.type) {
case STREAM_NORMAL:
case STREAM_PULL_DATA:
@ -1126,7 +1126,7 @@ static SSDataBlock* doStreamHashPartition(SOperatorInfo* pOperator) {
case STREAM_DELETE_DATA: {
copyDataBlock(pInfo->pDelRes, pBlock);
pInfo->pDelRes->info.type = STREAM_DELETE_RESULT;
printDataBlock(pInfo->pDelRes, "stream partitionby delete", GET_TASKID(pTaskInfo));
printDataBlock(pInfo->pDelRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo));
return pInfo->pDelRes;
} break;
default:

View File

@ -913,18 +913,7 @@ static void resetUnCloseWinInfo(SSHashObj* winMap) {
pPos->beUsed = true;
}
}
static char* getStreamOpName(uint16_t opType) {
switch (opType) {
case QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL:
return "interval single";
case QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL:
return "interval final";
case QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL:
return "interval semi";
default:
return "";
}
}
static SSDataBlock* buildIntervalResult(SOperatorInfo* pOperator) {
SStreamIntervalOperatorInfo* pInfo = pOperator->info;
@ -1030,14 +1019,13 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream);
if (pBlock == NULL) {
pOperator->status = OP_RES_TO_RETURN;
qDebug("===stream===return data:%s. recv datablock num:%" PRIu64,
IS_FINAL_INTERVAL_OP(pOperator) ? "interval final" : "interval semi", pInfo->numOfDatapack);
qDebug("===stream===return data:%s. recv datablock num:%" PRIu64, getStreamOpName(pOperator->operatorType),
pInfo->numOfDatapack);
pInfo->numOfDatapack = 0;
break;
}
pInfo->numOfDatapack++;
printDataBlock(pBlock, IS_FINAL_INTERVAL_OP(pOperator) ? "interval final recv" : "interval semi recv",
GET_TASKID(pTaskInfo));
printSpecDataBlock(pBlock, getStreamOpName(pOperator->operatorType), "recv", GET_TASKID(pTaskInfo));
if (pBlock->info.type == STREAM_NORMAL || pBlock->info.type == STREAM_PULL_DATA) {
pInfo->binfo.pRes->info.type = pBlock->info.type;
@ -1061,8 +1049,7 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
doBuildDeleteResult(pInfo, pInfo->pDelWins, &pInfo->delIndex, pInfo->pDelRes);
if (pInfo->pDelRes->info.rows != 0) {
// process the rest of the data
printDataBlock(pInfo->pDelRes, IS_FINAL_INTERVAL_OP(pOperator) ? "interval final" : "interval semi",
GET_TASKID(pTaskInfo));
printDataBlock(pInfo->pDelRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo));
if (pBlock->info.type == STREAM_CLEAR) {
pInfo->pDelRes->info.type = STREAM_CLEAR;
} else {
@ -1260,20 +1247,8 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream,
initResultRowInfo(&pInfo->binfo.resultRowInfo);
pInfo->numOfChild = numOfChild;
pInfo->pPhyNode = (SPhysiNode*)nodesCloneNode((SNode*)pPhyNode);
if (pPhyNode->type == QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL) {
pOperator->name = "StreamFinalIntervalOperator";
} else {
// semi interval operator does not catch result
pOperator->name = "StreamSemiIntervalOperator";
}
if (!IS_FINAL_INTERVAL_OP(pOperator) || numOfChild == 0) {
pInfo->twAggSup.calTrigger = STREAM_TRIGGER_AT_ONCE;
}
pInfo->pPullWins = taosArrayInit(8, sizeof(SPullWindowInfo));
pInfo->pullIndex = 0;
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
@ -1299,6 +1274,10 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream,
pInfo->recvGetAll = false;
pOperator->operatorType = pPhyNode->type;
if (!IS_FINAL_INTERVAL_OP(pOperator) || numOfChild == 0) {
pInfo->twAggSup.calTrigger = STREAM_TRIGGER_AT_ONCE;
}
pOperator->name = getStreamOpName(pOperator->operatorType);
pOperator->blocking = true;
pOperator->status = OP_NOT_OPENED;
pOperator->info = pInfo;
@ -1984,6 +1963,25 @@ void doBuildSessionResult(SOperatorInfo* pOperator, void* pState, SGroupResInfo*
cleanupGroupResInfo(pGroupResInfo);
}
}
static SSDataBlock* buildSessionResult(SOperatorInfo* pOperator) {
SStreamSessionAggOperatorInfo* pInfo = pOperator->info;
SStreamAggSupporter* pAggSup = &pInfo->streamAggSup;
SOptrBasicInfo* pBInfo = &pInfo->binfo;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
doBuildDeleteDataBlock(pOperator, pInfo->pStDeleted, pInfo->pDelRes, &pInfo->pDelIterator);
if (pInfo->pDelRes->info.rows > 0) {
printDataBlock(pInfo->pDelRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo));
return pInfo->pDelRes;
}
doBuildSessionResult(pOperator, pAggSup->pState, &pInfo->groupResInfo, pBInfo->pRes);
if (pBInfo->pRes->info.rows > 0) {
printDataBlock(pBInfo->pRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo));
return pBInfo->pRes;
}
return NULL;
}
void getMaxTsWins(const SArray* pAllWins, SArray* pMaxWins) {
int32_t size = taosArrayGetSize(pAllWins);
if (size == 0) {
@ -2011,23 +2009,14 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) {
SOptrBasicInfo* pBInfo = &pInfo->binfo;
SStreamAggSupporter* pAggSup = &pInfo->streamAggSup;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
qDebug("===stream=== stream session agg");
qDebug("stask:%s %s status: %d", GET_TASKID(pTaskInfo), getStreamOpName(pOperator->operatorType), pOperator->status);
if (pOperator->status == OP_EXEC_DONE) {
return NULL;
} else if (pOperator->status == OP_RES_TO_RETURN) {
doBuildDeleteDataBlock(pOperator, pInfo->pStDeleted, pInfo->pDelRes, &pInfo->pDelIterator);
if (pInfo->pDelRes->info.rows > 0) {
printDataBlock(pInfo->pDelRes, IS_FINAL_SESSION_OP(pOperator) ? "final session" : "single session",
GET_TASKID(pTaskInfo));
return pInfo->pDelRes;
SSDataBlock* opRes = buildSessionResult(pOperator);
if (opRes) {
return opRes;
}
doBuildSessionResult(pOperator, pAggSup->pState, &pInfo->groupResInfo, pBInfo->pRes);
if (pBInfo->pRes->info.rows > 0) {
printDataBlock(pBInfo->pRes, IS_FINAL_SESSION_OP(pOperator) ? "final session" : "single session",
GET_TASKID(pTaskInfo));
return pBInfo->pRes;
}
setOperatorCompleted(pOperator);
return NULL;
}
@ -2045,8 +2034,7 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) {
if (pBlock == NULL) {
break;
}
printDataBlock(pBlock, IS_FINAL_SESSION_OP(pOperator) ? "final session recv" : "single session recv",
GET_TASKID(pTaskInfo));
printSpecDataBlock(pBlock, getStreamOpName(pOperator->operatorType), "recv", GET_TASKID(pTaskInfo));
if (pBlock->info.type == STREAM_DELETE_DATA || pBlock->info.type == STREAM_DELETE_RESULT ||
pBlock->info.type == STREAM_CLEAR) {
@ -2117,24 +2105,9 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) {
pInfo->pUpdated = NULL;
blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity);
#if 0
char* pBuf = streamStateSessionDump(pAggSup->pState);
qDebug("===stream===final session%s", pBuf);
taosMemoryFree(pBuf);
#endif
doBuildDeleteDataBlock(pOperator, pInfo->pStDeleted, pInfo->pDelRes, &pInfo->pDelIterator);
if (pInfo->pDelRes->info.rows > 0) {
printDataBlock(pInfo->pDelRes, IS_FINAL_SESSION_OP(pOperator) ? "final session" : "single session",
GET_TASKID(pTaskInfo));
return pInfo->pDelRes;
}
doBuildSessionResult(pOperator, pAggSup->pState, &pInfo->groupResInfo, pBInfo->pRes);
if (pBInfo->pRes->info.rows > 0) {
printDataBlock(pBInfo->pRes, IS_FINAL_SESSION_OP(pOperator) ? "final session" : "single session",
GET_TASKID(pTaskInfo));
return pBInfo->pRes;
SSDataBlock* opRes = buildSessionResult(pOperator);
if (opRes) {
return opRes;
}
setOperatorCompleted(pOperator);
@ -2276,8 +2249,8 @@ SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPh
if (pHandle) {
pInfo->isHistoryOp = pHandle->fillHistory;
}
setOperatorInfo(pOperator, "StreamSessionWindowAggOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION, true,
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION;
setOperatorInfo(pOperator, getStreamOpName(pOperator->operatorType), QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION, true,
OP_NOT_OPENED, pInfo, pTaskInfo);
pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doStreamSessionAgg, NULL, destroyStreamSessionAggOperatorInfo,
optrDefaultBufFn, NULL);
@ -2312,22 +2285,15 @@ static SSDataBlock* doStreamSessionSemiAgg(SOperatorInfo* pOperator) {
SStreamAggSupporter* pAggSup = &pInfo->streamAggSup;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
qDebug("===stream=== stream session semi agg");
qDebug("stask:%s %s status: %d", GET_TASKID(pTaskInfo), getStreamOpName(pOperator->operatorType), pOperator->status);
if (pOperator->status == OP_EXEC_DONE) {
return NULL;
}
{
doBuildSessionResult(pOperator, pAggSup->pState, &pInfo->groupResInfo, pBInfo->pRes);
if (pBInfo->pRes->info.rows > 0) {
printDataBlock(pBInfo->pRes, "semi session", GET_TASKID(pTaskInfo));
return pBInfo->pRes;
}
doBuildDeleteDataBlock(pOperator, pInfo->pStDeleted, pInfo->pDelRes, &pInfo->pDelIterator);
if (pInfo->pDelRes->info.rows > 0) {
printDataBlock(pInfo->pDelRes, "semi session delete", GET_TASKID(pTaskInfo));
return pInfo->pDelRes;
SSDataBlock* opRes = buildSessionResult(pOperator);
if (opRes) {
return opRes;
}
if (pOperator->status == OP_RES_TO_RETURN) {
@ -2354,7 +2320,7 @@ static SSDataBlock* doStreamSessionSemiAgg(SOperatorInfo* pOperator) {
pOperator->status = OP_RES_TO_RETURN;
break;
}
printDataBlock(pBlock, "semi session recv", GET_TASKID(pTaskInfo));
printSpecDataBlock(pBlock, getStreamOpName(pOperator->operatorType), "recv", GET_TASKID(pTaskInfo));
if (pBlock->info.type == STREAM_DELETE_DATA || pBlock->info.type == STREAM_DELETE_RESULT ||
pBlock->info.type == STREAM_CLEAR) {
@ -2395,22 +2361,9 @@ static SSDataBlock* doStreamSessionSemiAgg(SOperatorInfo* pOperator) {
pInfo->pUpdated = NULL;
blockDataEnsureCapacity(pBInfo->pRes, pOperator->resultInfo.capacity);
#if 0
char* pBuf = streamStateSessionDump(pAggSup->pState);
qDebug("===stream===semi session%s", pBuf);
taosMemoryFree(pBuf);
#endif
doBuildSessionResult(pOperator, pAggSup->pState, &pInfo->groupResInfo, pBInfo->pRes);
if (pBInfo->pRes->info.rows > 0) {
printDataBlock(pBInfo->pRes, "semi session", GET_TASKID(pTaskInfo));
return pBInfo->pRes;
}
doBuildDeleteDataBlock(pOperator, pInfo->pStDeleted, pInfo->pDelRes, &pInfo->pDelIterator);
if (pInfo->pDelRes->info.rows > 0) {
printDataBlock(pInfo->pDelRes, "semi session delete", GET_TASKID(pTaskInfo));
return pInfo->pDelRes;
SSDataBlock* opRes = buildSessionResult(pOperator);
if (opRes) {
return opRes;
}
clearFunctionContext(&pOperator->exprSupp);
@ -2432,7 +2385,6 @@ SOperatorInfo* createStreamFinalSessionAggOperatorInfo(SOperatorInfo* downstream
SStorageAPI* pAPI = &pTaskInfo->storageAPI;
SStreamSessionAggOperatorInfo* pInfo = pOperator->info;
pOperator->operatorType = pPhyNode->type;
char* name = getStreamOpName(pOperator->operatorType);
if (pPhyNode->type != QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION) {
pInfo->pUpdateRes = createSpecialDataBlock(STREAM_CLEAR);
@ -2440,8 +2392,7 @@ SOperatorInfo* createStreamFinalSessionAggOperatorInfo(SOperatorInfo* downstream
pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doStreamSessionSemiAgg, NULL,
destroyStreamSessionAggOperatorInfo, optrDefaultBufFn, NULL);
}
setOperatorStreamStateFn(pOperator, streamSessionReleaseState, streamSessionReloadState);
setOperatorInfo(pOperator, name, pPhyNode->type, false, OP_NOT_OPENED, pInfo, pTaskInfo);
setOperatorInfo(pOperator, getStreamOpName(pOperator->operatorType), pPhyNode->type, false, OP_NOT_OPENED, pInfo, pTaskInfo);
if (numOfChild > 0) {
pInfo->pChildren = taosArrayInit(numOfChild, sizeof(void*));
@ -2697,6 +2648,25 @@ static void doStreamStateAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBl
}
}
static SSDataBlock* buildStateResult(SOperatorInfo* pOperator) {
SStreamStateAggOperatorInfo* pInfo = pOperator->info;
SOptrBasicInfo* pBInfo = &pInfo->binfo;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
doBuildDeleteDataBlock(pOperator, pInfo->pSeDeleted, pInfo->pDelRes, &pInfo->pDelIterator);
if (pInfo->pDelRes->info.rows > 0) {
printDataBlock(pInfo->pDelRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo));
return pInfo->pDelRes;
}
doBuildSessionResult(pOperator, pInfo->streamAggSup.pState, &pInfo->groupResInfo, pBInfo->pRes);
if (pBInfo->pRes->info.rows > 0) {
printDataBlock(pBInfo->pRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo));
return pBInfo->pRes;
}
return NULL;
}
static SSDataBlock* doStreamStateAgg(SOperatorInfo* pOperator) {
if (pOperator->status == OP_EXEC_DONE) {
return NULL;
@ -2708,16 +2678,9 @@ static SSDataBlock* doStreamStateAgg(SOperatorInfo* pOperator) {
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
qDebug("===stream=== stream state agg");
if (pOperator->status == OP_RES_TO_RETURN) {
doBuildDeleteDataBlock(pOperator, pInfo->pSeDeleted, pInfo->pDelRes, &pInfo->pDelIterator);
if (pInfo->pDelRes->info.rows > 0) {
printDataBlock(pInfo->pDelRes, "single state delete", GET_TASKID(pTaskInfo));
return pInfo->pDelRes;
}
doBuildSessionResult(pOperator, pInfo->streamAggSup.pState, &pInfo->groupResInfo, pBInfo->pRes);
if (pBInfo->pRes->info.rows > 0) {
printDataBlock(pBInfo->pRes, "single state", GET_TASKID(pTaskInfo));
return pBInfo->pRes;
SSDataBlock* resBlock = buildStateResult(pOperator);
if (resBlock != NULL) {
return resBlock;
}
setOperatorCompleted(pOperator);
@ -2782,22 +2745,9 @@ static SSDataBlock* doStreamStateAgg(SOperatorInfo* pOperator) {
pInfo->pUpdated = NULL;
blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity);
#if 0
char* pBuf = streamStateSessionDump(pInfo->streamAggSup.pState);
qDebug("===stream===final session%s", pBuf);
taosMemoryFree(pBuf);
#endif
doBuildDeleteDataBlock(pOperator, pInfo->pSeDeleted, pInfo->pDelRes, &pInfo->pDelIterator);
if (pInfo->pDelRes->info.rows > 0) {
printDataBlock(pInfo->pDelRes, "single state delete", GET_TASKID(pTaskInfo));
return pInfo->pDelRes;
}
doBuildSessionResult(pOperator, pInfo->streamAggSup.pState, &pInfo->groupResInfo, pBInfo->pRes);
if (pBInfo->pRes->info.rows > 0) {
printDataBlock(pBInfo->pRes, "single state", GET_TASKID(pTaskInfo));
return pBInfo->pRes;
SSDataBlock* resBlock = buildStateResult(pOperator);
if (resBlock != NULL) {
return resBlock;
}
setOperatorCompleted(pOperator);
return NULL;
@ -3015,24 +2965,18 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
SStreamIntervalOperatorInfo* pInfo = pOperator->info;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SStorageAPI* pAPI = &pOperator->pTaskInfo->storageAPI;
SExprSupp* pSup = &pOperator->exprSupp;
SExprSupp* pSup = &pOperator->exprSupp;
qDebug("stask:%s %s status: %d", GET_TASKID(pTaskInfo), getStreamOpName(pOperator->operatorType), pOperator->status);
if (pOperator->status == OP_EXEC_DONE) {
return NULL;
}
if (pOperator->status == OP_RES_TO_RETURN) {
doBuildDeleteResult(pInfo, pInfo->pDelWins, &pInfo->delIndex, pInfo->pDelRes);
if (pInfo->pDelRes->info.rows > 0) {
printDataBlock(pInfo->pDelRes, "single interval delete", GET_TASKID(pTaskInfo));
return pInfo->pDelRes;
}
doBuildStreamIntervalResult(pOperator, pInfo->pState, pInfo->binfo.pRes, &pInfo->groupResInfo);
if (pInfo->binfo.pRes->info.rows > 0) {
printDataBlock(pInfo->binfo.pRes, "single interval", GET_TASKID(pTaskInfo));
return pInfo->binfo.pRes;
SSDataBlock* resBlock = buildIntervalResult(pOperator);
if (resBlock != NULL) {
return resBlock;
}
if (pInfo->recvGetAll) {
@ -3065,25 +3009,26 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
while (1) {
SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream);
if (pBlock == NULL) {
qDebug("===stream===return data:single interval. recv datablock num:%" PRIu64, pInfo->numOfDatapack);
qDebug("===stream===return data:%s. recv datablock num:%" PRIu64, getStreamOpName(pOperator->operatorType),
pInfo->numOfDatapack);
pInfo->numOfDatapack = 0;
break;
}
pInfo->numOfDatapack++;
printDataBlock(pBlock, "single interval recv", GET_TASKID(pTaskInfo));
printSpecDataBlock(pBlock, getStreamOpName(pOperator->operatorType), "recv", GET_TASKID(pTaskInfo));
if (pBlock->info.type == STREAM_DELETE_DATA || pBlock->info.type == STREAM_DELETE_RESULT ||
pBlock->info.type == STREAM_CLEAR) {
doDeleteWindows(pOperator, &pInfo->interval, pBlock, pInfo->pDelWins, pInfo->pUpdatedMap);
continue;
} else if (pBlock->info.type == STREAM_GET_ALL) {
qDebug("===stream===single interval recv|block type STREAM_GET_ALL");
qDebug("===stream===%s recv|block type STREAM_GET_ALL", getStreamOpName(pOperator->operatorType));
pInfo->recvGetAll = true;
getAllIntervalWindow(pInfo->aggSup.pResultRowHashTable, pInfo->pUpdatedMap);
continue;
} else if (pBlock->info.type == STREAM_CREATE_CHILD_TABLE) {
printDataBlock(pBlock, "single interval", GET_TASKID(pTaskInfo));
printDataBlock(pBlock, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo));
return pBlock;
} else {
ASSERTS(pBlock->info.type == STREAM_NORMAL || pBlock->info.type == STREAM_INVALID, "invalid SSDataBlock type");
@ -3129,19 +3074,7 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
tSimpleHashCleanup(pInfo->pUpdatedMap);
pInfo->pUpdatedMap = NULL;
doBuildDeleteResult(pInfo, pInfo->pDelWins, &pInfo->delIndex, pInfo->pDelRes);
if (pInfo->pDelRes->info.rows > 0) {
printDataBlock(pInfo->pDelRes, "single interval delete", GET_TASKID(pTaskInfo));
return pInfo->pDelRes;
}
doBuildStreamIntervalResult(pOperator, pInfo->pState, pInfo->binfo.pRes, &pInfo->groupResInfo);
if (pInfo->binfo.pRes->info.rows > 0) {
printDataBlock(pInfo->binfo.pRes, "single interval", GET_TASKID(pTaskInfo));
return pInfo->binfo.pRes;
}
return NULL;
return buildIntervalResult(pOperator);
}
SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode,