refactor: do some internal refactor.

This commit is contained in:
Haojun Liao 2024-09-27 17:19:11 +08:00
parent df8cfa65c2
commit e134ed1c99
2 changed files with 5 additions and 6 deletions

View File

@ -829,7 +829,8 @@ static int32_t getPointInfoFromStateRight(SStreamAggSupporter* pAggSup, SStreamF
&curVLen, pWinCode); &curVLen, pWinCode);
QUERY_CHECK_CODE(code, lino, _end); QUERY_CHECK_CODE(code, lino, _end);
qDebug("===stream=== set stream interp next point buf.ts:%" PRId64 ", groupId:%" PRId64 ", res:%d", pNextPoint->key.ts, pNextPoint->key.groupId, pWinCode); qDebug("===stream=== set stream interp next point buf.ts:%" PRId64 ", groupId:%" PRId64 ", res:%d",
pNextPoint->key.ts, pNextPoint->key.groupId, *pWinCode);
setPointBuff(pNextPoint, pFillSup); setPointBuff(pNextPoint, pFillSup);
@ -1623,8 +1624,8 @@ int32_t getSliceMaxTsWins(const SArray* pAllWins, SArray* pMaxWins) {
for (int32_t i = size - 2; i >= 0; i--) { for (int32_t i = size - 2; i >= 0; i--) {
pKey = taosArrayGet(pAllWins, i); pKey = taosArrayGet(pAllWins, i);
if (preGpId != pKey->groupId) { if (preGpId != pKey->groupId) {
void* tmp = taosArrayPush(pMaxWins, pKey); void* p = taosArrayPush(pMaxWins, pKey);
QUERY_CHECK_NULL(tmp, code, lino, _end, terrno); QUERY_CHECK_NULL(p, code, lino, _end, terrno);
preGpId = pKey->groupId; preGpId = pKey->groupId;
} }
} }

View File

@ -174,15 +174,13 @@ int32_t streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, int64_t*
continue; // checkpoint block not dispatch to downstream tasks continue; // checkpoint block not dispatch to downstream tasks
} }
SSDataBlock block = {0}; SSDataBlock block = {.info.childId = pTask->info.selfChildId};
code = assignOneDataBlock(&block, output); code = assignOneDataBlock(&block, output);
if (code) { if (code) {
stError("s-task:%s failed to build result block due to out of memory", pTask->id.idStr); stError("s-task:%s failed to build result block due to out of memory", pTask->id.idStr);
continue; continue;
} }
block.info.childId = pTask->info.selfChildId;
size += blockDataGetSize(output) + sizeof(SSDataBlock) + sizeof(SColumnInfoData) * blockDataGetNumOfCols(&block); size += blockDataGetSize(output) + sizeof(SSDataBlock) + sizeof(SColumnInfoData) * blockDataGetNumOfCols(&block);
numOfBlocks += 1; numOfBlocks += 1;