refactor(taosx): filter meta
This commit is contained in:
parent
6ec9b3d93c
commit
502d75302e
|
@ -241,6 +241,20 @@ int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SSubmitReq* pReq, STaosxRsp
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
if (pHandle->fetchMeta) {
|
||||||
|
SSubmitBlk* pBlk = pReader->pBlock;
|
||||||
|
if (pBlk->schemaLen > 0) {
|
||||||
|
if (pRsp->createTableNum == 0) {
|
||||||
|
pRsp->createTableLen = taosArrayInit(0, sizeof(int32_t));
|
||||||
|
pRsp->createTableReq = taosArrayInit(0, sizeof(void*));
|
||||||
|
}
|
||||||
|
void* createReq = taosMemoryCalloc(1, pBlk->schemaLen);
|
||||||
|
memcpy(createReq, pBlk->data, pBlk->schemaLen);
|
||||||
|
taosArrayPush(pRsp->createTableLen, &pBlk->schemaLen);
|
||||||
|
taosArrayPush(pRsp->createTableReq, &createReq);
|
||||||
|
pRsp->createTableNum++;
|
||||||
|
}
|
||||||
|
}
|
||||||
tqAddBlockDataToRsp(&block, (SMqDataRsp*)pRsp, taosArrayGetSize(block.pDataBlock));
|
tqAddBlockDataToRsp(&block, (SMqDataRsp*)pRsp, taosArrayGetSize(block.pDataBlock));
|
||||||
blockDataFreeRes(&block);
|
blockDataFreeRes(&block);
|
||||||
tqAddBlockSchemaToRsp(pExec, (SMqDataRsp*)pRsp);
|
tqAddBlockSchemaToRsp(pExec, (SMqDataRsp*)pRsp);
|
||||||
|
@ -261,34 +275,25 @@ int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SSubmitReq* pReq, STaosxRsp
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
if (pHandle->fetchMeta) {
|
||||||
|
SSubmitBlk* pBlk = pReader->pBlock;
|
||||||
|
if (pBlk->schemaLen > 0) {
|
||||||
|
if (pRsp->createTableNum == 0) {
|
||||||
|
pRsp->createTableLen = taosArrayInit(0, sizeof(int32_t));
|
||||||
|
pRsp->createTableReq = taosArrayInit(0, sizeof(void*));
|
||||||
|
}
|
||||||
|
void* createReq = taosMemoryCalloc(1, pBlk->schemaLen);
|
||||||
|
memcpy(createReq, pBlk->data, pBlk->schemaLen);
|
||||||
|
taosArrayPush(pRsp->createTableLen, &pBlk->schemaLen);
|
||||||
|
taosArrayPush(pRsp->createTableReq, &createReq);
|
||||||
|
pRsp->createTableNum++;
|
||||||
|
}
|
||||||
|
}
|
||||||
tqAddBlockDataToRsp(&block, (SMqDataRsp*)pRsp, taosArrayGetSize(block.pDataBlock));
|
tqAddBlockDataToRsp(&block, (SMqDataRsp*)pRsp, taosArrayGetSize(block.pDataBlock));
|
||||||
blockDataFreeRes(&block);
|
blockDataFreeRes(&block);
|
||||||
tqAddBlockSchemaToRsp(pExec, (SMqDataRsp*)pRsp);
|
tqAddBlockSchemaToRsp(pExec, (SMqDataRsp*)pRsp);
|
||||||
pRsp->blockNum++;
|
pRsp->blockNum++;
|
||||||
}
|
}
|
||||||
#if 1
|
|
||||||
if (pHandle->fetchMeta && pRsp->blockNum) {
|
|
||||||
SSubmitMsgIter iter = {0};
|
|
||||||
tInitSubmitMsgIter(pReq, &iter);
|
|
||||||
STaosxRsp* pXrsp = (STaosxRsp*)pRsp;
|
|
||||||
while (1) {
|
|
||||||
SSubmitBlk* pBlk = NULL;
|
|
||||||
if (tGetSubmitMsgNext(&iter, &pBlk) < 0) break;
|
|
||||||
if (pBlk == NULL) break;
|
|
||||||
if (pBlk->schemaLen > 0) {
|
|
||||||
if (pXrsp->createTableNum == 0) {
|
|
||||||
pXrsp->createTableLen = taosArrayInit(0, sizeof(int32_t));
|
|
||||||
pXrsp->createTableReq = taosArrayInit(0, sizeof(void*));
|
|
||||||
}
|
|
||||||
void* createReq = taosMemoryCalloc(1, pBlk->schemaLen);
|
|
||||||
memcpy(createReq, pBlk->data, pBlk->schemaLen);
|
|
||||||
taosArrayPush(pXrsp->createTableLen, &pBlk->schemaLen);
|
|
||||||
taosArrayPush(pXrsp->createTableReq, &createReq);
|
|
||||||
pXrsp->createTableNum++;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
#endif
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pRsp->blockNum == 0) {
|
if (pRsp->blockNum == 0) {
|
||||||
|
|
|
@ -811,7 +811,7 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO after dropping table, table may be not found
|
// TODO after dropping table, table may not found
|
||||||
ASSERT(found);
|
ASSERT(found);
|
||||||
|
|
||||||
if (pTableScanInfo->dataReader == NULL) {
|
if (pTableScanInfo->dataReader == NULL) {
|
||||||
|
|
Loading…
Reference in New Issue