refactor: support submitreq2

This commit is contained in:
Liu Jicong 2022-12-07 16:39:46 +08:00
parent 8cfba40320
commit c5dc7e8227
3 changed files with 8 additions and 5 deletions

View File

@ -252,7 +252,7 @@ END:
} }
STqReader* tqOpenReader(SVnode* pVnode) { STqReader* tqOpenReader(SVnode* pVnode) {
STqReader* pReader = taosMemoryMalloc(sizeof(STqReader)); STqReader* pReader = taosMemoryCalloc(1, sizeof(STqReader));
if (pReader == NULL) { if (pReader == NULL) {
return NULL; return NULL;
} }
@ -437,12 +437,15 @@ bool tqNextDataBlock2(STqReader* pReader) {
int32_t blockSz = taosArrayGetSize(pReader->submit.aSubmitTbData); int32_t blockSz = taosArrayGetSize(pReader->submit.aSubmitTbData);
while (pReader->nextBlk < blockSz) { while (pReader->nextBlk < blockSz) {
SSubmitTbData* pSubmitTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk); SSubmitTbData* pSubmitTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk);
ASSERT(pSubmitTbData->uid);
if (pReader->tbIdHash == NULL) return true; if (pReader->tbIdHash == NULL) return true;
void* ret = taosHashGet(pReader->tbIdHash, &pSubmitTbData->uid, sizeof(int64_t)); void* ret = taosHashGet(pReader->tbIdHash, &pSubmitTbData->uid, sizeof(int64_t));
if (ret != NULL) { if (ret != NULL) {
return true; return true;
} }
pReader->nextBlk++;
} }
tDestroySSubmitReq2(&pReader->submit, TSDB_MSG_FLG_DECODE); tDestroySSubmitReq2(&pReader->submit, TSDB_MSG_FLG_DECODE);

View File

@ -115,13 +115,13 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu
if (type == STREAM_INPUT__MERGED_SUBMIT) { if (type == STREAM_INPUT__MERGED_SUBMIT) {
// ASSERT(numOfBlocks > 1); // ASSERT(numOfBlocks > 1);
for (int32_t i = 0; i < numOfBlocks; i++) { for (int32_t i = 0; i < numOfBlocks; i++) {
SPackedSubmit* pReq = *(void**)POINTER_SHIFT(input, i * sizeof(SPackedSubmit)); SPackedSubmit* pReq = POINTER_SHIFT(input, i * sizeof(SPackedSubmit));
taosArrayPush(pInfo->pBlockLists, &pReq); taosArrayPush(pInfo->pBlockLists, pReq);
} }
pInfo->blockType = STREAM_INPUT__DATA_SUBMIT; pInfo->blockType = STREAM_INPUT__DATA_SUBMIT;
} else if (type == STREAM_INPUT__DATA_SUBMIT) { } else if (type == STREAM_INPUT__DATA_SUBMIT) {
ASSERT(numOfBlocks == 1); ASSERT(numOfBlocks == 1);
taosArrayPush(pInfo->pBlockLists, &input); taosArrayPush(pInfo->pBlockLists, input);
pInfo->blockType = STREAM_INPUT__DATA_SUBMIT; pInfo->blockType = STREAM_INPUT__DATA_SUBMIT;
} else if (type == STREAM_INPUT__DATA_BLOCK) { } else if (type == STREAM_INPUT__DATA_BLOCK) {
for (int32_t i = 0; i < numOfBlocks; ++i) { for (int32_t i = 0; i < numOfBlocks; ++i) {

View File

@ -1929,7 +1929,7 @@ FETCH_NEXT_BLOCK:
} }
int32_t current = pInfo->validBlockIndex++; int32_t current = pInfo->validBlockIndex++;
SPackedSubmit* pSubmit = taosArrayGetP(pInfo->pBlockLists, current); SPackedSubmit* pSubmit = taosArrayGet(pInfo->pBlockLists, current);
/*if (tqReaderSetDataMsg(pInfo->tqReader, pSubmit, 0) < 0) {*/ /*if (tqReaderSetDataMsg(pInfo->tqReader, pSubmit, 0) < 0) {*/
if (tqReaderSetSubmitReq2(pInfo->tqReader, pSubmit->msgStr, pSubmit->msgLen, pSubmit->ver) < 0) { if (tqReaderSetSubmitReq2(pInfo->tqReader, pSubmit->msgStr, pSubmit->msgLen, pSubmit->ver) < 0) {
qError("submit msg messed up when initing stream submit block %p, current %d, total %d", pSubmit, current, qError("submit msg messed up when initing stream submit block %p, current %d, total %d", pSubmit, current,