refactor: do some internal refactor.
This commit is contained in:
parent
dbd79cc80c
commit
b02b3117b5
|
@ -261,7 +261,7 @@ bool tqNextBlockImpl(STqReader *pReader);
|
||||||
int32_t extractSubmitMsgFromWal(SWalReader *pReader, SPackedData *pPackedData);
|
int32_t extractSubmitMsgFromWal(SWalReader *pReader, SPackedData *pPackedData);
|
||||||
int32_t tqReaderSetSubmitMsg(STqReader *pReader, void *msgStr, int32_t msgLen, int64_t ver);
|
int32_t tqReaderSetSubmitMsg(STqReader *pReader, void *msgStr, int32_t msgLen, int64_t ver);
|
||||||
bool tqNextDataBlockFilterOut(STqReader *pReader, SHashObj *filterOutUids);
|
bool tqNextDataBlockFilterOut(STqReader *pReader, SHashObj *filterOutUids);
|
||||||
int32_t tqRetrieveDataBlock(STqReader *pReader, SSubmitTbData **pSubmitTbDataRet);
|
int32_t tqRetrieveDataBlock(STqReader *pReader, const char* idstr);
|
||||||
int32_t tqRetrieveTaosxBlock(STqReader *pReader, SArray *blocks, SArray *schemas, SSubmitTbData **pSubmitTbDataRet);
|
int32_t tqRetrieveTaosxBlock(STqReader *pReader, SArray *blocks, SArray *schemas, SSubmitTbData **pSubmitTbDataRet);
|
||||||
|
|
||||||
int32_t vnodeEnqueueStreamMsg(SVnode *pVnode, SRpcMsg *pMsg);
|
int32_t vnodeEnqueueStreamMsg(SVnode *pVnode, SRpcMsg *pMsg);
|
||||||
|
|
|
@ -503,13 +503,11 @@ int32_t tqMaskBlock(SSchemaWrapper* pDst, SSDataBlock* pBlock, const SSchemaWrap
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tqRetrieveDataBlock(STqReader* pReader, SSubmitTbData** pSubmitTbDataRet) {
|
int32_t tqRetrieveDataBlock(STqReader* pReader, const char* idstr) {
|
||||||
tqDebug("tq reader retrieve data block %p, index:%d", pReader->msg.msgStr, pReader->nextBlk);
|
tqDebug("tq reader retrieve data block %p, index:%d, total:%d, %s", pReader->msg.msgStr, pReader->nextBlk,
|
||||||
|
(int32_t)taosArrayGetSize(pReader->submit.aSubmitTbData), idstr);
|
||||||
|
|
||||||
SSubmitTbData* pSubmitTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk++);
|
SSubmitTbData* pSubmitTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk++);
|
||||||
if (pSubmitTbDataRet) {
|
|
||||||
*pSubmitTbDataRet = pSubmitTbData;
|
|
||||||
}
|
|
||||||
|
|
||||||
SSDataBlock* pBlock = pReader->pResBlock;
|
SSDataBlock* pBlock = pReader->pResBlock;
|
||||||
blockDataCleanup(pBlock);
|
blockDataCleanup(pBlock);
|
||||||
|
@ -672,8 +670,6 @@ int32_t tqRetrieveDataBlock(STqReader* pReader, SSubmitTbData** pSubmitTbDataRet
|
||||||
SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, j);
|
SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, j);
|
||||||
while (1) {
|
while (1) {
|
||||||
SColVal colVal;
|
SColVal colVal;
|
||||||
tqDebug("start to extract column id:%d, index:%d", pColData->info.colId, sourceIdx);
|
|
||||||
|
|
||||||
tRowGet(pRow, pTSchema, sourceIdx, &colVal);
|
tRowGet(pRow, pTSchema, sourceIdx, &colVal);
|
||||||
if (colVal.cid < pColData->info.colId) {
|
if (colVal.cid < pColData->info.colId) {
|
||||||
tqDebug("colIndex:%d column id:%d in row, ignore, the required colId:%d, total cols in schema:%d",
|
tqDebug("colIndex:%d column id:%d in row, ignore, the required colId:%d, total cols in schema:%d",
|
||||||
|
|
|
@ -2078,7 +2078,7 @@ FETCH_NEXT_BLOCK:
|
||||||
blockDataCleanup(pInfo->pRes);
|
blockDataCleanup(pInfo->pRes);
|
||||||
|
|
||||||
while (tqNextBlockImpl(pInfo->tqReader)) {
|
while (tqNextBlockImpl(pInfo->tqReader)) {
|
||||||
int32_t code = tqRetrieveDataBlock(pInfo->tqReader, NULL);
|
int32_t code = tqRetrieveDataBlock(pInfo->tqReader, pTaskInfo->id.str);
|
||||||
if (code != TSDB_CODE_SUCCESS || pInfo->tqReader->pResBlock->info.rows == 0) {
|
if (code != TSDB_CODE_SUCCESS || pInfo->tqReader->pResBlock->info.rows == 0) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
|
@ -272,7 +272,7 @@ int32_t streamExecForAll(SStreamTask* pTask) {
|
||||||
if (pTask->taskLevel == TASK_LEVEL__SOURCE && batchSize < MIN_STREAM_EXEC_BATCH_NUM && times < 5) {
|
if (pTask->taskLevel == TASK_LEVEL__SOURCE && batchSize < MIN_STREAM_EXEC_BATCH_NUM && times < 5) {
|
||||||
times++;
|
times++;
|
||||||
taosMsleep(1);
|
taosMsleep(1);
|
||||||
qDebug("===stream===try agian batchSize:%d", batchSize);
|
qDebug("===stream===try again batchSize:%d", batchSize);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue