Merge pull request #15003 from taosdata/feature/TD-17357
feat(stream): add log
This commit is contained in:
commit
e59942bdc7
|
@ -1752,7 +1752,7 @@ char* dumpBlockData(SSDataBlock* pDataBlock, const char* flag, char** pDataBuf)
|
||||||
int32_t colNum = taosArrayGetSize(pDataBlock->pDataBlock);
|
int32_t colNum = taosArrayGetSize(pDataBlock->pDataBlock);
|
||||||
int32_t rows = pDataBlock->info.rows;
|
int32_t rows = pDataBlock->info.rows;
|
||||||
int32_t len = 0;
|
int32_t len = 0;
|
||||||
len += snprintf(dumpBuf + len, size - len, "%s |block type %d |child id %d|group id:%" PRIu64 "| uid:%ld|======\n", "dumpBlockData",
|
len += snprintf(dumpBuf + len, size - len, "===stream===%s |block type %d |child id %d|group id:%" PRIu64 "| uid:%ld|\n", flag,
|
||||||
(int32_t)pDataBlock->info.type, pDataBlock->info.childId, pDataBlock->info.groupId,
|
(int32_t)pDataBlock->info.type, pDataBlock->info.childId, pDataBlock->info.groupId,
|
||||||
pDataBlock->info.uid);
|
pDataBlock->info.uid);
|
||||||
if (len >= size - 1) return dumpBuf;
|
if (len >= size - 1) return dumpBuf;
|
||||||
|
|
|
@ -1353,13 +1353,13 @@ static int32_t closeIntervalWindow(SHashObj* pHashMap, STimeWindowAggSupp* pSup,
|
||||||
if (chIds && pPullDataMap) {
|
if (chIds && pPullDataMap) {
|
||||||
SArray* chAy = *(SArray**)chIds;
|
SArray* chAy = *(SArray**)chIds;
|
||||||
int32_t size = taosArrayGetSize(chAy);
|
int32_t size = taosArrayGetSize(chAy);
|
||||||
qDebug("window %" PRId64 " wait child size:%d", win.skey, size);
|
qDebug("===stream===window %" PRId64 " wait child size:%d", win.skey, size);
|
||||||
for (int32_t i = 0; i < size; i++) {
|
for (int32_t i = 0; i < size; i++) {
|
||||||
qDebug("window %" PRId64 " wait child id:%d", win.skey, *(int32_t*)taosArrayGet(chAy, i));
|
qDebug("===stream===window %" PRId64 " wait child id:%d", win.skey, *(int32_t*)taosArrayGet(chAy, i));
|
||||||
}
|
}
|
||||||
continue;
|
continue;
|
||||||
} else if (pPullDataMap) {
|
} else if (pPullDataMap) {
|
||||||
qDebug("close window %" PRId64, win.skey);
|
qDebug("===stream===close window %" PRId64, win.skey);
|
||||||
}
|
}
|
||||||
SResultRowPosition* pPos = (SResultRowPosition*)pIte;
|
SResultRowPosition* pPos = (SResultRowPosition*)pIte;
|
||||||
if (pSup->calTrigger == STREAM_TRIGGER_WINDOW_CLOSE) {
|
if (pSup->calTrigger == STREAM_TRIGGER_WINDOW_CLOSE) {
|
||||||
|
@ -2482,7 +2482,9 @@ static void doHashInterval(SOperatorInfo* pOperatorInfo, SSDataBlock* pSDataBloc
|
||||||
SPullWindowInfo pull = {.window = nextWin, .groupId = tableGroupId};
|
SPullWindowInfo pull = {.window = nextWin, .groupId = tableGroupId};
|
||||||
// add pull data request
|
// add pull data request
|
||||||
taosArrayPush(pInfo->pPullWins, &pull);
|
taosArrayPush(pInfo->pPullWins, &pull);
|
||||||
addPullWindow(pInfo->pPullDataMap, &winRes, taosArrayGetSize(pInfo->pChildren));
|
int32_t size = taosArrayGetSize(pInfo->pChildren);
|
||||||
|
addPullWindow(pInfo->pPullDataMap, &winRes, size);
|
||||||
|
qDebug("===stream===prepare retrive %" PRId64 ", size:%d", winRes.ts, size);
|
||||||
} else {
|
} else {
|
||||||
int32_t index = -1;
|
int32_t index = -1;
|
||||||
SArray* chArray = NULL;
|
SArray* chArray = NULL;
|
||||||
|
@ -2492,14 +2494,14 @@ static void doHashInterval(SOperatorInfo* pOperatorInfo, SSDataBlock* pSDataBloc
|
||||||
chId = getChildIndex(pSDataBlock);
|
chId = getChildIndex(pSDataBlock);
|
||||||
index = taosArraySearchIdx(chArray, &chId, compareInt32Val, TD_EQ);
|
index = taosArraySearchIdx(chArray, &chId, compareInt32Val, TD_EQ);
|
||||||
}
|
}
|
||||||
if (index != -1 && pSDataBlock->info.type == STREAM_PULL_DATA) {
|
// if (index != -1 && pSDataBlock->info.type == STREAM_PULL_DATA) {
|
||||||
qDebug("======delete child id %d", chId);
|
// qDebug("===stream===delete child id %d", chId);
|
||||||
taosArrayRemove(chArray, index);
|
// taosArrayRemove(chArray, index);
|
||||||
if (taosArrayGetSize(chArray) == 0) {
|
// if (taosArrayGetSize(chArray) == 0) {
|
||||||
// pull data is over
|
// // pull data is over
|
||||||
taosHashRemove(pInfo->pPullDataMap, &winRes, sizeof(SWinRes));
|
// taosHashRemove(pInfo->pPullDataMap, &winRes, sizeof(SWinRes));
|
||||||
}
|
// }
|
||||||
}
|
// }
|
||||||
if (index == -1 || pSDataBlock->info.type == STREAM_PULL_DATA) {
|
if (index == -1 || pSDataBlock->info.type == STREAM_PULL_DATA) {
|
||||||
ignore = false;
|
ignore = false;
|
||||||
}
|
}
|
||||||
|
@ -2623,6 +2625,7 @@ void processPullOver(SSDataBlock* pBlock, SHashObj* pMap) {
|
||||||
SArray* chArray = *(SArray**)chIds;
|
SArray* chArray = *(SArray**)chIds;
|
||||||
int32_t index = taosArraySearchIdx(chArray, &chId, compareInt32Val, TD_EQ);
|
int32_t index = taosArraySearchIdx(chArray, &chId, compareInt32Val, TD_EQ);
|
||||||
if (index != -1) {
|
if (index != -1) {
|
||||||
|
qDebug("===stream===window %" PRId64 " delete child id %d", winRes.ts, chId);
|
||||||
taosArrayRemove(chArray, index);
|
taosArrayRemove(chArray, index);
|
||||||
if (taosArrayGetSize(chArray) == 0) {
|
if (taosArrayGetSize(chArray) == 0) {
|
||||||
// pull data is over
|
// pull data is over
|
||||||
|
@ -2641,7 +2644,7 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
|
||||||
|
|
||||||
SExprSupp* pSup = &pOperator->exprSupp;
|
SExprSupp* pSup = &pOperator->exprSupp;
|
||||||
|
|
||||||
qDebug("interval status %d %s", pOperator->status, IS_FINAL_OP(pInfo) ? "interval Final" : "interval Semi");
|
qDebug("interval status %d %s", pOperator->status, IS_FINAL_OP(pInfo) ? "interval Final" : "interval Semi");
|
||||||
|
|
||||||
if (pOperator->status == OP_EXEC_DONE) {
|
if (pOperator->status == OP_EXEC_DONE) {
|
||||||
return NULL;
|
return NULL;
|
||||||
|
@ -2657,18 +2660,18 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
|
||||||
}
|
}
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
printDataBlock(pInfo->binfo.pRes, IS_FINAL_OP(pInfo) ? "interval Final" : "interval Semi");
|
printDataBlock(pInfo->binfo.pRes, IS_FINAL_OP(pInfo) ? "interval Final" : "interval Semi");
|
||||||
return pInfo->binfo.pRes;
|
return pInfo->binfo.pRes;
|
||||||
} else {
|
} else {
|
||||||
doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf);
|
doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf);
|
||||||
if (pInfo->binfo.pRes->info.rows != 0) {
|
if (pInfo->binfo.pRes->info.rows != 0) {
|
||||||
printDataBlock(pInfo->binfo.pRes, IS_FINAL_OP(pInfo) ? "interval Final" : "interval Semi");
|
printDataBlock(pInfo->binfo.pRes, IS_FINAL_OP(pInfo) ? "interval Final" : "interval Semi");
|
||||||
return pInfo->binfo.pRes;
|
return pInfo->binfo.pRes;
|
||||||
}
|
}
|
||||||
if (pInfo->pUpdateRes->info.rows != 0 && pInfo->returnUpdate) {
|
if (pInfo->pUpdateRes->info.rows != 0 && pInfo->returnUpdate) {
|
||||||
pInfo->returnUpdate = false;
|
pInfo->returnUpdate = false;
|
||||||
ASSERT(!IS_FINAL_OP(pInfo));
|
ASSERT(!IS_FINAL_OP(pInfo));
|
||||||
printDataBlock(pInfo->pUpdateRes, IS_FINAL_OP(pInfo) ? "interval Final" : "interval Semi");
|
printDataBlock(pInfo->pUpdateRes, IS_FINAL_OP(pInfo) ? "interval Final" : "interval Semi");
|
||||||
// process the rest of the data
|
// process the rest of the data
|
||||||
return pInfo->pUpdateRes;
|
return pInfo->pUpdateRes;
|
||||||
}
|
}
|
||||||
|
@ -2676,13 +2679,13 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
|
||||||
if (pInfo->pPullDataRes->info.rows != 0) {
|
if (pInfo->pPullDataRes->info.rows != 0) {
|
||||||
// process the rest of the data
|
// process the rest of the data
|
||||||
ASSERT(IS_FINAL_OP(pInfo));
|
ASSERT(IS_FINAL_OP(pInfo));
|
||||||
printDataBlock(pInfo->pPullDataRes, IS_FINAL_OP(pInfo) ? "interval Final" : "interval Semi");
|
printDataBlock(pInfo->pPullDataRes, IS_FINAL_OP(pInfo) ? "interval Final" : "interval Semi");
|
||||||
return pInfo->pPullDataRes;
|
return pInfo->pPullDataRes;
|
||||||
}
|
}
|
||||||
doBuildDeleteResult(pInfo->pDelWins, &pInfo->delIndex, pInfo->pDelRes);
|
doBuildDeleteResult(pInfo->pDelWins, &pInfo->delIndex, pInfo->pDelRes);
|
||||||
if (pInfo->pDelRes->info.rows != 0) {
|
if (pInfo->pDelRes->info.rows != 0) {
|
||||||
// process the rest of the data
|
// process the rest of the data
|
||||||
printDataBlock(pInfo->pDelRes, IS_FINAL_OP(pInfo) ? "interval Final" : "interval Semi");
|
printDataBlock(pInfo->pDelRes, IS_FINAL_OP(pInfo) ? "interval Final" : "interval Semi");
|
||||||
return pInfo->pDelRes;
|
return pInfo->pDelRes;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -2693,10 +2696,10 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
|
||||||
clearSpecialDataBlock(pInfo->pUpdateRes);
|
clearSpecialDataBlock(pInfo->pUpdateRes);
|
||||||
removeDeleteResults(pUpdated, pInfo->pDelWins);
|
removeDeleteResults(pUpdated, pInfo->pDelWins);
|
||||||
pOperator->status = OP_RES_TO_RETURN;
|
pOperator->status = OP_RES_TO_RETURN;
|
||||||
qDebug("%s return data", IS_FINAL_OP(pInfo) ? "interval Final" : "interval Semi");
|
qDebug("%s return data", IS_FINAL_OP(pInfo) ? "interval Final" : "interval Semi");
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
printDataBlock(pBlock, IS_FINAL_OP(pInfo) ? "interval Final recv" : "interval Semi recv");
|
printDataBlock(pBlock, IS_FINAL_OP(pInfo) ? "interval Final recv" : "interval Semi recv");
|
||||||
maxTs = TMAX(maxTs, pBlock->info.window.ekey);
|
maxTs = TMAX(maxTs, pBlock->info.window.ekey);
|
||||||
|
|
||||||
if (pBlock->info.type == STREAM_NORMAL || pBlock->info.type == STREAM_PULL_DATA ||
|
if (pBlock->info.type == STREAM_NORMAL || pBlock->info.type == STREAM_PULL_DATA ||
|
||||||
|
@ -2771,6 +2774,7 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
|
||||||
SStreamFinalIntervalOperatorInfo* pTmpInfo = pChildOp->info;
|
SStreamFinalIntervalOperatorInfo* pTmpInfo = pChildOp->info;
|
||||||
pTmpInfo->twAggSup.calTrigger = STREAM_TRIGGER_AT_ONCE;
|
pTmpInfo->twAggSup.calTrigger = STREAM_TRIGGER_AT_ONCE;
|
||||||
taosArrayPush(pInfo->pChildren, &pChildOp);
|
taosArrayPush(pInfo->pChildren, &pChildOp);
|
||||||
|
qDebug("===stream===add child, id:%d", chIndex);
|
||||||
}
|
}
|
||||||
SOperatorInfo* pChildOp = taosArrayGetP(pInfo->pChildren, chIndex);
|
SOperatorInfo* pChildOp = taosArrayGetP(pInfo->pChildren, chIndex);
|
||||||
SStreamFinalIntervalOperatorInfo* pChInfo = pChildOp->info;
|
SStreamFinalIntervalOperatorInfo* pChInfo = pChildOp->info;
|
||||||
|
@ -2795,14 +2799,14 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
|
||||||
blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity);
|
blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity);
|
||||||
doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf);
|
doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf);
|
||||||
if (pInfo->binfo.pRes->info.rows != 0) {
|
if (pInfo->binfo.pRes->info.rows != 0) {
|
||||||
printDataBlock(pInfo->binfo.pRes, IS_FINAL_OP(pInfo) ? "interval Final" : "interval Semi");
|
printDataBlock(pInfo->binfo.pRes, IS_FINAL_OP(pInfo) ? "interval Final" : "interval Semi");
|
||||||
return pInfo->binfo.pRes;
|
return pInfo->binfo.pRes;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pInfo->pUpdateRes->info.rows != 0 && pInfo->returnUpdate) {
|
if (pInfo->pUpdateRes->info.rows != 0 && pInfo->returnUpdate) {
|
||||||
pInfo->returnUpdate = false;
|
pInfo->returnUpdate = false;
|
||||||
ASSERT(!IS_FINAL_OP(pInfo));
|
ASSERT(!IS_FINAL_OP(pInfo));
|
||||||
printDataBlock(pInfo->pUpdateRes, IS_FINAL_OP(pInfo) ? "interval Final" : "interval Semi");
|
printDataBlock(pInfo->pUpdateRes, IS_FINAL_OP(pInfo) ? "interval Final" : "interval Semi");
|
||||||
// process the rest of the data
|
// process the rest of the data
|
||||||
return pInfo->pUpdateRes;
|
return pInfo->pUpdateRes;
|
||||||
}
|
}
|
||||||
|
@ -2811,14 +2815,14 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
|
||||||
if (pInfo->pPullDataRes->info.rows != 0) {
|
if (pInfo->pPullDataRes->info.rows != 0) {
|
||||||
// process the rest of the data
|
// process the rest of the data
|
||||||
ASSERT(IS_FINAL_OP(pInfo));
|
ASSERT(IS_FINAL_OP(pInfo));
|
||||||
printDataBlock(pInfo->pPullDataRes, IS_FINAL_OP(pInfo) ? "interval Final" : "interval Semi");
|
printDataBlock(pInfo->pPullDataRes, IS_FINAL_OP(pInfo) ? "interval Final" : "interval Semi");
|
||||||
return pInfo->pPullDataRes;
|
return pInfo->pPullDataRes;
|
||||||
}
|
}
|
||||||
|
|
||||||
doBuildDeleteResult(pInfo->pDelWins, &pInfo->delIndex, pInfo->pDelRes);
|
doBuildDeleteResult(pInfo->pDelWins, &pInfo->delIndex, pInfo->pDelRes);
|
||||||
if (pInfo->pDelRes->info.rows != 0) {
|
if (pInfo->pDelRes->info.rows != 0) {
|
||||||
// process the rest of the data
|
// process the rest of the data
|
||||||
printDataBlock(pInfo->pDelRes, IS_FINAL_OP(pInfo) ? "interval Final" : "interval Semi");
|
printDataBlock(pInfo->pDelRes, IS_FINAL_OP(pInfo) ? "interval Final" : "interval Semi");
|
||||||
return pInfo->pDelRes;
|
return pInfo->pDelRes;
|
||||||
}
|
}
|
||||||
// ASSERT(false);
|
// ASSERT(false);
|
||||||
|
|
Loading…
Reference in New Issue