Merge pull request #21554 from taosdata/fix/resume_mem
refactor: do some internal refactor.
This commit is contained in:
commit
8c3770f5e4
|
@ -200,7 +200,7 @@ STqReader *tqReaderOpen(SVnode *pVnode);
|
||||||
void tqReaderClose(STqReader *);
|
void tqReaderClose(STqReader *);
|
||||||
|
|
||||||
void tqReaderSetColIdList(STqReader *pReader, SArray *pColIdList);
|
void tqReaderSetColIdList(STqReader *pReader, SArray *pColIdList);
|
||||||
int32_t tqReaderSetTbUidList(STqReader *pReader, const SArray *tbUidList);
|
int32_t tqReaderSetTbUidList(STqReader *pReader, const SArray *tbUidList, const char* id);
|
||||||
int32_t tqReaderAddTbUidList(STqReader *pReader, const SArray *pTableUidList);
|
int32_t tqReaderAddTbUidList(STqReader *pReader, const SArray *pTableUidList);
|
||||||
int32_t tqReaderRemoveTbUidList(STqReader *pReader, const SArray *tbUidList);
|
int32_t tqReaderRemoveTbUidList(STqReader *pReader, const SArray *tbUidList);
|
||||||
|
|
||||||
|
|
|
@ -297,6 +297,7 @@ int32_t tsdbUpdateDelFileHdr(SDelFWriter *pWriter);
|
||||||
// SDelFReader
|
// SDelFReader
|
||||||
int32_t tsdbDelFReaderOpen(SDelFReader **ppReader, SDelFile *pFile, STsdb *pTsdb);
|
int32_t tsdbDelFReaderOpen(SDelFReader **ppReader, SDelFile *pFile, STsdb *pTsdb);
|
||||||
int32_t tsdbDelFReaderClose(SDelFReader **ppReader);
|
int32_t tsdbDelFReaderClose(SDelFReader **ppReader);
|
||||||
|
int32_t tsdbReadDelDatav1(SDelFReader *pReader, SDelIdx *pDelIdx, SArray *aDelData, int64_t maxVer);
|
||||||
int32_t tsdbReadDelData(SDelFReader *pReader, SDelIdx *pDelIdx, SArray *aDelData);
|
int32_t tsdbReadDelData(SDelFReader *pReader, SDelIdx *pDelIdx, SArray *aDelData);
|
||||||
int32_t tsdbReadDelIdx(SDelFReader *pReader, SArray *aDelIdx);
|
int32_t tsdbReadDelIdx(SDelFReader *pReader, SArray *aDelIdx);
|
||||||
// tsdbRead.c ==============================================================================================
|
// tsdbRead.c ==============================================================================================
|
||||||
|
|
|
@ -750,7 +750,7 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
|
||||||
tqDebug("tq try to get ctb for stb subscribe, vgId:%d, subkey:%s consumer:0x%" PRIx64 " suid:%" PRId64,
|
tqDebug("tq try to get ctb for stb subscribe, vgId:%d, subkey:%s consumer:0x%" PRIx64 " suid:%" PRId64,
|
||||||
pVnode->config.vgId, req.subKey, pHandle->consumerId, req.suid);
|
pVnode->config.vgId, req.subKey, pHandle->consumerId, req.suid);
|
||||||
pHandle->execHandle.pTqReader = tqReaderOpen(pVnode);
|
pHandle->execHandle.pTqReader = tqReaderOpen(pVnode);
|
||||||
tqReaderSetTbUidList(pHandle->execHandle.pTqReader, tbUidList);
|
tqReaderSetTbUidList(pHandle->execHandle.pTqReader, tbUidList, NULL);
|
||||||
taosArrayDestroy(tbUidList);
|
taosArrayDestroy(tbUidList);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1120,7 +1120,8 @@ int32_t tqProcessTaskRecover2Req(STQ* pTq, int64_t sversion, char* msg, int32_t
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
qDebug("s-task:%s set the start wal offset to be:%" PRId64, pTask->id.idStr, sversion);
|
qDebug("s-task:%s set start wal scan start ver:%"PRId64, pTask->id.idStr, sversion);
|
||||||
|
|
||||||
walReaderSeekVer(pTask->exec.pWalReader, sversion);
|
walReaderSeekVer(pTask->exec.pWalReader, sversion);
|
||||||
pTask->chkInfo.currentVer = sversion;
|
pTask->chkInfo.currentVer = sversion;
|
||||||
|
|
||||||
|
|
|
@ -362,7 +362,7 @@ int32_t tqMetaRestoreHandle(STQ* pTq) {
|
||||||
}
|
}
|
||||||
tqDebug("vgId:%d, tq try to get ctb for stb subscribe, suid:%" PRId64, pTq->pVnode->config.vgId, handle.execHandle.execTb.suid);
|
tqDebug("vgId:%d, tq try to get ctb for stb subscribe, suid:%" PRId64, pTq->pVnode->config.vgId, handle.execHandle.execTb.suid);
|
||||||
handle.execHandle.pTqReader = tqReaderOpen(pTq->pVnode);
|
handle.execHandle.pTqReader = tqReaderOpen(pTq->pVnode);
|
||||||
tqReaderSetTbUidList(handle.execHandle.pTqReader, tbUidList);
|
tqReaderSetTbUidList(handle.execHandle.pTqReader, tbUidList, NULL);
|
||||||
taosArrayDestroy(tbUidList);
|
taosArrayDestroy(tbUidList);
|
||||||
}
|
}
|
||||||
tqDebug("tq restore %s consumer %" PRId64 " vgId:%d", handle.subKey, handle.consumerId, vgId);
|
tqDebug("tq restore %s consumer %" PRId64 " vgId:%d", handle.subKey, handle.consumerId, vgId);
|
||||||
|
|
|
@ -394,8 +394,8 @@ bool tqNextBlockInWal(STqReader* pReader, const char* id) {
|
||||||
SSubmitTbData* pSubmitTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk);
|
SSubmitTbData* pSubmitTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk);
|
||||||
|
|
||||||
if (pReader->tbIdHash == NULL) {
|
if (pReader->tbIdHash == NULL) {
|
||||||
SSDataBlock* pRes = NULL;
|
SSDataBlock* pRes = NULL;
|
||||||
int32_t code = tqRetrieveDataBlock(pReader, &pRes, NULL);
|
int32_t code = tqRetrieveDataBlock(pReader, &pRes, NULL);
|
||||||
if (code == TSDB_CODE_SUCCESS && pRes->info.rows > 0) {
|
if (code == TSDB_CODE_SUCCESS && pRes->info.rows > 0) {
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
@ -457,7 +457,7 @@ bool tqNextBlockImpl(STqReader* pReader, const char* idstr) {
|
||||||
|
|
||||||
int32_t numOfBlocks = taosArrayGetSize(pReader->submit.aSubmitTbData);
|
int32_t numOfBlocks = taosArrayGetSize(pReader->submit.aSubmitTbData);
|
||||||
while (pReader->nextBlk < numOfBlocks) {
|
while (pReader->nextBlk < numOfBlocks) {
|
||||||
tqDebug("tq reader next data block, len:%d ver:%" PRId64 " index:%d/%d, %s", pReader->msg.msgLen, pReader->msg.ver,
|
tqDebug("try next data block, len:%d ver:%" PRId64 " index:%d/%d, %s", pReader->msg.msgLen, pReader->msg.ver,
|
||||||
pReader->nextBlk, numOfBlocks, idstr);
|
pReader->nextBlk, numOfBlocks, idstr);
|
||||||
|
|
||||||
SSubmitTbData* pSubmitTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk);
|
SSubmitTbData* pSubmitTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk);
|
||||||
|
@ -467,10 +467,11 @@ bool tqNextBlockImpl(STqReader* pReader, const char* idstr) {
|
||||||
|
|
||||||
void* ret = taosHashGet(pReader->tbIdHash, &pSubmitTbData->uid, sizeof(int64_t));
|
void* ret = taosHashGet(pReader->tbIdHash, &pSubmitTbData->uid, sizeof(int64_t));
|
||||||
if (ret != NULL) {
|
if (ret != NULL) {
|
||||||
tqDebug("tq reader block found, ver:%" PRId64 ", uid:%" PRId64, pReader->msg.ver, pSubmitTbData->uid);
|
tqDebug("block found, ver:%" PRId64 ", uid:%" PRId64", %s", pReader->msg.ver, pSubmitTbData->uid, idstr);
|
||||||
return true;
|
return true;
|
||||||
} else {
|
} else {
|
||||||
tqDebug("tq reader discard submit block, uid:%" PRId64 ", continue", pSubmitTbData->uid);
|
tqDebug("discard submit block, uid:%" PRId64 ", total queried tables:%d continue %s", pSubmitTbData->uid,
|
||||||
|
taosHashGetSize(pReader->tbIdHash), idstr);
|
||||||
}
|
}
|
||||||
|
|
||||||
pReader->nextBlk++;
|
pReader->nextBlk++;
|
||||||
|
@ -604,7 +605,6 @@ static int32_t doSetVal(SColumnInfoData* pColumnInfoData, int32_t rowIndex, SCol
|
||||||
|
|
||||||
int32_t tqRetrieveDataBlock(STqReader* pReader, SSDataBlock** pRes, const char* id) {
|
int32_t tqRetrieveDataBlock(STqReader* pReader, SSDataBlock** pRes, const char* id) {
|
||||||
tqDebug("tq reader retrieve data block %p, index:%d", pReader->msg.msgStr, pReader->nextBlk);
|
tqDebug("tq reader retrieve data block %p, index:%d", pReader->msg.msgStr, pReader->nextBlk);
|
||||||
|
|
||||||
SSubmitTbData* pSubmitTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk++);
|
SSubmitTbData* pSubmitTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk++);
|
||||||
|
|
||||||
SSDataBlock* pBlock = pReader->pResBlock;
|
SSDataBlock* pBlock = pReader->pResBlock;
|
||||||
|
@ -612,6 +612,7 @@ int32_t tqRetrieveDataBlock(STqReader* pReader, SSDataBlock** pRes, const char*
|
||||||
|
|
||||||
blockDataCleanup(pBlock);
|
blockDataCleanup(pBlock);
|
||||||
|
|
||||||
|
int32_t vgId = pReader->pWalReader->pWal->cfg.vgId;
|
||||||
int32_t sversion = pSubmitTbData->sver;
|
int32_t sversion = pSubmitTbData->sver;
|
||||||
int64_t suid = pSubmitTbData->suid;
|
int64_t suid = pSubmitTbData->suid;
|
||||||
int64_t uid = pSubmitTbData->uid;
|
int64_t uid = pSubmitTbData->uid;
|
||||||
|
@ -628,7 +629,7 @@ int32_t tqRetrieveDataBlock(STqReader* pReader, SSDataBlock** pRes, const char*
|
||||||
if (pReader->pSchemaWrapper == NULL) {
|
if (pReader->pSchemaWrapper == NULL) {
|
||||||
tqWarn("vgId:%d, cannot found schema wrapper for table: suid:%" PRId64 ", uid:%" PRId64
|
tqWarn("vgId:%d, cannot found schema wrapper for table: suid:%" PRId64 ", uid:%" PRId64
|
||||||
"version %d, possibly dropped table",
|
"version %d, possibly dropped table",
|
||||||
pReader->pWalReader->pWal->cfg.vgId, suid, uid, pReader->cachedSchemaVer);
|
vgId, suid, uid, pReader->cachedSchemaVer);
|
||||||
pReader->cachedSchemaSuid = 0;
|
pReader->cachedSchemaSuid = 0;
|
||||||
terrno = TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND;
|
terrno = TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND;
|
||||||
return -1;
|
return -1;
|
||||||
|
@ -642,6 +643,7 @@ int32_t tqRetrieveDataBlock(STqReader* pReader, SSDataBlock** pRes, const char*
|
||||||
if (blockDataGetNumOfCols(pBlock) == 0) {
|
if (blockDataGetNumOfCols(pBlock) == 0) {
|
||||||
int32_t code = buildResSDataBlock(pReader->pResBlock, pReader->pSchemaWrapper, pReader->pColIdList);
|
int32_t code = buildResSDataBlock(pReader->pResBlock, pReader->pSchemaWrapper, pReader->pColIdList);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
tqError("vgId:%d failed to build data block, code:%s", vgId, tstrerror(code));
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -998,7 +1000,7 @@ FAIL:
|
||||||
|
|
||||||
void tqReaderSetColIdList(STqReader* pReader, SArray* pColIdList) { pReader->pColIdList = pColIdList; }
|
void tqReaderSetColIdList(STqReader* pReader, SArray* pColIdList) { pReader->pColIdList = pColIdList; }
|
||||||
|
|
||||||
int tqReaderSetTbUidList(STqReader* pReader, const SArray* tbUidList) {
|
int tqReaderSetTbUidList(STqReader* pReader, const SArray* tbUidList, const char* id) {
|
||||||
if (pReader->tbIdHash) {
|
if (pReader->tbIdHash) {
|
||||||
taosHashClear(pReader->tbIdHash);
|
taosHashClear(pReader->tbIdHash);
|
||||||
} else {
|
} else {
|
||||||
|
@ -1015,6 +1017,7 @@ int tqReaderSetTbUidList(STqReader* pReader, const SArray* tbUidList) {
|
||||||
taosHashPut(pReader->tbIdHash, pKey, sizeof(int64_t), NULL, 0);
|
taosHashPut(pReader->tbIdHash, pKey, sizeof(int64_t), NULL, 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
tqDebug("s-task:%s %d tables are set to be queried target table", id, (int32_t) taosArrayGetSize(tbUidList));
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1089,7 +1092,7 @@ int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd) {
|
||||||
taosArrayDestroy(list);
|
taosArrayDestroy(list);
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
tqReaderSetTbUidList(pTqHandle->execHandle.pTqReader, list);
|
tqReaderSetTbUidList(pTqHandle->execHandle.pTqReader, list, NULL);
|
||||||
taosArrayDestroy(list);
|
taosArrayDestroy(list);
|
||||||
} else {
|
} else {
|
||||||
tqReaderRemoveTbUidList(pTqHandle->execHandle.pTqReader, tbUidList);
|
tqReaderRemoveTbUidList(pTqHandle->execHandle.pTqReader, tbUidList);
|
||||||
|
|
|
@ -1454,7 +1454,7 @@ static int32_t getTableDelDataFromDelIdx(SDelFReader *pDelReader, SDelIdx *pDelI
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
|
|
||||||
if (pDelIdx) {
|
if (pDelIdx) {
|
||||||
code = tsdbReadDelData(pDelReader, pDelIdx, aDelData);
|
code = tsdbReadDelDatav1(pDelReader, pDelIdx, aDelData, INT64_MAX);
|
||||||
}
|
}
|
||||||
|
|
||||||
return code;
|
return code;
|
||||||
|
|
|
@ -266,7 +266,7 @@ static int32_t tsdbCommitTableDel(SCommitter *pCommitter, STbData *pTbData, SDel
|
||||||
suid = pDelIdx->suid;
|
suid = pDelIdx->suid;
|
||||||
uid = pDelIdx->uid;
|
uid = pDelIdx->uid;
|
||||||
|
|
||||||
code = tsdbReadDelData(pCommitter->pDelFReader, pDelIdx, pCommitter->aDelData);
|
code = tsdbReadDelDatav1(pCommitter->pDelFReader, pDelIdx, pCommitter->aDelData, INT64_MAX);
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
} else {
|
} else {
|
||||||
taosArrayClear(pCommitter->aDelData);
|
taosArrayClear(pCommitter->aDelData);
|
||||||
|
|
|
@ -412,7 +412,7 @@ static int32_t tsdbTombFileDataIterNext(STsdbDataIter2* pIter, STsdbFilterInfo*
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
code = tsdbReadDelData(pIter->tIter.pReader, pDelIdx, pIter->tIter.aDelData);
|
code = tsdbReadDelDatav1(pIter->tIter.pReader, pDelIdx, pIter->tIter.aDelData, INT64_MAX);
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
|
|
||||||
pIter->delInfo.suid = pDelIdx->suid;
|
pIter->delInfo.suid = pDelIdx->suid;
|
||||||
|
|
|
@ -2967,7 +2967,7 @@ int32_t initDelSkylineIterator(STableBlockScanInfo* pBlockScanInfo, STsdbReader*
|
||||||
SDelIdx* pIdx = taosArraySearch(pReader->pDelIdx, &idx, tCmprDelIdx, TD_EQ);
|
SDelIdx* pIdx = taosArraySearch(pReader->pDelIdx, &idx, tCmprDelIdx, TD_EQ);
|
||||||
|
|
||||||
if (pIdx != NULL) {
|
if (pIdx != NULL) {
|
||||||
code = tsdbReadDelData(pReader->pDelFReader, pIdx, pDelData);
|
code = tsdbReadDelDatav1(pReader->pDelFReader, pIdx, pDelData, pReader->verRange.maxVer);
|
||||||
}
|
}
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
goto _err;
|
goto _err;
|
||||||
|
@ -2978,7 +2978,10 @@ int32_t initDelSkylineIterator(STableBlockScanInfo* pBlockScanInfo, STsdbReader*
|
||||||
if (pMemTbData != NULL) {
|
if (pMemTbData != NULL) {
|
||||||
p = pMemTbData->pHead;
|
p = pMemTbData->pHead;
|
||||||
while (p) {
|
while (p) {
|
||||||
taosArrayPush(pDelData, p);
|
if (p->version <= pReader->verRange.maxVer) {
|
||||||
|
taosArrayPush(pDelData, p);
|
||||||
|
}
|
||||||
|
|
||||||
p = p->pNext;
|
p = p->pNext;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -2986,7 +2989,9 @@ int32_t initDelSkylineIterator(STableBlockScanInfo* pBlockScanInfo, STsdbReader*
|
||||||
if (piMemTbData != NULL) {
|
if (piMemTbData != NULL) {
|
||||||
p = piMemTbData->pHead;
|
p = piMemTbData->pHead;
|
||||||
while (p) {
|
while (p) {
|
||||||
taosArrayPush(pDelData, p);
|
if (p->version <= pReader->verRange.maxVer) {
|
||||||
|
taosArrayPush(pDelData, p);
|
||||||
|
}
|
||||||
p = p->pNext;
|
p = p->pNext;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -4558,7 +4563,11 @@ int32_t tsdbReaderOpen(void* pVnode, SQueryTableDataCond* pCond, void* pTableLis
|
||||||
|
|
||||||
pReader->pIgnoreTables = pIgnoreTables;
|
pReader->pIgnoreTables = pIgnoreTables;
|
||||||
|
|
||||||
tsdbDebug("%p total numOfTable:%d in this query %s", pReader, numOfTables, pReader->idStr);
|
tsdbDebug("%p total numOfTable:%d, window:%" PRId64 " - %" PRId64 ", verRange:%" PRId64 " - %" PRId64
|
||||||
|
" in this query %s",
|
||||||
|
pReader, numOfTables, pReader->window.skey, pReader->window.ekey, pReader->verRange.minVer,
|
||||||
|
pReader->verRange.maxVer, pReader->idStr);
|
||||||
|
|
||||||
return code;
|
return code;
|
||||||
|
|
||||||
_err:
|
_err:
|
||||||
|
|
|
@ -1489,6 +1489,10 @@ int32_t tsdbDelFReaderClose(SDelFReader **ppReader) {
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tsdbReadDelData(SDelFReader *pReader, SDelIdx *pDelIdx, SArray *aDelData) {
|
int32_t tsdbReadDelData(SDelFReader *pReader, SDelIdx *pDelIdx, SArray *aDelData) {
|
||||||
|
return tsdbReadDelDatav1(pReader, pDelIdx, aDelData, INT64_MAX);
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t tsdbReadDelDatav1(SDelFReader *pReader, SDelIdx *pDelIdx, SArray *aDelData, int64_t maxVer) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
int64_t offset = pDelIdx->offset;
|
int64_t offset = pDelIdx->offset;
|
||||||
int64_t size = pDelIdx->size;
|
int64_t size = pDelIdx->size;
|
||||||
|
@ -1510,11 +1514,15 @@ int32_t tsdbReadDelData(SDelFReader *pReader, SDelIdx *pDelIdx, SArray *aDelData
|
||||||
SDelData delData;
|
SDelData delData;
|
||||||
n += tGetDelData(pReader->aBuf[0] + n, &delData);
|
n += tGetDelData(pReader->aBuf[0] + n, &delData);
|
||||||
|
|
||||||
if (taosArrayPush(aDelData, &delData) == NULL) {
|
if (delData.version > maxVer) {
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
continue;
|
||||||
goto _err;
|
|
||||||
}
|
}
|
||||||
|
if (taosArrayPush(aDelData, &delData) == NULL) {
|
||||||
|
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
goto _err;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
ASSERT(n == size);
|
ASSERT(n == size);
|
||||||
|
|
||||||
return code;
|
return code;
|
||||||
|
|
|
@ -1166,7 +1166,7 @@ static int32_t tsdbSnapWriteDelTableDataStart(STsdbSnapWriter* pWriter, TABLEID*
|
||||||
|
|
||||||
int32_t c = tTABLEIDCmprFn(pDelIdx, &pWriter->tbid);
|
int32_t c = tTABLEIDCmprFn(pDelIdx, &pWriter->tbid);
|
||||||
if (c < 0) {
|
if (c < 0) {
|
||||||
code = tsdbReadDelData(pWriter->pDelFReader, pDelIdx, pWriter->pTIter->tIter.aDelData);
|
code = tsdbReadDelDatav1(pWriter->pDelFReader, pDelIdx, pWriter->pTIter->tIter.aDelData, INT64_MAX);
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
|
|
||||||
SDelIdx* pDelIdxNew = taosArrayReserve(pWriter->aDelIdx, 1);
|
SDelIdx* pDelIdxNew = taosArrayReserve(pWriter->aDelIdx, 1);
|
||||||
|
@ -1183,7 +1183,7 @@ static int32_t tsdbSnapWriteDelTableDataStart(STsdbSnapWriter* pWriter, TABLEID*
|
||||||
|
|
||||||
pWriter->pTIter->tIter.iDelIdx++;
|
pWriter->pTIter->tIter.iDelIdx++;
|
||||||
} else if (c == 0) {
|
} else if (c == 0) {
|
||||||
code = tsdbReadDelData(pWriter->pDelFReader, pDelIdx, pWriter->aDelData);
|
code = tsdbReadDelDatav1(pWriter->pDelFReader, pDelIdx, pWriter->aDelData, INT64_MAX);
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
|
|
||||||
pWriter->pTIter->tIter.iDelIdx++;
|
pWriter->pTIter->tIter.iDelIdx++;
|
||||||
|
|
|
@ -406,7 +406,7 @@ int32_t qUpdateTableListForStreamScanner(qTaskInfo_t tinfo, const SArray* tableI
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
|
|
||||||
if (isAdd) {
|
if (isAdd) {
|
||||||
qDebug("add %d tables id into query list, %s", (int32_t)taosArrayGetSize(tableIdList), id);
|
qDebug("try to add %d tables id into query list, %s", (int32_t)taosArrayGetSize(tableIdList), id);
|
||||||
}
|
}
|
||||||
|
|
||||||
// traverse to the stream scanner node to add this table id
|
// traverse to the stream scanner node to add this table id
|
||||||
|
|
|
@ -1874,6 +1874,7 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
|
||||||
printDataBlock(pInfo->pCreateTbRes, "recover createTbl");
|
printDataBlock(pInfo->pCreateTbRes, "recover createTbl");
|
||||||
return pInfo->pCreateTbRes;
|
return pInfo->pCreateTbRes;
|
||||||
}
|
}
|
||||||
|
|
||||||
qDebug("stream recover scan get block, rows %" PRId64, pInfo->pRecoverRes->info.rows);
|
qDebug("stream recover scan get block, rows %" PRId64, pInfo->pRecoverRes->info.rows);
|
||||||
printDataBlock(pInfo->pRecoverRes, "scan recover");
|
printDataBlock(pInfo->pRecoverRes, "scan recover");
|
||||||
return pInfo->pRecoverRes;
|
return pInfo->pRecoverRes;
|
||||||
|
@ -1982,7 +1983,7 @@ FETCH_NEXT_BLOCK:
|
||||||
// printDataBlock(pBlock, "stream scan recv");
|
// printDataBlock(pBlock, "stream scan recv");
|
||||||
return pBlock;
|
return pBlock;
|
||||||
} else if (pInfo->blockType == STREAM_INPUT__DATA_SUBMIT) {
|
} else if (pInfo->blockType == STREAM_INPUT__DATA_SUBMIT) {
|
||||||
qDebug("scan mode %d", pInfo->scanMode);
|
qDebug("stream scan mode:%d, %s", pInfo->scanMode, id);
|
||||||
switch (pInfo->scanMode) {
|
switch (pInfo->scanMode) {
|
||||||
case STREAM_SCAN_FROM_RES: {
|
case STREAM_SCAN_FROM_RES: {
|
||||||
pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE;
|
pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE;
|
||||||
|
@ -2066,8 +2067,13 @@ FETCH_NEXT_BLOCK:
|
||||||
|
|
||||||
while (pAPI->tqReaderFn.tqNextBlockImpl(pInfo->tqReader, id)) {
|
while (pAPI->tqReaderFn.tqNextBlockImpl(pInfo->tqReader, id)) {
|
||||||
SSDataBlock* pRes = NULL;
|
SSDataBlock* pRes = NULL;
|
||||||
|
|
||||||
int32_t code = pAPI->tqReaderFn.tqRetrieveBlock(pInfo->tqReader, &pRes, id);
|
int32_t code = pAPI->tqReaderFn.tqRetrieveBlock(pInfo->tqReader, &pRes, id);
|
||||||
|
qDebug("retrieve data from submit completed code:%s, rows:%" PRId64 " %s", tstrerror(code), pRes->info.rows,
|
||||||
|
id);
|
||||||
|
|
||||||
if (code != TSDB_CODE_SUCCESS || pRes->info.rows == 0) {
|
if (code != TSDB_CODE_SUCCESS || pRes->info.rows == 0) {
|
||||||
|
qDebug("retrieve data failed, try next block in submit block, %s", id);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2075,6 +2081,7 @@ FETCH_NEXT_BLOCK:
|
||||||
|
|
||||||
if (pInfo->pCreateTbRes->info.rows > 0) {
|
if (pInfo->pCreateTbRes->info.rows > 0) {
|
||||||
pInfo->scanMode = STREAM_SCAN_FROM_RES;
|
pInfo->scanMode = STREAM_SCAN_FROM_RES;
|
||||||
|
qDebug("create table res exists, rows:%"PRId64" return from stream scan, %s", pInfo->pCreateTbRes->info.rows, id);
|
||||||
return pInfo->pCreateTbRes;
|
return pInfo->pCreateTbRes;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2083,6 +2090,8 @@ FETCH_NEXT_BLOCK:
|
||||||
pBlock->info.dataLoad = 1;
|
pBlock->info.dataLoad = 1;
|
||||||
blockDataUpdateTsWindow(pBlock, pInfo->primaryTsIndex);
|
blockDataUpdateTsWindow(pBlock, pInfo->primaryTsIndex);
|
||||||
|
|
||||||
|
qDebug("%" PRId64 " rows in datablock, update res:%" PRId64 " %s", pBlockInfo->rows,
|
||||||
|
pInfo->pUpdateDataRes->info.rows, id);
|
||||||
if (pBlockInfo->rows > 0 || pInfo->pUpdateDataRes->info.rows > 0) {
|
if (pBlockInfo->rows > 0 || pInfo->pUpdateDataRes->info.rows > 0) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
@ -2099,7 +2108,7 @@ FETCH_NEXT_BLOCK:
|
||||||
pInfo->numOfExec++;
|
pInfo->numOfExec++;
|
||||||
pOperator->resultInfo.totalRows += pBlockInfo->rows;
|
pOperator->resultInfo.totalRows += pBlockInfo->rows;
|
||||||
|
|
||||||
qDebug("stream scan get source rows:%" PRId64", %s", pBlockInfo->rows, id);
|
qDebug("stream scan completed, and return source rows:%" PRId64", %s", pBlockInfo->rows, id);
|
||||||
if (pBlockInfo->rows > 0) {
|
if (pBlockInfo->rows > 0) {
|
||||||
return pBlock;
|
return pBlock;
|
||||||
}
|
}
|
||||||
|
@ -2285,7 +2294,8 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys
|
||||||
SArray* pColIds = NULL;
|
SArray* pColIds = NULL;
|
||||||
SStreamScanInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamScanInfo));
|
SStreamScanInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamScanInfo));
|
||||||
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
|
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
|
||||||
SStorageAPI* pAPI = &pTaskInfo->storageAPI;
|
SStorageAPI* pAPI = &pTaskInfo->storageAPI;
|
||||||
|
const char* idstr = pTaskInfo->id.str;
|
||||||
|
|
||||||
if (pInfo == NULL || pOperator == NULL) {
|
if (pInfo == NULL || pOperator == NULL) {
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
@ -2396,7 +2406,7 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys
|
||||||
// set the extract column id to streamHandle
|
// set the extract column id to streamHandle
|
||||||
pAPI->tqReaderFn.tqReaderSetColIdList(pInfo->tqReader, pColIds);
|
pAPI->tqReaderFn.tqReaderSetColIdList(pInfo->tqReader, pColIds);
|
||||||
SArray* tableIdList = extractTableIdList(((STableScanInfo*)(pInfo->pTableScanOp->info))->base.pTableListInfo);
|
SArray* tableIdList = extractTableIdList(((STableScanInfo*)(pInfo->pTableScanOp->info))->base.pTableListInfo);
|
||||||
code = pAPI->tqReaderFn.tqReaderSetQueryTableList(pInfo->tqReader, tableIdList);
|
code = pAPI->tqReaderFn.tqReaderSetQueryTableList(pInfo->tqReader, tableIdList, idstr);
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
taosArrayDestroy(tableIdList);
|
taosArrayDestroy(tableIdList);
|
||||||
goto _error;
|
goto _error;
|
||||||
|
|
|
@ -296,11 +296,8 @@ int32_t tAppendDataToInputQueue(SStreamTask* pTask, SStreamQueueItem* pItem) {
|
||||||
|
|
||||||
if (type == STREAM_INPUT__DATA_SUBMIT) {
|
if (type == STREAM_INPUT__DATA_SUBMIT) {
|
||||||
SStreamDataSubmit* px = (SStreamDataSubmit*)pItem;
|
SStreamDataSubmit* px = (SStreamDataSubmit*)pItem;
|
||||||
qDebug("s-task:%s submit enqueue msgLen:%d ver:%" PRId64 ", total in queue:%d, size:%.2fMiB", pTask->id.idStr,
|
|
||||||
px->submit.msgLen, px->submit.ver, total, size);
|
|
||||||
|
|
||||||
if ((pTask->taskLevel == TASK_LEVEL__SOURCE) && tInputQueueIsFull(pTask)) {
|
if ((pTask->taskLevel == TASK_LEVEL__SOURCE) && tInputQueueIsFull(pTask)) {
|
||||||
qError("s-task:%s input queue is full, capacity(size:%d num:%dMiB), current(blocks:%d, size:%.2fMiB) abort",
|
qError("s-task:%s input queue is full, capacity(size:%d num:%dMiB), current(blocks:%d, size:%.2fMiB) stop to push data",
|
||||||
pTask->id.idStr, STREAM_TASK_INPUT_QUEUEU_CAPACITY, STREAM_TASK_INPUT_QUEUEU_CAPACITY_IN_SIZE, total,
|
pTask->id.idStr, STREAM_TASK_INPUT_QUEUEU_CAPACITY, STREAM_TASK_INPUT_QUEUEU_CAPACITY_IN_SIZE, total,
|
||||||
size);
|
size);
|
||||||
streamDataSubmitDestroy(px);
|
streamDataSubmitDestroy(px);
|
||||||
|
@ -314,9 +311,12 @@ int32_t tAppendDataToInputQueue(SStreamTask* pTask, SStreamQueueItem* pItem) {
|
||||||
taosFreeQitem(pItem);
|
taosFreeQitem(pItem);
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
qDebug("s-task:%s submit enqueue msgLen:%d ver:%" PRId64 ", total in queue:%d, size:%.2fMiB", pTask->id.idStr,
|
||||||
|
px->submit.msgLen, px->submit.ver, total, size + px->submit.msgLen/1048576.0);
|
||||||
} else if (type == STREAM_INPUT__DATA_BLOCK || type == STREAM_INPUT__DATA_RETRIEVE ||
|
} else if (type == STREAM_INPUT__DATA_BLOCK || type == STREAM_INPUT__DATA_RETRIEVE ||
|
||||||
type == STREAM_INPUT__REF_DATA_BLOCK) {
|
type == STREAM_INPUT__REF_DATA_BLOCK) {
|
||||||
if (/*(pTask->taskLevel == TASK_LEVEL__SOURCE) && */(tInputQueueIsFull(pTask))) {
|
if ((pTask->taskLevel == TASK_LEVEL__SOURCE) && (tInputQueueIsFull(pTask))) {
|
||||||
qError("s-task:%s input queue is full, capacity:%d size:%d MiB, current(blocks:%d, size:%.2fMiB) abort",
|
qError("s-task:%s input queue is full, capacity:%d size:%d MiB, current(blocks:%d, size:%.2fMiB) abort",
|
||||||
pTask->id.idStr, STREAM_TASK_INPUT_QUEUEU_CAPACITY, STREAM_TASK_INPUT_QUEUEU_CAPACITY_IN_SIZE, total,
|
pTask->id.idStr, STREAM_TASK_INPUT_QUEUEU_CAPACITY, STREAM_TASK_INPUT_QUEUEU_CAPACITY_IN_SIZE, total,
|
||||||
size);
|
size);
|
||||||
|
|
|
@ -283,7 +283,7 @@ int32_t streamDispatchOneRecoverFinishReq(SStreamTask* pTask, const SStreamRecov
|
||||||
msg.info.noResp = 1;
|
msg.info.noResp = 1;
|
||||||
|
|
||||||
tmsgSendReq(pEpSet, &msg);
|
tmsgSendReq(pEpSet, &msg);
|
||||||
qDebug("s-task:%s dispatch recover finish msg to taskId:%d node %d: recover finish msg", pTask->id.idStr,
|
qDebug("s-task:%s dispatch recover finish msg to downstream taskId:0x%x node %d: recover finish msg", pTask->id.idStr,
|
||||||
pReq->taskId, vgId);
|
pReq->taskId, vgId);
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
|
@ -318,7 +318,7 @@ int32_t doSendDispatchMsg(SStreamTask* pTask, const SStreamDispatchReq* pReq, in
|
||||||
msg.pCont = buf;
|
msg.pCont = buf;
|
||||||
msg.msgType = pTask->dispatchMsgType;
|
msg.msgType = pTask->dispatchMsgType;
|
||||||
|
|
||||||
qDebug("dispatch from s-task:%s to taskId:0x%x vgId:%d data msg", pTask->id.idStr, pReq->taskId, vgId);
|
qDebug("s-task:%s dispatch msg to taskId:0x%x vgId:%d data msg", pTask->id.idStr, pReq->taskId, vgId);
|
||||||
tmsgSendReq(pEpSet, &msg);
|
tmsgSendReq(pEpSet, &msg);
|
||||||
|
|
||||||
code = 0;
|
code = 0;
|
||||||
|
@ -414,7 +414,7 @@ int32_t streamDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* pDat
|
||||||
|
|
||||||
req.taskId = downstreamTaskId;
|
req.taskId = downstreamTaskId;
|
||||||
|
|
||||||
qDebug("s-task:%s (child taskId:%d) fix-dispatch blocks:%d to down stream s-task:%d in vgId:%d", pTask->id.idStr,
|
qDebug("s-task:%s (child taskId:%d) fix-dispatch %d block(s) to down stream s-task:0x%x in vgId:%d", pTask->id.idStr,
|
||||||
pTask->selfChildId, numOfBlocks, downstreamTaskId, vgId);
|
pTask->selfChildId, numOfBlocks, downstreamTaskId, vgId);
|
||||||
|
|
||||||
code = doSendDispatchMsg(pTask, &req, vgId, pEpSet);
|
code = doSendDispatchMsg(pTask, &req, vgId, pEpSet);
|
||||||
|
@ -514,7 +514,7 @@ int32_t streamDispatchStreamBlock(SStreamTask* pTask) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
qDebug("s-task:%s start to dispatch msg, output status:%d", pTask->id.idStr, pTask->outputStatus);
|
qDebug("s-task:%s start to dispatch msg, set output status:%d", pTask->id.idStr, pTask->outputStatus);
|
||||||
|
|
||||||
SStreamDataBlock* pDispatchedBlock = streamQueueNextItem(pTask->outputQueue);
|
SStreamDataBlock* pDispatchedBlock = streamQueueNextItem(pTask->outputQueue);
|
||||||
if (pDispatchedBlock == NULL) {
|
if (pDispatchedBlock == NULL) {
|
||||||
|
|
|
@ -16,9 +16,9 @@
|
||||||
#include "streamInc.h"
|
#include "streamInc.h"
|
||||||
|
|
||||||
// maximum allowed processed block batches. One block may include several submit blocks
|
// maximum allowed processed block batches. One block may include several submit blocks
|
||||||
#define MAX_STREAM_EXEC_BATCH_NUM 32
|
#define MAX_STREAM_EXEC_BATCH_NUM 32
|
||||||
#define MIN_STREAM_EXEC_BATCH_NUM 8
|
#define MIN_STREAM_EXEC_BATCH_NUM 4
|
||||||
#define MAX_STREAM_RESULT_DUMP_THRESHOLD 100
|
#define MAX_STREAM_RESULT_DUMP_THRESHOLD 100
|
||||||
|
|
||||||
static int32_t updateCheckPointInfo(SStreamTask* pTask);
|
static int32_t updateCheckPointInfo(SStreamTask* pTask);
|
||||||
|
|
||||||
|
@ -44,6 +44,7 @@ static int32_t doDumpResult(SStreamTask* pTask, SStreamQueueItem* pItem, SArray*
|
||||||
if (numOfBlocks > 0) {
|
if (numOfBlocks > 0) {
|
||||||
SStreamDataBlock* pStreamBlocks = createStreamBlockFromResults(pItem, pTask, size, pRes);
|
SStreamDataBlock* pStreamBlocks = createStreamBlockFromResults(pItem, pTask, size, pRes);
|
||||||
if (pStreamBlocks == NULL) {
|
if (pStreamBlocks == NULL) {
|
||||||
|
qError("s-task:%s failed to create result stream data block, code:%s", pTask->id.idStr, tstrerror(terrno));
|
||||||
taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
|
taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
@ -133,7 +134,7 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, i
|
||||||
|
|
||||||
taosArrayPush(pRes, &block);
|
taosArrayPush(pRes, &block);
|
||||||
|
|
||||||
qDebug("s-task:%s (child %d) executed and get block, total blocks:%d, size:%.2fMiB", pTask->id.idStr,
|
qDebug("s-task:%s (child %d) executed and get %d result blocks, size:%.2fMiB", pTask->id.idStr,
|
||||||
pTask->selfChildId, numOfBlocks, size / 1048576.0);
|
pTask->selfChildId, numOfBlocks, size / 1048576.0);
|
||||||
|
|
||||||
// current output should be dispatched to down stream nodes
|
// current output should be dispatched to down stream nodes
|
||||||
|
@ -236,11 +237,11 @@ int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz) {
|
||||||
taosFreeQitem(qRes);
|
taosFreeQitem(qRes);
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
//
|
||||||
if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH || pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) {
|
// if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH || pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) {
|
||||||
qDebug("s-task:%s scan exec dispatch blocks:%d", pTask->id.idStr, batchCnt);
|
// qDebug("s-task:%s scan exec dispatch blocks:%d", pTask->id.idStr, batchCnt);
|
||||||
streamDispatchStreamBlock(pTask);
|
// streamDispatchStreamBlock(pTask);
|
||||||
}
|
// }
|
||||||
|
|
||||||
if (finished) {
|
if (finished) {
|
||||||
break;
|
break;
|
||||||
|
@ -316,8 +317,13 @@ int32_t updateCheckPointInfo(SStreamTask* pTask) {
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* todo: the batch of blocks should be tuned dynamic, according to the total elapsed time of each batch of blocks, the
|
||||||
|
* appropriate batch of blocks should be handled in 5 to 10 sec.
|
||||||
|
*/
|
||||||
int32_t streamExecForAll(SStreamTask* pTask) {
|
int32_t streamExecForAll(SStreamTask* pTask) {
|
||||||
int32_t code = 0;
|
const char* id = pTask->id.idStr;
|
||||||
|
|
||||||
while (1) {
|
while (1) {
|
||||||
int32_t batchSize = 1;
|
int32_t batchSize = 1;
|
||||||
int16_t times = 0;
|
int16_t times = 0;
|
||||||
|
@ -325,7 +331,7 @@ int32_t streamExecForAll(SStreamTask* pTask) {
|
||||||
SStreamQueueItem* pInput = NULL;
|
SStreamQueueItem* pInput = NULL;
|
||||||
|
|
||||||
// merge multiple input data if possible in the input queue.
|
// merge multiple input data if possible in the input queue.
|
||||||
qDebug("s-task:%s start to extract data block from inputQ", pTask->id.idStr);
|
qDebug("s-task:%s start to extract data block from inputQ", id);
|
||||||
|
|
||||||
while (1) {
|
while (1) {
|
||||||
if (streamTaskShouldPause(&pTask->status)) {
|
if (streamTaskShouldPause(&pTask->status)) {
|
||||||
|
@ -340,7 +346,7 @@ int32_t streamExecForAll(SStreamTask* pTask) {
|
||||||
if (qItem == NULL) {
|
if (qItem == NULL) {
|
||||||
if (pTask->taskLevel == TASK_LEVEL__SOURCE && batchSize < MIN_STREAM_EXEC_BATCH_NUM && times < 5) {
|
if (pTask->taskLevel == TASK_LEVEL__SOURCE && batchSize < MIN_STREAM_EXEC_BATCH_NUM && times < 5) {
|
||||||
times++;
|
times++;
|
||||||
taosMsleep(1);
|
taosMsleep(10);
|
||||||
qDebug("===stream===try again batchSize:%d", batchSize);
|
qDebug("===stream===try again batchSize:%d", batchSize);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
@ -365,8 +371,10 @@ int32_t streamExecForAll(SStreamTask* pTask) {
|
||||||
batchSize++;
|
batchSize++;
|
||||||
pInput = newRet;
|
pInput = newRet;
|
||||||
streamQueueProcessSuccess(pTask->inputQueue);
|
streamQueueProcessSuccess(pTask->inputQueue);
|
||||||
|
|
||||||
if (batchSize > MAX_STREAM_EXEC_BATCH_NUM) {
|
if (batchSize > MAX_STREAM_EXEC_BATCH_NUM) {
|
||||||
qDebug("maximum batch limit:%d reached, processing, %s", MAX_STREAM_EXEC_BATCH_NUM, pTask->id.idStr);
|
qDebug("s-task:%s batch size limit:%d reached, start to process blocks", id,
|
||||||
|
MAX_STREAM_EXEC_BATCH_NUM);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -377,7 +385,6 @@ int32_t streamExecForAll(SStreamTask* pTask) {
|
||||||
if (pInput) {
|
if (pInput) {
|
||||||
streamFreeQitem(pInput);
|
streamFreeQitem(pInput);
|
||||||
}
|
}
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -387,7 +394,7 @@ int32_t streamExecForAll(SStreamTask* pTask) {
|
||||||
|
|
||||||
if (pTask->taskLevel == TASK_LEVEL__SINK) {
|
if (pTask->taskLevel == TASK_LEVEL__SINK) {
|
||||||
ASSERT(pInput->type == STREAM_INPUT__DATA_BLOCK);
|
ASSERT(pInput->type == STREAM_INPUT__DATA_BLOCK);
|
||||||
qDebug("s-task:%s sink task start to sink %d blocks", pTask->id.idStr, batchSize);
|
qDebug("s-task:%s sink task start to sink %d blocks", id, batchSize);
|
||||||
streamTaskOutputResultBlock(pTask, (SStreamDataBlock*)pInput);
|
streamTaskOutputResultBlock(pTask, (SStreamDataBlock*)pInput);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
@ -396,16 +403,16 @@ int32_t streamExecForAll(SStreamTask* pTask) {
|
||||||
while (pTask->taskLevel == TASK_LEVEL__SOURCE) {
|
while (pTask->taskLevel == TASK_LEVEL__SOURCE) {
|
||||||
int8_t status = atomic_load_8(&pTask->status.taskStatus);
|
int8_t status = atomic_load_8(&pTask->status.taskStatus);
|
||||||
if (status != TASK_STATUS__NORMAL && status != TASK_STATUS__PAUSE) {
|
if (status != TASK_STATUS__NORMAL && status != TASK_STATUS__PAUSE) {
|
||||||
qError("stream task wait for the end of fill history, s-task:%s, status:%d", pTask->id.idStr,
|
qError("stream task wait for the end of fill history, s-task:%s, status:%d", id,
|
||||||
atomic_load_8(&pTask->status.taskStatus));
|
atomic_load_8(&pTask->status.taskStatus));
|
||||||
taosMsleep(2);
|
taosMsleep(100);
|
||||||
} else {
|
} else {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
int64_t st = taosGetTimestampMs();
|
int64_t st = taosGetTimestampMs();
|
||||||
qDebug("s-task:%s start to execute, block batches:%d", pTask->id.idStr, batchSize);
|
qDebug("s-task:%s start to process batch of blocks, num:%d", id, batchSize);
|
||||||
|
|
||||||
{
|
{
|
||||||
// set input
|
// set input
|
||||||
|
@ -419,22 +426,21 @@ int32_t streamExecForAll(SStreamTask* pTask) {
|
||||||
ASSERT(pTask->taskLevel == TASK_LEVEL__SOURCE);
|
ASSERT(pTask->taskLevel == TASK_LEVEL__SOURCE);
|
||||||
const SStreamDataSubmit* pSubmit = (const SStreamDataSubmit*)pInput;
|
const SStreamDataSubmit* pSubmit = (const SStreamDataSubmit*)pInput;
|
||||||
qSetMultiStreamInput(pExecutor, &pSubmit->submit, 1, STREAM_INPUT__DATA_SUBMIT);
|
qSetMultiStreamInput(pExecutor, &pSubmit->submit, 1, STREAM_INPUT__DATA_SUBMIT);
|
||||||
qDebug("s-task:%s set submit blocks as source block completed, %p %p len:%d ver:%" PRId64, pTask->id.idStr,
|
qDebug("s-task:%s set submit blocks as source block completed, %p %p len:%d ver:%" PRId64, id, pSubmit,
|
||||||
pSubmit, pSubmit->submit.msgStr, pSubmit->submit.msgLen, pSubmit->submit.ver);
|
pSubmit->submit.msgStr, pSubmit->submit.msgLen, pSubmit->submit.ver);
|
||||||
} else if (pItem->type == STREAM_INPUT__DATA_BLOCK || pItem->type == STREAM_INPUT__DATA_RETRIEVE) {
|
} else if (pItem->type == STREAM_INPUT__DATA_BLOCK || pItem->type == STREAM_INPUT__DATA_RETRIEVE) {
|
||||||
const SStreamDataBlock* pBlock = (const SStreamDataBlock*)pInput;
|
const SStreamDataBlock* pBlock = (const SStreamDataBlock*)pInput;
|
||||||
|
|
||||||
SArray* pBlockList = pBlock->blocks;
|
SArray* pBlockList = pBlock->blocks;
|
||||||
int32_t numOfBlocks = taosArrayGetSize(pBlockList);
|
int32_t numOfBlocks = taosArrayGetSize(pBlockList);
|
||||||
qDebug("s-task:%s set sdata blocks as input num:%d, ver:%" PRId64, pTask->id.idStr, numOfBlocks,
|
qDebug("s-task:%s set sdata blocks as input num:%d, ver:%" PRId64, id, numOfBlocks, pBlock->sourceVer);
|
||||||
pBlock->sourceVer);
|
|
||||||
qSetMultiStreamInput(pExecutor, pBlockList->pData, numOfBlocks, STREAM_INPUT__DATA_BLOCK);
|
qSetMultiStreamInput(pExecutor, pBlockList->pData, numOfBlocks, STREAM_INPUT__DATA_BLOCK);
|
||||||
} else if (pItem->type == STREAM_INPUT__MERGED_SUBMIT) {
|
} else if (pItem->type == STREAM_INPUT__MERGED_SUBMIT) {
|
||||||
const SStreamMergedSubmit* pMerged = (const SStreamMergedSubmit*)pInput;
|
const SStreamMergedSubmit* pMerged = (const SStreamMergedSubmit*)pInput;
|
||||||
|
|
||||||
SArray* pBlockList = pMerged->submits;
|
SArray* pBlockList = pMerged->submits;
|
||||||
int32_t numOfBlocks = taosArrayGetSize(pBlockList);
|
int32_t numOfBlocks = taosArrayGetSize(pBlockList);
|
||||||
qDebug("s-task:%s %p set submit input (merged), batch num:%d", pTask->id.idStr, pTask, numOfBlocks);
|
qDebug("s-task:%s %p set (merged) submit blocks as a batch, numOfBlocks:%d", id, pTask, numOfBlocks);
|
||||||
qSetMultiStreamInput(pExecutor, pBlockList->pData, numOfBlocks, STREAM_INPUT__MERGED_SUBMIT);
|
qSetMultiStreamInput(pExecutor, pBlockList->pData, numOfBlocks, STREAM_INPUT__MERGED_SUBMIT);
|
||||||
} else if (pItem->type == STREAM_INPUT__REF_DATA_BLOCK) {
|
} else if (pItem->type == STREAM_INPUT__REF_DATA_BLOCK) {
|
||||||
const SStreamRefDataBlock* pRefBlock = (const SStreamRefDataBlock*)pInput;
|
const SStreamRefDataBlock* pRefBlock = (const SStreamRefDataBlock*)pInput;
|
||||||
|
@ -448,9 +454,9 @@ int32_t streamExecForAll(SStreamTask* pTask) {
|
||||||
int32_t totalBlocks = 0;
|
int32_t totalBlocks = 0;
|
||||||
streamTaskExecImpl(pTask, pInput, &resSize, &totalBlocks);
|
streamTaskExecImpl(pTask, pInput, &resSize, &totalBlocks);
|
||||||
|
|
||||||
double el = (taosGetTimestampMs() - st) / 1000.0;
|
double el = (taosGetTimestampMs() - st) / 1000.0;
|
||||||
qDebug("s-task:%s exec end, elapsed time:%.2fs, result size:%.2fMiB, numOfBlocks:%d", pTask->id.idStr, el,
|
qDebug("s-task:%s batch of input blocks exec end, elapsed time:%.2fs, result size:%.2fMiB, numOfBlocks:%d",
|
||||||
resSize / 1048576.0, totalBlocks);
|
id, el, resSize / 1048576.0, totalBlocks);
|
||||||
streamFreeQitem(pInput);
|
streamFreeQitem(pInput);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -333,7 +333,7 @@ static int32_t walFetchBodyNew(SWalReader *pReader) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
wDebug("vgId:%d, index:%" PRId64 " is fetched, cursor advance", pReader->pWal->cfg.vgId, ver);
|
wDebug("vgId:%d, index:%" PRId64 " is fetched, type:%d, cursor advance", pReader->pWal->cfg.vgId, ver, pReader->pHead->head.msgType);
|
||||||
pReader->curVersion = ver + 1;
|
pReader->curVersion = ver + 1;
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue