add stream task id
This commit is contained in:
parent
8438309ad6
commit
9d1e5a512d
|
@ -241,7 +241,7 @@ int32_t blockEncode(const SSDataBlock* pBlock, char* data, int32_t numOfCols);
|
|||
const char* blockDecode(SSDataBlock* pBlock, const char* pData);
|
||||
|
||||
// for debug
|
||||
char* dumpBlockData(SSDataBlock* pDataBlock, const char* flag, char** dumpBuf);
|
||||
char* dumpBlockData(SSDataBlock* pDataBlock, const char* flag, char** dumpBuf, const char* taskIdStr);
|
||||
|
||||
int32_t buildSubmitReqFromDataBlock(SSubmitReq2** pReq, const SSDataBlock* pDataBlocks, const STSchema* pTSchema, int64_t uid, int32_t vgId,
|
||||
tb_uid_t suid);
|
||||
|
|
|
@ -1771,7 +1771,7 @@ static char* formatTimestamp(char* buf, int64_t val, int precision) {
|
|||
}
|
||||
|
||||
// for debug
|
||||
char* dumpBlockData(SSDataBlock* pDataBlock, const char* flag, char** pDataBuf) {
|
||||
char* dumpBlockData(SSDataBlock* pDataBlock, const char* flag, char** pDataBuf, const char* taskIdStr) {
|
||||
int32_t size = 2048*1024;
|
||||
*pDataBuf = taosMemoryCalloc(size, 1);
|
||||
char* dumpBuf = *pDataBuf;
|
||||
|
@ -1780,9 +1780,9 @@ char* dumpBlockData(SSDataBlock* pDataBlock, const char* flag, char** pDataBuf)
|
|||
int32_t rows = pDataBlock->info.rows;
|
||||
int32_t len = 0;
|
||||
len += snprintf(dumpBuf + len, size - len,
|
||||
"===stream===%s|block type %d|child id %d|group id:%" PRIu64 "|uid:%" PRId64
|
||||
"%s===stream===%s|block type %d|child id %d|group id:%" PRIu64 "|uid:%" PRId64
|
||||
"|rows:%" PRId64 "|version:%" PRIu64 "|cal start:%" PRIu64 "|cal end:%" PRIu64 "|tbl:%s\n",
|
||||
flag, (int32_t)pDataBlock->info.type, pDataBlock->info.childId, pDataBlock->info.id.groupId,
|
||||
taskIdStr, flag, (int32_t)pDataBlock->info.type, pDataBlock->info.childId, pDataBlock->info.id.groupId,
|
||||
pDataBlock->info.id.uid, pDataBlock->info.rows, pDataBlock->info.version,
|
||||
pDataBlock->info.calWin.skey, pDataBlock->info.calWin.ekey, pDataBlock->info.parTbName);
|
||||
if (len >= size - 1) return dumpBuf;
|
||||
|
|
|
@ -412,7 +412,7 @@ void tqSinkToTablePipeline(SStreamTask* pTask, void* vnode, int64_t ver, void* d
|
|||
if (k == 0) {
|
||||
SColumnInfoData* pColData = taosArrayGet(pDataBlock->pDataBlock, dataIndex);
|
||||
void* colData = colDataGetData(pColData, j);
|
||||
tqTrace("tq sink pipe2, row %d, col %d ts %" PRId64, j, k, *(int64_t*)colData);
|
||||
tqDebug("tq sink pipe2, row %d, col %d ts %" PRId64, j, k, *(int64_t*)colData);
|
||||
}
|
||||
if (IS_SET_NULL(pCol)) {
|
||||
SColVal cv = COL_VAL_NULL(pCol->colId, pCol->type);
|
||||
|
|
|
@ -185,7 +185,7 @@ int32_t convertFillType(int32_t mode);
|
|||
int32_t resultrowComparAsc(const void* p1, const void* p2);
|
||||
int32_t isQualifiedTable(STableKeyInfo* info, SNode* pTagCond, void* metaHandle, bool* pQualified, SStorageAPI *pAPI);
|
||||
|
||||
void printDataBlock(SSDataBlock* pBlock, const char* flag);
|
||||
void printDataBlock(SSDataBlock* pBlock, const char* flag, const char* taskIdStr);
|
||||
|
||||
void getNextTimeWindow(const SInterval* pInterval, STimeWindow* tw, int32_t order);
|
||||
void getInitialStartTimeWindow(SInterval* pInterval, TSKEY ts, STimeWindow* w, bool ascQuery);
|
||||
|
|
|
@ -2178,12 +2178,12 @@ int32_t createScanTableListInfo(SScanPhysiNode* pScanNode, SNodeList* pGroupTags
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
void printDataBlock(SSDataBlock* pBlock, const char* flag) {
|
||||
void printDataBlock(SSDataBlock* pBlock, const char* flag, const char* taskIdStr) {
|
||||
if (!pBlock || pBlock->info.rows == 0) {
|
||||
qDebug("===stream===%s: Block is Null or Empty", flag);
|
||||
qDebug("%s===stream===%s: Block is Null or Empty", taskIdStr, flag);
|
||||
return;
|
||||
}
|
||||
char* pBuf = NULL;
|
||||
qDebug("%s", dumpBlockData(pBlock, flag, &pBuf));
|
||||
qDebug("%s", dumpBlockData(pBlock, flag, &pBuf, taskIdStr));
|
||||
taosMemoryFree(pBuf);
|
||||
}
|
||||
|
|
|
@ -1292,14 +1292,14 @@ static SSDataBlock* doStreamFill(SOperatorInfo* pOperator) {
|
|||
(pInfo->pFillInfo->pos != FILL_POS_INVALID && pInfo->pFillInfo->needFill == true)) {
|
||||
doStreamFillRange(pInfo->pFillInfo, pInfo->pFillSup, pInfo->pRes);
|
||||
if (pInfo->pRes->info.rows > 0) {
|
||||
printDataBlock(pInfo->pRes, "stream fill");
|
||||
printDataBlock(pInfo->pRes, "stream fill", GET_TASKID(pTaskInfo));
|
||||
return pInfo->pRes;
|
||||
}
|
||||
}
|
||||
if (pOperator->status == OP_RES_TO_RETURN) {
|
||||
doDeleteFillFinalize(pOperator);
|
||||
if (pInfo->pRes->info.rows > 0) {
|
||||
printDataBlock(pInfo->pRes, "stream fill");
|
||||
printDataBlock(pInfo->pRes, "stream fill", GET_TASKID(pTaskInfo));
|
||||
return pInfo->pRes;
|
||||
}
|
||||
setOperatorCompleted(pOperator);
|
||||
|
@ -1317,12 +1317,12 @@ static SSDataBlock* doStreamFill(SOperatorInfo* pOperator) {
|
|||
pOperator->status = OP_RES_TO_RETURN;
|
||||
pInfo->pFillInfo->preRowKey = INT64_MIN;
|
||||
if (pInfo->pRes->info.rows > 0) {
|
||||
printDataBlock(pInfo->pRes, "stream fill");
|
||||
printDataBlock(pInfo->pRes, "stream fill", GET_TASKID(pTaskInfo));
|
||||
return pInfo->pRes;
|
||||
}
|
||||
break;
|
||||
}
|
||||
printDataBlock(pBlock, "stream fill recv");
|
||||
printDataBlock(pBlock, "stream fill recv", GET_TASKID(pTaskInfo));
|
||||
|
||||
if (pInfo->pFillInfo->curGroupId != pBlock->info.id.groupId) {
|
||||
pInfo->pFillInfo->curGroupId = pBlock->info.id.groupId;
|
||||
|
@ -1339,7 +1339,7 @@ static SSDataBlock* doStreamFill(SOperatorInfo* pOperator) {
|
|||
pInfo->pFillSup->hasDelete = true;
|
||||
doDeleteFillResult(pOperator);
|
||||
if (pInfo->pDelRes->info.rows > 0) {
|
||||
printDataBlock(pInfo->pDelRes, "stream fill delete");
|
||||
printDataBlock(pInfo->pDelRes, "stream fill delete", GET_TASKID(pTaskInfo));
|
||||
return pInfo->pDelRes;
|
||||
}
|
||||
continue;
|
||||
|
@ -1378,7 +1378,7 @@ static SSDataBlock* doStreamFill(SOperatorInfo* pOperator) {
|
|||
}
|
||||
|
||||
pOperator->resultInfo.totalRows += pInfo->pRes->info.rows;
|
||||
printDataBlock(pInfo->pRes, "stream fill");
|
||||
printDataBlock(pInfo->pRes, "stream fill", GET_TASKID(pTaskInfo));
|
||||
return pInfo->pRes;
|
||||
}
|
||||
|
||||
|
|
|
@ -956,7 +956,8 @@ static bool hasRemainPartion(SStreamPartitionOperatorInfo* pInfo) { return pInfo
|
|||
static bool hasRemainTbName(SStreamPartitionOperatorInfo* pInfo) { return pInfo->pTbNameIte != NULL; }
|
||||
|
||||
static SSDataBlock* buildStreamPartitionResult(SOperatorInfo* pOperator) {
|
||||
SStorageAPI* pAPI = &pOperator->pTaskInfo->storageAPI;
|
||||
SStorageAPI* pAPI = &pOperator->pTaskInfo->storageAPI;
|
||||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||
|
||||
SStreamPartitionOperatorInfo* pInfo = pOperator->info;
|
||||
SSDataBlock* pDest = pInfo->binfo.pRes;
|
||||
|
@ -994,7 +995,7 @@ static SSDataBlock* buildStreamPartitionResult(SOperatorInfo* pOperator) {
|
|||
pOperator->resultInfo.totalRows += pDest->info.rows;
|
||||
pInfo->parIte = taosHashIterate(pInfo->pPartitions, pInfo->parIte);
|
||||
ASSERT(pDest->info.rows > 0);
|
||||
printDataBlock(pDest, "stream partitionby");
|
||||
printDataBlock(pDest, "stream partitionby", GET_TASKID(pTaskInfo));
|
||||
return pDest;
|
||||
}
|
||||
|
||||
|
@ -1115,7 +1116,7 @@ static SSDataBlock* doStreamHashPartition(SOperatorInfo* pOperator) {
|
|||
setOperatorCompleted(pOperator);
|
||||
return NULL;
|
||||
}
|
||||
printDataBlock(pBlock, "stream partitionby recv");
|
||||
printDataBlock(pBlock, "stream partitionby recv", GET_TASKID(pTaskInfo));
|
||||
switch (pBlock->info.type) {
|
||||
case STREAM_NORMAL:
|
||||
case STREAM_PULL_DATA:
|
||||
|
@ -1125,7 +1126,7 @@ static SSDataBlock* doStreamHashPartition(SOperatorInfo* pOperator) {
|
|||
case STREAM_DELETE_DATA: {
|
||||
copyDataBlock(pInfo->pDelRes, pBlock);
|
||||
pInfo->pDelRes->info.type = STREAM_DELETE_RESULT;
|
||||
printDataBlock(pInfo->pDelRes, "stream partitionby delete");
|
||||
printDataBlock(pInfo->pDelRes, "stream partitionby delete", GET_TASKID(pTaskInfo));
|
||||
return pInfo->pDelRes;
|
||||
} break;
|
||||
default:
|
||||
|
|
|
@ -1343,7 +1343,7 @@ static int32_t generateIntervalScanRange(SStreamScanInfo* pInfo, SSDataBlock* pS
|
|||
if (rows == 0) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
SExecTaskInfo* pTaskInfo = pInfo->pStreamScanOp->pTaskInfo;
|
||||
SColumnInfoData* pSrcStartTsCol = (SColumnInfoData*)taosArrayGet(pSrcBlock->pDataBlock, START_TS_COLUMN_INDEX);
|
||||
SColumnInfoData* pSrcEndTsCol = (SColumnInfoData*)taosArrayGet(pSrcBlock->pDataBlock, END_TS_COLUMN_INDEX);
|
||||
SColumnInfoData* pSrcUidCol = taosArrayGet(pSrcBlock->pDataBlock, UID_COLUMN_INDEX);
|
||||
|
@ -1360,7 +1360,7 @@ static int32_t generateIntervalScanRange(SStreamScanInfo* pInfo, SSDataBlock* pS
|
|||
TSKEY startTs = srcStartTsCol[0];
|
||||
TSKEY endTs = srcEndTsCol[0];
|
||||
SSDataBlock* pPreRes = readPreVersionData(pInfo->pTableScanOp, srcUid, startTs, endTs, ver);
|
||||
printDataBlock(pPreRes, "pre res");
|
||||
printDataBlock(pPreRes, "pre res", GET_TASKID(pTaskInfo));
|
||||
blockDataCleanup(pSrcBlock);
|
||||
int32_t code = blockDataEnsureCapacity(pSrcBlock, pPreRes->info.rows);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
|
@ -1375,7 +1375,7 @@ static int32_t generateIntervalScanRange(SStreamScanInfo* pInfo, SSDataBlock* pS
|
|||
appendOneRowToStreamSpecialBlock(pSrcBlock, ((TSKEY*)pTsCol->pData) + i, ((TSKEY*)pTsCol->pData) + i, &srcUid,
|
||||
&groupId, NULL);
|
||||
}
|
||||
printDataBlock(pSrcBlock, "new delete");
|
||||
printDataBlock(pSrcBlock, "new delete", GET_TASKID(pTaskInfo));
|
||||
}
|
||||
uint64_t* srcGp = (uint64_t*)pSrcGpCol->pData;
|
||||
srcStartTsCol = (TSKEY*)pSrcStartTsCol->pData;
|
||||
|
@ -1921,38 +1921,9 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
|
|||
switch (pInfo->scanMode) {
|
||||
case STREAM_SCAN_FROM_RES: {
|
||||
pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE;
|
||||
printDataBlock(pInfo->pRecoverRes, "scan recover");
|
||||
printDataBlock(pInfo->pRecoverRes, "scan recover", GET_TASKID(pTaskInfo));
|
||||
return pInfo->pRecoverRes;
|
||||
} break;
|
||||
// case STREAM_SCAN_FROM_UPDATERES: {
|
||||
// generateScanRange(pInfo, pInfo->pUpdateDataRes, pInfo->pUpdateRes);
|
||||
// prepareRangeScan(pInfo, pInfo->pUpdateRes, &pInfo->updateResIndex);
|
||||
// pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER_RANGE;
|
||||
// printDataBlock(pInfo->pUpdateRes, "recover update");
|
||||
// return pInfo->pUpdateRes;
|
||||
// } break;
|
||||
// case STREAM_SCAN_FROM_DELETE_DATA: {
|
||||
// generateScanRange(pInfo, pInfo->pUpdateDataRes, pInfo->pUpdateRes);
|
||||
// prepareRangeScan(pInfo, pInfo->pUpdateRes, &pInfo->updateResIndex);
|
||||
// pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER_RANGE;
|
||||
// copyDataBlock(pInfo->pDeleteDataRes, pInfo->pUpdateRes);
|
||||
// pInfo->pDeleteDataRes->info.type = STREAM_DELETE_DATA;
|
||||
// printDataBlock(pInfo->pDeleteDataRes, "recover delete");
|
||||
// return pInfo->pDeleteDataRes;
|
||||
// } break;
|
||||
// case STREAM_SCAN_FROM_DATAREADER_RANGE: {
|
||||
// SSDataBlock* pSDB = doRangeScan(pInfo, pInfo->pUpdateRes, pInfo->primaryTsIndex, &pInfo->updateResIndex);
|
||||
// if (pSDB) {
|
||||
// STableScanInfo* pTableScanInfo = pInfo->pTableScanOp->info;
|
||||
// pSDB->info.type = pInfo->scanMode == STREAM_SCAN_FROM_DATAREADER_RANGE ? STREAM_NORMAL : STREAM_PULL_DATA;
|
||||
// checkUpdateData(pInfo, true, pSDB, false);
|
||||
// printDataBlock(pSDB, "scan recover update");
|
||||
// calBlockTbName(pInfo, pSDB);
|
||||
// return pSDB;
|
||||
// }
|
||||
// blockDataCleanup(pInfo->pUpdateDataRes);
|
||||
// pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE;
|
||||
// } break;
|
||||
default:
|
||||
break;
|
||||
}
|
||||
|
@ -1971,12 +1942,12 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
|
|||
}
|
||||
if (pInfo->pCreateTbRes->info.rows > 0) {
|
||||
pInfo->scanMode = STREAM_SCAN_FROM_RES;
|
||||
printDataBlock(pInfo->pCreateTbRes, "recover createTbl");
|
||||
printDataBlock(pInfo->pCreateTbRes, "recover createTbl", GET_TASKID(pTaskInfo));
|
||||
return pInfo->pCreateTbRes;
|
||||
}
|
||||
|
||||
qDebug("stream recover scan get block, rows %" PRId64, pInfo->pRecoverRes->info.rows);
|
||||
printDataBlock(pInfo->pRecoverRes, "scan recover");
|
||||
printDataBlock(pInfo->pRecoverRes, "scan recover", GET_TASKID(pTaskInfo));
|
||||
return pInfo->pRecoverRes;
|
||||
}
|
||||
pStreamInfo->recoverStep = STREAM_RECOVER_STEP__NONE;
|
||||
|
@ -2032,7 +2003,7 @@ FETCH_NEXT_BLOCK:
|
|||
pAPI->stateStore.updateInfoAddCloseWindowSBF(pInfo->pUpdateInfo);
|
||||
} break;
|
||||
case STREAM_DELETE_DATA: {
|
||||
printDataBlock(pBlock, "stream scan delete recv");
|
||||
printDataBlock(pBlock, "stream scan delete recv", GET_TASKID(pTaskInfo));
|
||||
SSDataBlock* pDelBlock = NULL;
|
||||
if (pInfo->tqReader) {
|
||||
pDelBlock = createSpecialDataBlock(STREAM_DELETE_DATA);
|
||||
|
@ -2043,7 +2014,7 @@ FETCH_NEXT_BLOCK:
|
|||
|
||||
setBlockGroupIdByUid(pInfo, pDelBlock);
|
||||
rebuildDeleteBlockData(pDelBlock, &pStreamInfo->fillHistoryWindow, id);
|
||||
printDataBlock(pDelBlock, "stream scan delete recv filtered");
|
||||
printDataBlock(pDelBlock, "stream scan delete recv filtered", GET_TASKID(pTaskInfo));
|
||||
if (pDelBlock->info.rows == 0) {
|
||||
if (pInfo->tqReader) {
|
||||
blockDataDestroy(pDelBlock);
|
||||
|
@ -2054,7 +2025,7 @@ FETCH_NEXT_BLOCK:
|
|||
if (!isIntervalWindow(pInfo) && !isSessionWindow(pInfo) && !isStateWindow(pInfo)) {
|
||||
generateDeleteResultBlock(pInfo, pDelBlock, pInfo->pDeleteDataRes);
|
||||
pInfo->pDeleteDataRes->info.type = STREAM_DELETE_RESULT;
|
||||
printDataBlock(pDelBlock, "stream scan delete result");
|
||||
printDataBlock(pDelBlock, "stream scan delete result", GET_TASKID(pTaskInfo));
|
||||
blockDataDestroy(pDelBlock);
|
||||
|
||||
if (pInfo->pDeleteDataRes->info.rows > 0) {
|
||||
|
@ -2069,7 +2040,7 @@ FETCH_NEXT_BLOCK:
|
|||
prepareRangeScan(pInfo, pInfo->pUpdateRes, &pInfo->updateResIndex);
|
||||
copyDataBlock(pInfo->pDeleteDataRes, pInfo->pUpdateRes);
|
||||
pInfo->pDeleteDataRes->info.type = STREAM_DELETE_DATA;
|
||||
printDataBlock(pDelBlock, "stream scan delete data");
|
||||
printDataBlock(pDelBlock, "stream scan delete data", GET_TASKID(pTaskInfo));
|
||||
if (pInfo->tqReader) {
|
||||
blockDataDestroy(pDelBlock);
|
||||
}
|
||||
|
@ -2120,7 +2091,7 @@ FETCH_NEXT_BLOCK:
|
|||
STableScanInfo* pTableScanInfo = pInfo->pTableScanOp->info;
|
||||
pSDB->info.type = pInfo->scanMode == STREAM_SCAN_FROM_DATAREADER_RANGE ? STREAM_NORMAL : STREAM_PULL_DATA;
|
||||
checkUpdateData(pInfo, true, pSDB, false);
|
||||
printDataBlock(pSDB, "stream scan update");
|
||||
printDataBlock(pSDB, "stream scan update", GET_TASKID(pTaskInfo));
|
||||
calBlockTbName(pInfo, pSDB);
|
||||
return pSDB;
|
||||
}
|
||||
|
|
|
@ -2589,20 +2589,20 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
|
|||
doBuildPullDataBlock(pInfo->pPullWins, &pInfo->pullIndex, pInfo->pPullDataRes);
|
||||
if (pInfo->pPullDataRes->info.rows != 0) {
|
||||
// process the rest of the data
|
||||
printDataBlock(pInfo->pPullDataRes, IS_FINAL_OP(pInfo) ? "interval final" : "interval semi");
|
||||
printDataBlock(pInfo->pPullDataRes, IS_FINAL_OP(pInfo) ? "interval final" : "interval semi", GET_TASKID(pTaskInfo));
|
||||
return pInfo->pPullDataRes;
|
||||
}
|
||||
|
||||
doBuildDeleteResult(pInfo, pInfo->pDelWins, &pInfo->delIndex, pInfo->pDelRes);
|
||||
if (pInfo->pDelRes->info.rows != 0) {
|
||||
// process the rest of the data
|
||||
printDataBlock(pInfo->pDelRes, IS_FINAL_OP(pInfo) ? "interval final" : "interval semi");
|
||||
printDataBlock(pInfo->pDelRes, IS_FINAL_OP(pInfo) ? "interval final" : "interval semi", GET_TASKID(pTaskInfo));
|
||||
return pInfo->pDelRes;
|
||||
}
|
||||
|
||||
doBuildStreamIntervalResult(pOperator, pInfo->pState, pInfo->binfo.pRes, &pInfo->groupResInfo);
|
||||
if (pInfo->binfo.pRes->info.rows != 0) {
|
||||
printDataBlock(pInfo->binfo.pRes, IS_FINAL_OP(pInfo) ? "interval final" : "interval semi");
|
||||
printDataBlock(pInfo->binfo.pRes, IS_FINAL_OP(pInfo) ? "interval final" : "interval semi", GET_TASKID(pTaskInfo));
|
||||
return pInfo->binfo.pRes;
|
||||
}
|
||||
|
||||
|
@ -2633,7 +2633,7 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
|
|||
doBuildDeleteResult(pInfo, pInfo->pDelWins, &pInfo->delIndex, pInfo->pDelRes);
|
||||
if (pInfo->pDelRes->info.rows != 0) {
|
||||
// process the rest of the data
|
||||
printDataBlock(pInfo->pDelRes, IS_FINAL_OP(pInfo) ? "interval final" : "interval semi");
|
||||
printDataBlock(pInfo->pDelRes, IS_FINAL_OP(pInfo) ? "interval final" : "interval semi", GET_TASKID(pTaskInfo));
|
||||
return pInfo->pDelRes;
|
||||
}
|
||||
}
|
||||
|
@ -2670,7 +2670,7 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
|
|||
break;
|
||||
}
|
||||
pInfo->numOfDatapack++;
|
||||
printDataBlock(pBlock, IS_FINAL_OP(pInfo) ? "interval final recv" : "interval semi recv");
|
||||
printDataBlock(pBlock, IS_FINAL_OP(pInfo) ? "interval final recv" : "interval semi recv", GET_TASKID(pTaskInfo));
|
||||
|
||||
if (pBlock->info.type == STREAM_NORMAL || pBlock->info.type == STREAM_PULL_DATA) {
|
||||
pInfo->binfo.pRes->info.type = pBlock->info.type;
|
||||
|
@ -2694,7 +2694,7 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
|
|||
doBuildDeleteResult(pInfo, pInfo->pDelWins, &pInfo->delIndex, pInfo->pDelRes);
|
||||
if (pInfo->pDelRes->info.rows != 0) {
|
||||
// process the rest of the data
|
||||
printDataBlock(pInfo->pDelRes, IS_FINAL_OP(pInfo) ? "interval final" : "interval semi");
|
||||
printDataBlock(pInfo->pDelRes, IS_FINAL_OP(pInfo) ? "interval final" : "interval semi", GET_TASKID(pTaskInfo));
|
||||
if (pBlock->info.type == STREAM_CLEAR) {
|
||||
pInfo->pDelRes->info.type = STREAM_CLEAR;
|
||||
} else {
|
||||
|
@ -2758,20 +2758,20 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
|
|||
doBuildPullDataBlock(pInfo->pPullWins, &pInfo->pullIndex, pInfo->pPullDataRes);
|
||||
if (pInfo->pPullDataRes->info.rows != 0) {
|
||||
// process the rest of the data
|
||||
printDataBlock(pInfo->pPullDataRes, IS_FINAL_OP(pInfo) ? "interval final" : "interval semi");
|
||||
printDataBlock(pInfo->pPullDataRes, IS_FINAL_OP(pInfo) ? "interval final" : "interval semi", GET_TASKID(pTaskInfo));
|
||||
return pInfo->pPullDataRes;
|
||||
}
|
||||
|
||||
doBuildDeleteResult(pInfo, pInfo->pDelWins, &pInfo->delIndex, pInfo->pDelRes);
|
||||
if (pInfo->pDelRes->info.rows != 0) {
|
||||
// process the rest of the data
|
||||
printDataBlock(pInfo->pDelRes, IS_FINAL_OP(pInfo) ? "interval final" : "interval semi");
|
||||
printDataBlock(pInfo->pDelRes, IS_FINAL_OP(pInfo) ? "interval final" : "interval semi", GET_TASKID(pTaskInfo));
|
||||
return pInfo->pDelRes;
|
||||
}
|
||||
|
||||
doBuildStreamIntervalResult(pOperator, pInfo->pState, pInfo->binfo.pRes, &pInfo->groupResInfo);
|
||||
if (pInfo->binfo.pRes->info.rows != 0) {
|
||||
printDataBlock(pInfo->binfo.pRes, IS_FINAL_OP(pInfo) ? "interval final" : "interval semi");
|
||||
printDataBlock(pInfo->binfo.pRes, IS_FINAL_OP(pInfo) ? "interval final" : "interval semi", GET_TASKID(pTaskInfo));
|
||||
return pInfo->binfo.pRes;
|
||||
}
|
||||
|
||||
|
@ -3660,18 +3660,19 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) {
|
|||
SStreamSessionAggOperatorInfo* pInfo = pOperator->info;
|
||||
SOptrBasicInfo* pBInfo = &pInfo->binfo;
|
||||
SStreamAggSupporter* pAggSup = &pInfo->streamAggSup;
|
||||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||
qDebug("===stream=== stream session agg");
|
||||
if (pOperator->status == OP_EXEC_DONE) {
|
||||
return NULL;
|
||||
} else if (pOperator->status == OP_RES_TO_RETURN) {
|
||||
doBuildDeleteDataBlock(pOperator, pInfo->pStDeleted, pInfo->pDelRes, &pInfo->pDelIterator);
|
||||
if (pInfo->pDelRes->info.rows > 0) {
|
||||
printDataBlock(pInfo->pDelRes, IS_FINAL_OP(pInfo) ? "final session" : "single session");
|
||||
printDataBlock(pInfo->pDelRes, IS_FINAL_OP(pInfo) ? "final session" : "single session", GET_TASKID(pTaskInfo));
|
||||
return pInfo->pDelRes;
|
||||
}
|
||||
doBuildSessionResult(pOperator, pAggSup->pState, &pInfo->groupResInfo, pBInfo->pRes);
|
||||
if (pBInfo->pRes->info.rows > 0) {
|
||||
printDataBlock(pBInfo->pRes, IS_FINAL_OP(pInfo) ? "final session" : "single session");
|
||||
printDataBlock(pBInfo->pRes, IS_FINAL_OP(pInfo) ? "final session" : "single session", GET_TASKID(pTaskInfo));
|
||||
return pBInfo->pRes;
|
||||
}
|
||||
|
||||
|
@ -3692,7 +3693,7 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) {
|
|||
if (pBlock == NULL) {
|
||||
break;
|
||||
}
|
||||
printDataBlock(pBlock, IS_FINAL_OP(pInfo) ? "final session recv" : "single session recv");
|
||||
printDataBlock(pBlock, IS_FINAL_OP(pInfo) ? "final session recv" : "single session recv", GET_TASKID(pTaskInfo));
|
||||
|
||||
if (pBlock->info.type == STREAM_DELETE_DATA || pBlock->info.type == STREAM_DELETE_RESULT ||
|
||||
pBlock->info.type == STREAM_CLEAR) {
|
||||
|
@ -3770,13 +3771,13 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) {
|
|||
|
||||
doBuildDeleteDataBlock(pOperator, pInfo->pStDeleted, pInfo->pDelRes, &pInfo->pDelIterator);
|
||||
if (pInfo->pDelRes->info.rows > 0) {
|
||||
printDataBlock(pInfo->pDelRes, IS_FINAL_OP(pInfo) ? "final session" : "single session");
|
||||
printDataBlock(pInfo->pDelRes, IS_FINAL_OP(pInfo) ? "final session" : "single session", GET_TASKID(pTaskInfo));
|
||||
return pInfo->pDelRes;
|
||||
}
|
||||
|
||||
doBuildSessionResult(pOperator, pAggSup->pState, &pInfo->groupResInfo, pBInfo->pRes);
|
||||
if (pBInfo->pRes->info.rows > 0) {
|
||||
printDataBlock(pBInfo->pRes, IS_FINAL_OP(pInfo) ? "final session" : "single session");
|
||||
printDataBlock(pBInfo->pRes, IS_FINAL_OP(pInfo) ? "final session" : "single session", GET_TASKID(pTaskInfo));
|
||||
return pBInfo->pRes;
|
||||
}
|
||||
|
||||
|
@ -3950,6 +3951,7 @@ static SSDataBlock* doStreamSessionSemiAgg(SOperatorInfo* pOperator) {
|
|||
TSKEY maxTs = INT64_MIN;
|
||||
SExprSupp* pSup = &pOperator->exprSupp;
|
||||
SStreamAggSupporter* pAggSup = &pInfo->streamAggSup;
|
||||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||
|
||||
qDebug("===stream=== stream session semi agg");
|
||||
if (pOperator->status == OP_EXEC_DONE) {
|
||||
|
@ -3959,13 +3961,13 @@ static SSDataBlock* doStreamSessionSemiAgg(SOperatorInfo* pOperator) {
|
|||
{
|
||||
doBuildSessionResult(pOperator, pAggSup->pState, &pInfo->groupResInfo, pBInfo->pRes);
|
||||
if (pBInfo->pRes->info.rows > 0) {
|
||||
printDataBlock(pBInfo->pRes, "semi session");
|
||||
printDataBlock(pBInfo->pRes, "semi session", GET_TASKID(pTaskInfo));
|
||||
return pBInfo->pRes;
|
||||
}
|
||||
|
||||
doBuildDeleteDataBlock(pOperator, pInfo->pStDeleted, pInfo->pDelRes, &pInfo->pDelIterator);
|
||||
if (pInfo->pDelRes->info.rows > 0) {
|
||||
printDataBlock(pInfo->pDelRes, "semi session delete");
|
||||
printDataBlock(pInfo->pDelRes, "semi session delete", GET_TASKID(pTaskInfo));
|
||||
return pInfo->pDelRes;
|
||||
}
|
||||
|
||||
|
@ -3993,7 +3995,7 @@ static SSDataBlock* doStreamSessionSemiAgg(SOperatorInfo* pOperator) {
|
|||
pOperator->status = OP_RES_TO_RETURN;
|
||||
break;
|
||||
}
|
||||
printDataBlock(pBlock, "semi session recv");
|
||||
printDataBlock(pBlock, "semi session recv", GET_TASKID(pTaskInfo));
|
||||
|
||||
if (pBlock->info.type == STREAM_DELETE_DATA || pBlock->info.type == STREAM_DELETE_RESULT ||
|
||||
pBlock->info.type == STREAM_CLEAR) {
|
||||
|
@ -4042,13 +4044,13 @@ static SSDataBlock* doStreamSessionSemiAgg(SOperatorInfo* pOperator) {
|
|||
|
||||
doBuildSessionResult(pOperator, pAggSup->pState, &pInfo->groupResInfo, pBInfo->pRes);
|
||||
if (pBInfo->pRes->info.rows > 0) {
|
||||
printDataBlock(pBInfo->pRes, "semi session");
|
||||
printDataBlock(pBInfo->pRes, "semi session", GET_TASKID(pTaskInfo));
|
||||
return pBInfo->pRes;
|
||||
}
|
||||
|
||||
doBuildDeleteDataBlock(pOperator, pInfo->pStDeleted, pInfo->pDelRes, &pInfo->pDelIterator);
|
||||
if (pInfo->pDelRes->info.rows > 0) {
|
||||
printDataBlock(pInfo->pDelRes, "semi session delete");
|
||||
printDataBlock(pInfo->pDelRes, "semi session delete", GET_TASKID(pTaskInfo));
|
||||
return pInfo->pDelRes;
|
||||
}
|
||||
|
||||
|
@ -4341,17 +4343,18 @@ static SSDataBlock* doStreamStateAgg(SOperatorInfo* pOperator) {
|
|||
SExprSupp* pSup = &pOperator->exprSupp;
|
||||
SStreamStateAggOperatorInfo* pInfo = pOperator->info;
|
||||
SOptrBasicInfo* pBInfo = &pInfo->binfo;
|
||||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||
qDebug("===stream=== stream state agg");
|
||||
if (pOperator->status == OP_RES_TO_RETURN) {
|
||||
doBuildDeleteDataBlock(pOperator, pInfo->pSeDeleted, pInfo->pDelRes, &pInfo->pDelIterator);
|
||||
if (pInfo->pDelRes->info.rows > 0) {
|
||||
printDataBlock(pInfo->pDelRes, "single state delete");
|
||||
printDataBlock(pInfo->pDelRes, "single state delete", GET_TASKID(pTaskInfo));
|
||||
return pInfo->pDelRes;
|
||||
}
|
||||
|
||||
doBuildSessionResult(pOperator, pInfo->streamAggSup.pState, &pInfo->groupResInfo, pBInfo->pRes);
|
||||
if (pBInfo->pRes->info.rows > 0) {
|
||||
printDataBlock(pBInfo->pRes, "single state");
|
||||
printDataBlock(pBInfo->pRes, "single state", GET_TASKID(pTaskInfo));
|
||||
return pBInfo->pRes;
|
||||
}
|
||||
|
||||
|
@ -4372,7 +4375,7 @@ static SSDataBlock* doStreamStateAgg(SOperatorInfo* pOperator) {
|
|||
if (pBlock == NULL) {
|
||||
break;
|
||||
}
|
||||
printDataBlock(pBlock, "single state recv");
|
||||
printDataBlock(pBlock, "single state recv", GET_TASKID(pTaskInfo));
|
||||
|
||||
if (pBlock->info.type == STREAM_DELETE_DATA || pBlock->info.type == STREAM_DELETE_RESULT ||
|
||||
pBlock->info.type == STREAM_CLEAR) {
|
||||
|
@ -4425,13 +4428,13 @@ static SSDataBlock* doStreamStateAgg(SOperatorInfo* pOperator) {
|
|||
|
||||
doBuildDeleteDataBlock(pOperator, pInfo->pSeDeleted, pInfo->pDelRes, &pInfo->pDelIterator);
|
||||
if (pInfo->pDelRes->info.rows > 0) {
|
||||
printDataBlock(pInfo->pDelRes, "single state delete");
|
||||
printDataBlock(pInfo->pDelRes, "single state delete", GET_TASKID(pTaskInfo));
|
||||
return pInfo->pDelRes;
|
||||
}
|
||||
|
||||
doBuildSessionResult(pOperator, pInfo->streamAggSup.pState, &pInfo->groupResInfo, pBInfo->pRes);
|
||||
if (pBInfo->pRes->info.rows > 0) {
|
||||
printDataBlock(pBInfo->pRes, "single state");
|
||||
printDataBlock(pBInfo->pRes, "single state", GET_TASKID(pTaskInfo));
|
||||
return pBInfo->pRes;
|
||||
}
|
||||
setOperatorCompleted(pOperator);
|
||||
|
@ -5211,13 +5214,13 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
|
|||
if (pOperator->status == OP_RES_TO_RETURN) {
|
||||
doBuildDeleteResult(pInfo, pInfo->pDelWins, &pInfo->delIndex, pInfo->pDelRes);
|
||||
if (pInfo->pDelRes->info.rows > 0) {
|
||||
printDataBlock(pInfo->pDelRes, "single interval delete");
|
||||
printDataBlock(pInfo->pDelRes, "single interval delete", GET_TASKID(pTaskInfo));
|
||||
return pInfo->pDelRes;
|
||||
}
|
||||
|
||||
doBuildStreamIntervalResult(pOperator, pInfo->pState, pInfo->binfo.pRes, &pInfo->groupResInfo);
|
||||
if (pInfo->binfo.pRes->info.rows > 0) {
|
||||
printDataBlock(pInfo->binfo.pRes, "single interval");
|
||||
printDataBlock(pInfo->binfo.pRes, "single interval", GET_TASKID(pTaskInfo));
|
||||
return pInfo->binfo.pRes;
|
||||
}
|
||||
|
||||
|
@ -5257,7 +5260,7 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
|
|||
}
|
||||
|
||||
pInfo->numOfDatapack++;
|
||||
printDataBlock(pBlock, "single interval recv");
|
||||
printDataBlock(pBlock, "single interval recv", GET_TASKID(pTaskInfo));
|
||||
|
||||
if (pBlock->info.type == STREAM_DELETE_DATA || pBlock->info.type == STREAM_DELETE_RESULT ||
|
||||
pBlock->info.type == STREAM_CLEAR) {
|
||||
|
@ -5269,7 +5272,7 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
|
|||
getAllIntervalWindow(pInfo->aggSup.pResultRowHashTable, pInfo->pUpdatedMap);
|
||||
continue;
|
||||
} else if (pBlock->info.type == STREAM_CREATE_CHILD_TABLE) {
|
||||
printDataBlock(pBlock, "single interval");
|
||||
printDataBlock(pBlock, "single interval", GET_TASKID(pTaskInfo));
|
||||
return pBlock;
|
||||
} else {
|
||||
ASSERTS(pBlock->info.type == STREAM_NORMAL || pBlock->info.type == STREAM_INVALID, "invalid SSDataBlock type");
|
||||
|
@ -5323,13 +5326,13 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
|
|||
|
||||
doBuildDeleteResult(pInfo, pInfo->pDelWins, &pInfo->delIndex, pInfo->pDelRes);
|
||||
if (pInfo->pDelRes->info.rows > 0) {
|
||||
printDataBlock(pInfo->pDelRes, "single interval delete");
|
||||
printDataBlock(pInfo->pDelRes, "single interval delete", GET_TASKID(pTaskInfo));
|
||||
return pInfo->pDelRes;
|
||||
}
|
||||
|
||||
doBuildStreamIntervalResult(pOperator, pInfo->pState, pInfo->binfo.pRes, &pInfo->groupResInfo);
|
||||
if (pInfo->binfo.pRes->info.rows > 0) {
|
||||
printDataBlock(pInfo->binfo.pRes, "single interval");
|
||||
printDataBlock(pInfo->binfo.pRes, "single interval", GET_TASKID(pTaskInfo));
|
||||
return pInfo->binfo.pRes;
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue