refactor: add some logs.
This commit is contained in:
parent
f0f90888ab
commit
a76e47ef54
|
@ -504,7 +504,7 @@ int32_t tqMaskBlock(SSchemaWrapper* pDst, SSDataBlock* pBlock, const SSchemaWrap
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tqRetrieveDataBlock(STqReader* pReader, const char* idstr) {
|
int32_t tqRetrieveDataBlock(STqReader* pReader, const char* idstr) {
|
||||||
tqDebug("tq reader retrieve data block %p, index:%d, total:%d, %s", pReader->msg.msgStr, pReader->nextBlk,
|
tqDebug("tq reader retrieve data block %p, index:%d/%d, %s", pReader->msg.msgStr, pReader->nextBlk,
|
||||||
(int32_t)taosArrayGetSize(pReader->submit.aSubmitTbData), idstr);
|
(int32_t)taosArrayGetSize(pReader->submit.aSubmitTbData), idstr);
|
||||||
|
|
||||||
SSubmitTbData* pSubmitTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk++);
|
SSubmitTbData* pSubmitTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk++);
|
||||||
|
|
|
@ -2046,17 +2046,18 @@ FETCH_NEXT_BLOCK:
|
||||||
return pInfo->pUpdateRes;
|
return pInfo->pUpdateRes;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
const char* id = GET_TASKID(pTaskInfo);
|
||||||
SDataBlockInfo* pBlockInfo = &pInfo->pRes->info;
|
SDataBlockInfo* pBlockInfo = &pInfo->pRes->info;
|
||||||
|
int32_t totalBlocks = taosArrayGetSize(pInfo->pBlockLists);
|
||||||
int32_t totBlockNum = taosArrayGetSize(pInfo->pBlockLists);
|
|
||||||
|
|
||||||
NEXT_SUBMIT_BLK:
|
NEXT_SUBMIT_BLK:
|
||||||
while (1) {
|
while (1) {
|
||||||
if (pInfo->tqReader->msg.msgStr == NULL) {
|
if (pInfo->tqReader->msg.msgStr == NULL) {
|
||||||
if (pInfo->validBlockIndex >= totBlockNum) {
|
if (pInfo->validBlockIndex >= totalBlocks) {
|
||||||
updateInfoDestoryColseWinSBF(pInfo->pUpdateInfo);
|
updateInfoDestoryColseWinSBF(pInfo->pUpdateInfo);
|
||||||
doClearBufferedBlocks(pInfo);
|
doClearBufferedBlocks(pInfo);
|
||||||
qDebug("stream scan return empty, consume block %d", totBlockNum);
|
|
||||||
|
qDebug("stream scan return empty, all %d submit blocks consumed, %s", totalBlocks, id);
|
||||||
void* buff = NULL;
|
void* buff = NULL;
|
||||||
// int32_t len = streamScanOperatorEncode(pInfo, &buff);
|
// int32_t len = streamScanOperatorEncode(pInfo, &buff);
|
||||||
// if (len > 0) {
|
// if (len > 0) {
|
||||||
|
@ -2068,17 +2069,18 @@ FETCH_NEXT_BLOCK:
|
||||||
|
|
||||||
int32_t current = pInfo->validBlockIndex++;
|
int32_t current = pInfo->validBlockIndex++;
|
||||||
SPackedData* pSubmit = taosArrayGet(pInfo->pBlockLists, current);
|
SPackedData* pSubmit = taosArrayGet(pInfo->pBlockLists, current);
|
||||||
|
|
||||||
|
qDebug("set %d/%d as the input submit block, %s", current, totalBlocks, id);
|
||||||
if (tqReaderSetSubmitMsg(pInfo->tqReader, pSubmit->msgStr, pSubmit->msgLen, pSubmit->ver) < 0) {
|
if (tqReaderSetSubmitMsg(pInfo->tqReader, pSubmit->msgStr, pSubmit->msgLen, pSubmit->ver) < 0) {
|
||||||
qError("submit msg messed up when initing stream submit block %p, current %d, total %d", pSubmit, current,
|
qError("submit msg messed up when initializing stream submit block %p, current %d/%d, %s", pSubmit, current, totalBlocks, id);
|
||||||
totBlockNum);
|
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
blockDataCleanup(pInfo->pRes);
|
blockDataCleanup(pInfo->pRes);
|
||||||
|
|
||||||
while (tqNextBlockImpl(pInfo->tqReader, pTaskInfo->id.str)) {
|
while (tqNextBlockImpl(pInfo->tqReader, id)) {
|
||||||
int32_t code = tqRetrieveDataBlock(pInfo->tqReader, pTaskInfo->id.str);
|
int32_t code = tqRetrieveDataBlock(pInfo->tqReader, id);
|
||||||
if (code != TSDB_CODE_SUCCESS || pInfo->tqReader->pResBlock->info.rows == 0) {
|
if (code != TSDB_CODE_SUCCESS || pInfo->tqReader->pResBlock->info.rows == 0) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
@ -2099,6 +2101,7 @@ FETCH_NEXT_BLOCK:
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pBlockInfo->rows > 0 || pInfo->pUpdateDataRes->info.rows > 0) {
|
if (pBlockInfo->rows > 0 || pInfo->pUpdateDataRes->info.rows > 0) {
|
||||||
break;
|
break;
|
||||||
} else {
|
} else {
|
||||||
|
@ -2110,7 +2113,7 @@ FETCH_NEXT_BLOCK:
|
||||||
pInfo->numOfExec++;
|
pInfo->numOfExec++;
|
||||||
pOperator->resultInfo.totalRows += pBlockInfo->rows;
|
pOperator->resultInfo.totalRows += pBlockInfo->rows;
|
||||||
|
|
||||||
qDebug("scan rows: %" PRId64", %s", pBlockInfo->rows, pTaskInfo->id.str);
|
qDebug("stream scan get source rows:%" PRId64", %s", pBlockInfo->rows, id);
|
||||||
if (pBlockInfo->rows > 0) {
|
if (pBlockInfo->rows > 0) {
|
||||||
return pInfo->pRes;
|
return pInfo->pRes;
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue