diff --git a/include/common/tmsg.h b/include/common/tmsg.h index d503592361..7728b0b5eb 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -2957,6 +2957,7 @@ typedef struct { int32_t tEncodeSMqDataRsp(SEncoder* pEncoder, const SMqDataRsp* pRsp); int32_t tDecodeSMqDataRsp(SDecoder* pDecoder, SMqDataRsp* pRsp); +void tDeleteSMqDataRsp(SMqDataRsp* pRsp); typedef struct { SMqRspHead head; diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index 2fc93cc9b5..af29ab7c50 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -5889,6 +5889,13 @@ int32_t tDecodeSMqDataRsp(SDecoder *pDecoder, SMqDataRsp *pRsp) { return 0; } +void tDeleteSMqDataRsp(SMqDataRsp *pRsp) { + taosArrayDestroy(pRsp->blockDataLen); + taosArrayDestroyP(pRsp->blockData, (FDelete)taosMemoryFree); + taosArrayDestroyP(pRsp->blockSchema, (FDelete)tDeleteSSchemaWrapper); + taosArrayDestroyP(pRsp->blockTbName, (FDelete)taosMemoryFree); +} + int32_t tEncodeSTaosxRsp(SEncoder *pEncoder, const STaosxRsp *pRsp) { if (tEncodeSTqOffsetVal(pEncoder, &pRsp->reqOffset) < 0) return -1; if (tEncodeSTqOffsetVal(pEncoder, &pRsp->rspOffset) < 0) return -1; diff --git a/source/dnode/mnode/impl/src/mndTopic.c b/source/dnode/mnode/impl/src/mndTopic.c index ff208eae60..eb072d013d 100644 --- a/source/dnode/mnode/impl/src/mndTopic.c +++ b/source/dnode/mnode/impl/src/mndTopic.c @@ -763,8 +763,9 @@ static int32_t mndRetrieveTopic(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBl int32_t cols = 0; char topicName[TSDB_TOPIC_NAME_LEN + VARSTR_HEADER_SIZE] = {0}; - tNameFromString(&n, pTopic->name, T_NAME_ACCT | T_NAME_DB); - tNameGetDbName(&n, varDataVal(topicName)); + strcpy(varDataVal(topicName), mndGetDbStr(pTopic->name)); + /*tNameFromString(&n, pTopic->name, T_NAME_ACCT | T_NAME_DB);*/ + /*tNameGetDbName(&n, varDataVal(topicName));*/ varDataSetLen(topicName, strlen(varDataVal(topicName))); pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); colDataAppend(pColInfo, numOfRows, (const char *)topicName, false); diff --git a/source/dnode/vnode/src/inc/tq.h b/source/dnode/vnode/src/inc/tq.h index a97c8ff132..753cdc603e 100644 --- a/source/dnode/vnode/src/inc/tq.h +++ b/source/dnode/vnode/src/inc/tq.h @@ -67,21 +67,21 @@ typedef struct { // tqExec typedef struct { - char* qmsg; + char* qmsg; } STqExecCol; typedef struct { - int64_t suid; + int64_t suid; } STqExecTb; typedef struct { - SHashObj* pFilterOutTbUid; + SHashObj* pFilterOutTbUid; } STqExecDb; typedef struct { int8_t subType; - STqReader* pExecReader; + STqReader* pExecReader; qTaskInfo_t task; union { STqExecCol execCol; @@ -144,7 +144,7 @@ int64_t tqScan(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, SMqMetaRsp* int64_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, SWalCkHead** pHeadWithCkSum); // tqExec -int32_t tqLogScanExec(STQ* pTq, STqExecHandle* pExec, SSubmitReq* pReq, SMqDataRsp* pRsp); +int32_t tqLogScanExec(STQ* pTq, STqHandle* pHandle, SSubmitReq* pReq, SMqDataRsp* pRsp); int32_t tqSendDataRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, const SMqDataRsp* pRsp); // tqMeta diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 54f764c6b3..eed997b486 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -357,8 +357,8 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { TD_VID(pTq->pVnode), formatBuf); } else { if (reqOffset.type == TMQ_OFFSET__RESET_EARLIEAST) { - if (pReq->useSnapshot){ - if (pHandle->fetchMeta){ + if (pReq->useSnapshot) { + if (pHandle->fetchMeta) { tqOffsetResetToMeta(&fetchOffsetNew, 0); } else { tqOffsetResetToData(&fetchOffsetNew, 0, 0); @@ -373,43 +373,47 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { if (tqSendDataRsp(pTq, pMsg, pReq, &dataRsp) < 0) { code = -1; } - goto OVER; + tDeleteSMqDataRsp(&dataRsp); + return code; } else if (reqOffset.type == TMQ_OFFSET__RESET_NONE) { tqError("tmq poll: subkey %s, no offset committed for consumer %" PRId64 " in vg %d, subkey %s, reset none failed", pHandle->subKey, consumerId, TD_VID(pTq->pVnode), pReq->subKey); terrno = TSDB_CODE_TQ_NO_COMMITTED_OFFSET; code = -1; - goto OVER; + tDeleteSMqDataRsp(&dataRsp); + return code; } } } - if(pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN || fetchOffsetNew.type != TMQ_OFFSET__LOG){ + if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN || fetchOffsetNew.type != TMQ_OFFSET__LOG) { SMqMetaRsp metaRsp = {0}; tqScan(pTq, pHandle, &dataRsp, &metaRsp, &fetchOffsetNew); - if(metaRsp.metaRspLen > 0){ + if (metaRsp.metaRspLen > 0) { if (tqSendMetaPollRsp(pTq, pMsg, pReq, &metaRsp) < 0) { code = -1; } - tqDebug("tmq poll: consumer %ld, subkey %s, vg %d, send meta offset type:%d,uid:%ld,version:%ld", consumerId, pHandle->subKey, - TD_VID(pTq->pVnode), metaRsp.rspOffset.type, metaRsp.rspOffset.uid, metaRsp.rspOffset.version); + tqDebug("tmq poll: consumer %ld, subkey %s, vg %d, send meta offset type:%d,uid:%ld,version:%ld", consumerId, + pHandle->subKey, TD_VID(pTq->pVnode), metaRsp.rspOffset.type, metaRsp.rspOffset.uid, + metaRsp.rspOffset.version); taosMemoryFree(metaRsp.metaRsp); goto OVER; } - if (dataRsp.blockNum > 0){ + if (dataRsp.blockNum > 0) { if (tqSendDataRsp(pTq, pMsg, pReq, &dataRsp) < 0) { code = -1; } goto OVER; - }else{ + } else { fetchOffsetNew = dataRsp.rspOffset; } - tqDebug("tmq poll: consumer %ld, subkey %s, vg %d, send data blockNum:%d, offset type:%d,uid:%ld,version:%ld", consumerId, pHandle->subKey, - TD_VID(pTq->pVnode), dataRsp.blockNum, dataRsp.rspOffset.type, dataRsp.rspOffset.uid, dataRsp.rspOffset.version); + tqDebug("tmq poll: consumer %ld, subkey %s, vg %d, send data blockNum:%d, offset type:%d,uid:%ld,version:%ld", + consumerId, pHandle->subKey, TD_VID(pTq->pVnode), dataRsp.blockNum, dataRsp.rspOffset.type, + dataRsp.rspOffset.uid, dataRsp.rspOffset.version); } if (pHandle->execHandle.subType != TOPIC_SUB_TYPE__COLUMN && fetchOffsetNew.type == TMQ_OFFSET__LOG) { @@ -426,7 +430,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { consumerEpoch = atomic_load_32(&pHandle->epoch); if (consumerEpoch > reqEpoch) { tqWarn("tmq poll: consumer %" PRId64 " (epoch %d), subkey %s, vg %d offset %" PRId64 - ", found new consumer epoch %d, discard req epoch %d", + ", found new consumer epoch %d, discard req epoch %d", consumerId, pReq->epoch, pHandle->subKey, TD_VID(pTq->pVnode), fetchVer, consumerEpoch, reqEpoch); break; } @@ -449,7 +453,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { if (pHead->msgType == TDMT_VND_SUBMIT) { SSubmitReq* pCont = (SSubmitReq*)&pHead->body; - if (tqLogScanExec(pTq, &pHandle->execHandle, pCont, &dataRsp) < 0) { + if (tqLogScanExec(pTq, pHandle, pCont, &dataRsp) < 0) { /*ASSERT(0);*/ } // TODO batch optimization: @@ -490,18 +494,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { OVER: if (pCkHead) taosMemoryFree(pCkHead); - // TODO wrap in destroy func - taosArrayDestroy(dataRsp.blockDataLen); - taosArrayDestroyP(dataRsp.blockData, (FDelete)taosMemoryFree); - - if (dataRsp.withSchema) { - taosArrayDestroyP(dataRsp.blockSchema, (FDelete)tDeleteSSchemaWrapper); - } - - if (dataRsp.withTbName) { - taosArrayDestroyP(dataRsp.blockTbName, (FDelete)taosMemoryFree); - } - + tDeleteSMqDataRsp(&dataRsp); return code; } @@ -629,9 +622,9 @@ int32_t tqProcessVgChangeReq(STQ* pTq, int64_t version, char* msg, int32_t msgLe tqReaderSetTbUidList(pHandle->execHandle.pExecReader, tbUidList); taosArrayDestroy(tbUidList); - buildSnapContext(handle.meta, handle.version, req.suid, pHandle->execHandle.subType, pHandle->fetchMeta, (SSnapContext **)(&handle.sContext)); - pHandle->execHandle.task = - qCreateQueueExecTaskInfo(NULL, &handle, NULL, NULL); + buildSnapContext(handle.meta, handle.version, req.suid, pHandle->execHandle.subType, pHandle->fetchMeta, + (SSnapContext**)(&handle.sContext)); + pHandle->execHandle.task = qCreateQueueExecTaskInfo(NULL, &handle, NULL, NULL); } taosHashPut(pTq->pHandle, req.subKey, strlen(req.subKey), pHandle, sizeof(STqHandle)); tqDebug("try to persist handle %s consumer %" PRId64, req.subKey, pHandle->consumerId); diff --git a/source/dnode/vnode/src/tq/tqExec.c b/source/dnode/vnode/src/tq/tqExec.c index a0b8141cfb..bfd23f1a1a 100644 --- a/source/dnode/vnode/src/tq/tqExec.c +++ b/source/dnode/vnode/src/tq/tqExec.c @@ -60,6 +60,46 @@ static int32_t tqAddTbNameToRsp(const STQ* pTq, int64_t uid, SMqDataRsp* pRsp) { return 0; } +int64_t tqScanData(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, STqOffsetVal* pOffset) { + const STqExecHandle* pExec = &pHandle->execHandle; + ASSERT(pExec->subType == TOPIC_SUB_TYPE__COLUMN); + + qTaskInfo_t task = pExec->task; + + if (qStreamPrepareScan(task, pOffset, pHandle->execHandle.subType) < 0) { + tqDebug("prepare scan failed, return"); + if (pOffset->type == TMQ_OFFSET__LOG) { + pRsp->rspOffset = *pOffset; + return 0; + } else { + tqOffsetResetToLog(pOffset, pHandle->snapshotVer); + if (qStreamPrepareScan(task, pOffset, pHandle->execHandle.subType) < 0) { + tqDebug("prepare scan failed, return"); + pRsp->rspOffset = *pOffset; + return 0; + } + } + } + + int32_t rowCnt = 0; + while (1) { + SSDataBlock* pDataBlock = NULL; + uint64_t ts = 0; + tqDebug("tmqsnap task start to execute"); + if (qExecTask(task, &pDataBlock, &ts) < 0) { + ASSERT(0); + } + tqDebug("tmqsnap task execute end, get %p", pDataBlock); + + if (pDataBlock) { + tqAddBlockDataToRsp(pDataBlock, pRsp, pExec->numOfCols); + pRsp->blockNum++; + } + } + + return 0; +} + int64_t tqScan(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, SMqMetaRsp* pMetaRsp, STqOffsetVal* pOffset) { const STqExecHandle* pExec = &pHandle->execHandle; qTaskInfo_t task = pExec->task; @@ -97,23 +137,20 @@ int64_t tqScan(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, SMqMetaRsp* if (tqAddTbNameToRsp(pTq, uid, pRsp) < 0) { continue; } - } else { - char* tbName = strdup(qExtractTbnameFromTask(task)); - taosArrayPush(pRsp->blockTbName, &tbName); } } - if(pRsp->withSchema){ + if (pRsp->withSchema) { if (pOffset->type == TMQ_OFFSET__LOG) { tqAddBlockSchemaToRsp(pExec, pRsp); - }else{ + } else { SSchemaWrapper* pSW = tCloneSSchemaWrapper(qExtractSchemaFromTask(task)); taosArrayPush(pRsp->blockSchema, &pSW); } } - if(pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN){ + if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) { tqAddBlockDataToRsp(pDataBlock, pRsp, pExec->numOfCols); - }else{ + } else { tqAddBlockDataToRsp(pDataBlock, pRsp, taosArrayGetSize(pDataBlock->pDataBlock)); } pRsp->blockNum++; @@ -125,17 +162,9 @@ int64_t tqScan(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, SMqMetaRsp* } } - if(pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN){ - if (pRsp->blockNum == 0 && pOffset->type == TMQ_OFFSET__SNAPSHOT_DATA) { - tqDebug("vgId: %d, tsdb consume over, switch to wal, ver %" PRId64, TD_VID(pTq->pVnode), - pHandle->snapshotVer + 1); - tqOffsetResetToLog(pOffset, pHandle->snapshotVer); - qStreamPrepareScan(task, pOffset, pHandle->execHandle.subType); - continue; - } - }else{ - if (pDataBlock == NULL && pOffset->type == TMQ_OFFSET__SNAPSHOT_DATA){ - if(qStreamExtractPrepareUid(task) != 0){ + if (pHandle->execHandle.subType != TOPIC_SUB_TYPE__COLUMN) { + if (pDataBlock == NULL && pOffset->type == TMQ_OFFSET__SNAPSHOT_DATA) { + if (qStreamExtractPrepareUid(task) != 0) { continue; } tqDebug("tmqsnap vgId: %d, tsdb consume over, switch to wal, ver %" PRId64, TD_VID(pTq->pVnode), @@ -143,13 +172,13 @@ int64_t tqScan(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, SMqMetaRsp* break; } - if (pRsp->blockNum > 0){ + if (pRsp->blockNum > 0) { tqDebug("tmqsnap task exec exited, get data"); break; } SMqMetaRsp* tmp = qStreamExtractMetaMsg(task); - if(tmp->rspOffset.type == TMQ_OFFSET__SNAPSHOT_DATA){ + if (tmp->rspOffset.type == TMQ_OFFSET__SNAPSHOT_DATA) { tqOffsetResetToData(pOffset, tmp->rspOffset.uid, tmp->rspOffset.ts); qStreamPrepareScan(task, pOffset, pHandle->execHandle.subType); tmp->rspOffset.type = TMQ_OFFSET__SNAPSHOT_META; @@ -173,57 +202,8 @@ int64_t tqScan(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, SMqMetaRsp* return 0; } -#if 0 -int32_t tqScanSnapshot(STQ* pTq, const STqExecHandle* pExec, SMqDataRsp* pRsp, STqOffsetVal offset, int32_t workerId) { - ASSERT(pExec->subType == TOPIC_SUB_TYPE__COLUMN); - qTaskInfo_t task = pExec->execCol.task[workerId]; - - if (qStreamPrepareTsdbScan(task, offset.uid, offset.ts) < 0) { - ASSERT(0); - } - - int32_t rowCnt = 0; - while (1) { - SSDataBlock* pDataBlock = NULL; - uint64_t ts = 0; - if (qExecTask(task, &pDataBlock, &ts) < 0) { - ASSERT(0); - } - if (pDataBlock == NULL) break; - - ASSERT(pDataBlock->info.rows != 0); - ASSERT(taosArrayGetSize(pDataBlock->pDataBlock) != 0); - - tqAddBlockDataToRsp(pDataBlock, pRsp); - - if (pRsp->withTbName) { - pRsp->withTbName = 0; -#if 0 - int64_t uid; - int64_t ts; - if (qGetStreamScanStatus(task, &uid, &ts) < 0) { - ASSERT(0); - } - tqAddTbNameToRsp(pTq, uid, pRsp); -#endif - } - pRsp->blockNum++; - - rowCnt += pDataBlock->info.rows; - if (rowCnt >= 4096) break; - } - int64_t uid; - int64_t ts; - if (qGetStreamScanStatus(task, &uid, &ts) < 0) { - ASSERT(0); - } - tqOffsetResetToData(&pRsp->rspOffset, uid, ts); - - return 0; -} -#endif - -int32_t tqLogScanExec(STQ* pTq, STqExecHandle* pExec, SSubmitReq* pReq, SMqDataRsp* pRsp) { +int32_t tqLogScanExec(STQ* pTq, STqHandle* pHandle, SSubmitReq* pReq, SMqDataRsp* pRsp) { + STqExecHandle* pExec = &pHandle->execHandle; ASSERT(pExec->subType != TOPIC_SUB_TYPE__COLUMN); if (pExec->subType == TOPIC_SUB_TYPE__TABLE) { @@ -268,6 +248,28 @@ int32_t tqLogScanExec(STQ* pTq, STqExecHandle* pExec, SSubmitReq* pReq, SMqDataR tqAddBlockSchemaToRsp(pExec, pRsp); pRsp->blockNum++; } +#if 0 + if (pHandle->fetchMeta && pRsp->blockNum) { + SSubmitMsgIter iter = {0}; + tInitSubmitMsgIter(pReq, &iter); + STaosxRsp* pXrsp = (STaosxRsp*)pRsp; + while (1) { + SSubmitBlk* pBlk = NULL; + if (tGetSubmitMsgNext(&iter, &pBlk) < 0) return -1; + if (pBlk->schemaLen > 0) { + if (pXrsp->createTableNum == 0) { + pXrsp->createTableLen = taosArrayInit(0, sizeof(int32_t)); + pXrsp->createTableReq = taosArrayInit(0, sizeof(void*)); + } + void* createReq = taosMemoryCalloc(1, pBlk->schemaLen); + memcpy(createReq, pBlk->data, pBlk->schemaLen); + taosArrayPush(pXrsp->createTableLen, &pBlk->schemaLen); + taosArrayPush(pXrsp->createTableReq, &createReq); + pXrsp->createTableNum++; + } + } + } +#endif } if (pRsp->blockNum == 0) { diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index f0518a72ab..b4e2840330 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -143,6 +143,8 @@ typedef struct { STqOffsetVal prepareStatus; // for tmq STqOffsetVal lastStatus; // for tmq SMqMetaRsp metaRsp; // for tmq fetching meta + int64_t snapshotVer; + SSchemaWrapper *schema; char tbName[TSDB_TABLE_NAME_LEN]; SSDataBlock* pullOverBlk; // for streaming @@ -486,24 +488,23 @@ typedef struct SStreamScanInfo { STimeWindowAggSupp twAggSup; SSDataBlock* pUpdateDataRes; // status for tmq - // SSchemaWrapper schema; - SNodeList* pGroupTags; - SNode* pTagCond; - SNode* pTagIndexCond; + SNodeList* pGroupTags; + SNode* pTagCond; + SNode* pTagIndexCond; } SStreamScanInfo; -typedef struct SStreamRawScanInfo{ -// int8_t subType; -// bool withMeta; -// int64_t suid; -// int64_t snapVersion; -// void *metaInfo; -// void *dataInfo; - SVnode* vnode; - SSDataBlock pRes; // result SSDataBlock - STsdbReader* dataReader; - SSnapContext* sContext; -}SStreamRawScanInfo; +typedef struct { + // int8_t subType; + // bool withMeta; + // int64_t suid; + // int64_t snapVersion; + // void *metaInfo; + // void *dataInfo; + SVnode* vnode; + SSDataBlock pRes; // result SSDataBlock + STsdbReader* dataReader; + SSnapContext* sContext; +} SStreamRawScanInfo; typedef struct SSysTableScanInfo { SRetrieveMetaTableRsp* pRsp; @@ -528,14 +529,14 @@ typedef struct SBlockDistInfo { SSDataBlock* pResBlock; void* pHandle; SReadHandle readHandle; - uint64_t uid; // table uid + uint64_t uid; // table uid } SBlockDistInfo; // todo remove this typedef struct SOptrBasicInfo { - SResultRowInfo resultRowInfo; - SSDataBlock* pRes; - bool mergeResultBlock; + SResultRowInfo resultRowInfo; + SSDataBlock* pRes; + bool mergeResultBlock; } SOptrBasicInfo; typedef struct SIntervalAggOperatorInfo { diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index 124f4b44b0..7e631ab3e9 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -139,7 +139,7 @@ int32_t qSetMultiStreamInput(qTaskInfo_t tinfo, const void* pBlocks, size_t numO qTaskInfo_t qCreateQueueExecTaskInfo(void* msg, SReadHandle* readers, int32_t* numOfCols, SSchemaWrapper** pSchema) { if (msg == NULL) { - // TODO create raw scan + // create raw scan SExecTaskInfo* pTaskInfo = taosMemoryCalloc(1, sizeof(SExecTaskInfo)); if (NULL == pTaskInfo) { @@ -151,7 +151,7 @@ qTaskInfo_t qCreateQueueExecTaskInfo(void* msg, SReadHandle* readers, int32_t* n pTaskInfo->cost.created = taosGetTimestampMs(); pTaskInfo->execModel = OPTR_EXEC_MODEL_QUEUE; pTaskInfo->pRoot = createRawScanOperatorInfo(readers, pTaskInfo); - if(NULL == pTaskInfo->pRoot){ + if (NULL == pTaskInfo->pRoot) { terrno = TSDB_CODE_OUT_OF_MEMORY; taosMemoryFree(pTaskInfo); return NULL; @@ -834,11 +834,11 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT } else { ASSERT(0); } - }else if (pOffset->type == TMQ_OFFSET__SNAPSHOT_DATA){ + } else if (pOffset->type == TMQ_OFFSET__SNAPSHOT_DATA) { SStreamRawScanInfo* pInfo = pOperator->info; - SSnapContext* sContext = pInfo->sContext; - if(setForSnapShot(sContext, pOffset->uid) != 0) { - qError("setDataForSnapShot error. uid:%"PRIi64, pOffset->uid); + SSnapContext* sContext = pInfo->sContext; + if (setForSnapShot(sContext, pOffset->uid) != 0) { + qError("setDataForSnapShot error. uid:%" PRIi64, pOffset->uid); return -1; } @@ -847,27 +847,25 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT pInfo->dataReader = NULL; cleanupQueryTableDataCond(&pTaskInfo->streamInfo.tableCond); taosArrayDestroy(pTaskInfo->tableqinfoList.pTableList); - if(mtInfo.uid == 0) return 0; // no data + if (mtInfo.uid == 0) return 0; // no data initQueryTableDataCondForTmq(&pTaskInfo->streamInfo.tableCond, sContext, mtInfo); pTaskInfo->streamInfo.tableCond.twindows.skey = pOffset->ts; pTaskInfo->tableqinfoList.pTableList = taosArrayInit(1, sizeof(STableKeyInfo)); taosArrayPush(pTaskInfo->tableqinfoList.pTableList, &(STableKeyInfo){.uid = mtInfo.uid, .groupId = 0}); - tsdbReaderOpen(pInfo->vnode, &pTaskInfo->streamInfo.tableCond, pTaskInfo->tableqinfoList.pTableList, &pInfo->dataReader, NULL); + tsdbReaderOpen(pInfo->vnode, &pTaskInfo->streamInfo.tableCond, pTaskInfo->tableqinfoList.pTableList, + &pInfo->dataReader, NULL); - strcpy(pTaskInfo->streamInfo.tbName, mtInfo.tbName); - tDeleteSSchemaWrapper(pTaskInfo->streamInfo.schema); - pTaskInfo->streamInfo.schema = mtInfo.schema; qDebug("tmqsnap qStreamPrepareScan snapshot data uid %ld ts %ld", mtInfo.uid, pOffset->ts); - }else if(pOffset->type == TMQ_OFFSET__SNAPSHOT_META){ + } else if (pOffset->type == TMQ_OFFSET__SNAPSHOT_META) { SStreamRawScanInfo* pInfo = pOperator->info; - SSnapContext* sContext = pInfo->sContext; - if(setForSnapShot(sContext, pOffset->uid) != 0) { - qError("setForSnapShot error. uid:%"PRIi64" ,version:%"PRIi64, pOffset->uid); + SSnapContext* sContext = pInfo->sContext; + if (setForSnapShot(sContext, pOffset->uid) != 0) { + qError("setForSnapShot error. uid:%" PRIi64 " ,version:%" PRIi64, pOffset->uid); return -1; } qDebug("tmqsnap qStreamPrepareScan snapshot meta uid %ld ts %ld", pOffset->uid); - }else if (pOffset->type == TMQ_OFFSET__LOG) { + } else if (pOffset->type == TMQ_OFFSET__LOG) { SStreamRawScanInfo* pInfo = pOperator->info; tsdbReaderClose(pInfo->dataReader); pInfo->dataReader = NULL; diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 4ffa80d468..f3ff13ef85 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -268,7 +268,7 @@ SResultRow* doSetResultOutBufByKey(SDiskbasedBuf* pResultBuf, SResultRowInfo* pR // add a new result set for a new group SResultRowPosition pos = {.pageId = pResult->pageId, .offset = pResult->offset}; tSimpleHashPut(pSup->pResultRowHashTable, pSup->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes), &pos, - sizeof(SResultRowPosition)); + sizeof(SResultRowPosition)); } // 2. set the new time window to be the new active time window @@ -2815,92 +2815,6 @@ int32_t getTableScanInfo(SOperatorInfo* pOperator, int32_t* order, int32_t* scan } } } -#if 0 -int32_t doPrepareScan(SOperatorInfo* pOperator, uint64_t uid, int64_t ts) { - uint8_t type = pOperator->operatorType; - - pOperator->status = OP_OPENED; - - if (type == QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) { - SStreamScanInfo* pScanInfo = pOperator->info; - pScanInfo->blockType = STREAM_INPUT__TABLE_SCAN; - - pScanInfo->pTableScanOp->status = OP_OPENED; - - STableScanInfo* pInfo = pScanInfo->pTableScanOp->info; - ASSERT(pInfo->scanMode == TABLE_SCAN__TABLE_ORDER); - - if (uid == 0) { - pInfo->noTable = 1; - return TSDB_CODE_SUCCESS; - } - - /*if (pSnapShotScanInfo->dataReader == NULL) {*/ - /*pSnapShotScanInfo->dataReader = tsdbReaderOpen(pHandle->vnode, &pSTInfo->cond, tableList, 0, 0);*/ - /*pSnapShotScanInfo->scanMode = TABLE_SCAN__TABLE_ORDER;*/ - /*}*/ - - pInfo->noTable = 0; - - if (pInfo->lastStatus.uid != uid || pInfo->lastStatus.ts != ts) { - SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; - - int32_t tableSz = taosArrayGetSize(pTaskInfo->tableqinfoList.pTableList); - bool found = false; - for (int32_t i = 0; i < tableSz; i++) { - STableKeyInfo* pTableInfo = taosArrayGet(pTaskInfo->tableqinfoList.pTableList, i); - if (pTableInfo->uid == uid) { - found = true; - pInfo->currentTable = i; - } - } - // TODO after processing drop, found can be false - ASSERT(found); - - tsdbSetTableId(pInfo->dataReader, uid); - int64_t oldSkey = pInfo->cond.twindows.skey; - pInfo->cond.twindows.skey = ts + 1; - tsdbReaderReset(pInfo->dataReader, &pInfo->cond); - pInfo->cond.twindows.skey = oldSkey; - pInfo->scanTimes = 0; - - qDebug("tsdb reader offset seek to uid %" PRId64 " ts %" PRId64 ", table cur set to %d , all table num %d", uid, ts, - pInfo->currentTable, tableSz); - } - - return TSDB_CODE_SUCCESS; - - } else { - if (pOperator->numOfDownstream == 1) { - return doPrepareScan(pOperator->pDownstream[0], uid, ts); - } else if (pOperator->numOfDownstream == 0) { - qError("failed to find stream scan operator to set the input data block"); - return TSDB_CODE_QRY_APP_ERROR; - } else { - qError("join not supported for stream block scan"); - return TSDB_CODE_QRY_APP_ERROR; - } - } -} - -int32_t doGetScanStatus(SOperatorInfo* pOperator, uint64_t* uid, int64_t* ts) { - int32_t type = pOperator->operatorType; - if (type == QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) { - SStreamScanInfo* pScanInfo = pOperator->info; - STableScanInfo* pSnapShotScanInfo = pScanInfo->pTableScanOp->info; - *uid = pSnapShotScanInfo->lastStatus.uid; - *ts = pSnapShotScanInfo->lastStatus.ts; - } else { - if (pOperator->pDownstream[0] == NULL) { - return TSDB_CODE_INVALID_PARA; - } else { - doGetScanStatus(pOperator->pDownstream[0], uid, ts); - } - } - - return TSDB_CODE_SUCCESS; -} -#endif // this is a blocking operator static int32_t doOpenAggregateOptr(SOperatorInfo* pOperator) { @@ -3024,7 +2938,7 @@ int32_t aggEncodeResultRow(SOperatorInfo* pOperator, char** result, int32_t* len SResultRow* pRow = (SResultRow*)((char*)pPage + pos->offset); setBufPageDirty(pPage, true); releaseBufPage(pSup->pResultBuf, pPage); - + int32_t iter = 0; void* pIter = NULL; while ((pIter = tSimpleHashIterate(pSup->pResultRowHashTable, pIter, &iter))) { @@ -3434,7 +3348,7 @@ int32_t getBufferPgSize(int32_t rowSize, uint32_t* defaultPgsz, uint32_t* defaul int32_t doInitAggInfoSup(SAggSupporter* pAggSup, SqlFunctionCtx* pCtx, int32_t numOfOutput, size_t keyBufSize, const char* pKey) { - int32_t code = 0; + int32_t code = 0; _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); pAggSup->currentPageId = -1; @@ -4294,42 +4208,6 @@ SArray* extractColumnInfo(SNodeList* pNodeList) { return pList; } -#if 0 -STsdbReader* doCreateDataReader(STableScanPhysiNode* pTableScanNode, SReadHandle* pHandle, - STableListInfo* pTableListInfo, const char* idstr) { - int32_t code = getTableList(pHandle->meta, pHandle->vnode, &pTableScanNode->scan, pTableListInfo); - if (code != TSDB_CODE_SUCCESS) { - goto _error; - } - - if (taosArrayGetSize(pTableListInfo->pTableList) == 0) { - code = 0; - qDebug("no table qualified for query, %s", idstr); - goto _error; - } - - SQueryTableDataCond cond = {0}; - code = initQueryTableDataCond(&cond, pTableScanNode); - if (code != TSDB_CODE_SUCCESS) { - goto _error; - } - - STsdbReader* pReader; - code = tsdbReaderOpen(pHandle->vnode, &cond, pTableListInfo->pTableList, &pReader, idstr); - if (code != TSDB_CODE_SUCCESS) { - goto _error; - } - - cleanupQueryTableDataCond(&cond); - - return pReader; - -_error: - terrno = code; - return NULL; -} -#endif - static int32_t extractTbscanInStreamOpTree(SOperatorInfo* pOperator, STableScanInfo** ppInfo) { if (pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) { if (pOperator->numOfDownstream == 0) { diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index d4c98adb7c..b3d865f591 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -1219,7 +1219,7 @@ static void setBlockGroupId(SOperatorInfo* pOperator, SSDataBlock* pBlock, int32 static int32_t setBlockIntoRes(SStreamScanInfo* pInfo, const SSDataBlock* pBlock) { SDataBlockInfo* pBlockInfo = &pInfo->pRes->info; SOperatorInfo* pOperator = pInfo->pStreamScanOp; - SExecTaskInfo* pTaskInfo = pInfo->pStreamScanOp->pTaskInfo; + SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; blockDataEnsureCapacity(pInfo->pRes, pBlock->info.rows); @@ -1228,7 +1228,7 @@ static int32_t setBlockIntoRes(SStreamScanInfo* pInfo, const SSDataBlock* pBlock pInfo->pRes->info.type = STREAM_NORMAL; pInfo->pRes->info.version = pBlock->info.version; - uint64_t* groupIdPre = taosHashGet(pOperator->pTaskInfo->tableqinfoList.map, &pBlock->info.uid, sizeof(int64_t)); + uint64_t* groupIdPre = taosHashGet(pTaskInfo->tableqinfoList.map, &pBlock->info.uid, sizeof(int64_t)); if (groupIdPre) { pInfo->pRes->info.groupId = *groupIdPre; } else { @@ -1334,9 +1334,9 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) { } } else if (ret.fetchType == FETCH_TYPE__META) { ASSERT(0); -// pTaskInfo->streamInfo.lastStatus = ret.offset; -// pTaskInfo->streamInfo.metaBlk = ret.meta; -// return NULL; + // pTaskInfo->streamInfo.lastStatus = ret.offset; + // pTaskInfo->streamInfo.metaBlk = ret.meta; + // return NULL; } else if (ret.fetchType == FETCH_TYPE__NONE) { pTaskInfo->streamInfo.lastStatus = ret.offset; ASSERT(pTaskInfo->streamInfo.lastStatus.version >= pTaskInfo->streamInfo.prepareStatus.version); @@ -1554,14 +1554,14 @@ static SArray* extractTableIdList(const STableListInfo* pTableGroupInfo) { } static SSDataBlock* doRawScan(SOperatorInfo* pOperator) { -// NOTE: this operator does never check if current status is done or not - SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; + // NOTE: this operator does never check if current status is done or not + SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SStreamRawScanInfo* pInfo = pOperator->info; - pTaskInfo->streamInfo.metaRsp.metaRspLen = 0; // use metaRspLen !=0 to judge if data is meta + pTaskInfo->streamInfo.metaRsp.metaRspLen = 0; // use metaRspLen !=0 to judge if data is meta pTaskInfo->streamInfo.metaRsp.metaRsp = NULL; qDebug("tmqsnap doRawScan called"); - if(pTaskInfo->streamInfo.prepareStatus.type == TMQ_OFFSET__SNAPSHOT_DATA){ + if (pTaskInfo->streamInfo.prepareStatus.type == TMQ_OFFSET__SNAPSHOT_DATA) { SSDataBlock* pBlock = &pInfo->pRes; if (pInfo->dataReader && tsdbNextDataBlock(pInfo->dataReader)) { @@ -1585,42 +1585,38 @@ static SSDataBlock* doRawScan(SOperatorInfo* pOperator) { } SMetaTableInfo mtInfo = getUidfromSnapShot(pInfo->sContext); - if (mtInfo.uid == 0){ //read snapshot done, change to get data from wal + if (mtInfo.uid == 0) { // read snapshot done, change to get data from wal qDebug("tmqsnap read snapshot done, change to get data from wal"); pTaskInfo->streamInfo.prepareStatus.uid = mtInfo.uid; pTaskInfo->streamInfo.lastStatus.type = TMQ_OFFSET__LOG; pTaskInfo->streamInfo.lastStatus.version = pInfo->sContext->snapVersion; - tDeleteSSchemaWrapper(pTaskInfo->streamInfo.schema); - }else{ + } else { pTaskInfo->streamInfo.prepareStatus.uid = mtInfo.uid; pTaskInfo->streamInfo.prepareStatus.ts = INT64_MIN; qDebug("tmqsnap change get data uid:%ld", mtInfo.uid); qStreamPrepareScan(pTaskInfo, &pTaskInfo->streamInfo.prepareStatus, pInfo->sContext->subType); - strcpy(pTaskInfo->streamInfo.tbName, mtInfo.tbName); - tDeleteSSchemaWrapper(pTaskInfo->streamInfo.schema); - pTaskInfo->streamInfo.schema = mtInfo.schema; } qDebug("tmqsnap stream scan tsdb return null"); return NULL; - }else if(pTaskInfo->streamInfo.prepareStatus.type == TMQ_OFFSET__SNAPSHOT_META){ - SSnapContext *sContext = pInfo->sContext; - void* data = NULL; - int32_t dataLen = 0; - int16_t type = 0; - int64_t uid = 0; - if(getMetafromSnapShot(sContext, &data, &dataLen, &type, &uid) < 0){ + } else if (pTaskInfo->streamInfo.prepareStatus.type == TMQ_OFFSET__SNAPSHOT_META) { + SSnapContext* sContext = pInfo->sContext; + void* data = NULL; + int32_t dataLen = 0; + int16_t type = 0; + int64_t uid = 0; + if (getMetafromSnapShot(sContext, &data, &dataLen, &type, &uid) < 0) { qError("tmqsnap getMetafromSnapShot error"); taosMemoryFreeClear(data); return NULL; } - if(!sContext->queryMetaOrData){ // change to get data next poll request + if (!sContext->queryMetaOrData) { // change to get data next poll request pTaskInfo->streamInfo.lastStatus.type = TMQ_OFFSET__SNAPSHOT_META; pTaskInfo->streamInfo.lastStatus.uid = uid; pTaskInfo->streamInfo.metaRsp.rspOffset.type = TMQ_OFFSET__SNAPSHOT_DATA; pTaskInfo->streamInfo.metaRsp.rspOffset.uid = 0; pTaskInfo->streamInfo.metaRsp.rspOffset.ts = INT64_MIN; - }else{ + } else { pTaskInfo->streamInfo.lastStatus.type = TMQ_OFFSET__SNAPSHOT_META; pTaskInfo->streamInfo.lastStatus.uid = uid; pTaskInfo->streamInfo.metaRsp.rspOffset = pTaskInfo->streamInfo.lastStatus; @@ -1631,44 +1627,44 @@ static SSDataBlock* doRawScan(SOperatorInfo* pOperator) { return NULL; } -// else if (pTaskInfo->streamInfo.prepareStatus.type == TMQ_OFFSET__LOG) { -// int64_t fetchVer = pTaskInfo->streamInfo.prepareStatus.version + 1; -// -// while(1){ -// if (tqFetchLog(pInfo->tqReader->pWalReader, pInfo->sContext->withMeta, &fetchVer, &pInfo->pCkHead) < 0) { -// qDebug("tmqsnap tmq poll: consumer log end. offset %" PRId64, fetchVer); -// pTaskInfo->streamInfo.lastStatus.version = fetchVer; -// pTaskInfo->streamInfo.lastStatus.type = TMQ_OFFSET__LOG; -// return NULL; -// } -// SWalCont* pHead = &pInfo->pCkHead->head; -// qDebug("tmqsnap tmq poll: consumer log offset %" PRId64 " msgType %d", fetchVer, pHead->msgType); -// -// if (pHead->msgType == TDMT_VND_SUBMIT) { -// SSubmitReq* pCont = (SSubmitReq*)&pHead->body; -// tqReaderSetDataMsg(pInfo->tqReader, pCont, 0); -// SSDataBlock* block = tqLogScanExec(pInfo->sContext->subType, pInfo->tqReader, pInfo->pFilterOutTbUid, &pInfo->pRes); -// if(block){ -// pTaskInfo->streamInfo.lastStatus.type = TMQ_OFFSET__LOG; -// pTaskInfo->streamInfo.lastStatus.version = fetchVer; -// qDebug("tmqsnap fetch data msg, ver:%" PRId64 ", type:%d", pHead->version, pHead->msgType); -// return block; -// }else{ -// fetchVer++; -// } -// } else{ -// ASSERT(pInfo->sContext->withMeta); -// ASSERT(IS_META_MSG(pHead->msgType)); -// qDebug("tmqsnap fetch meta msg, ver:%" PRId64 ", type:%d", pHead->version, pHead->msgType); -// pTaskInfo->streamInfo.metaRsp.rspOffset.version = fetchVer; -// pTaskInfo->streamInfo.metaRsp.rspOffset.type = TMQ_OFFSET__LOG; -// pTaskInfo->streamInfo.metaRsp.resMsgType = pHead->msgType; -// pTaskInfo->streamInfo.metaRsp.metaRspLen = pHead->bodyLen; -// pTaskInfo->streamInfo.metaRsp.metaRsp = taosMemoryMalloc(pHead->bodyLen); -// memcpy(pTaskInfo->streamInfo.metaRsp.metaRsp, pHead->body, pHead->bodyLen); -// return NULL; -// } -// } + // else if (pTaskInfo->streamInfo.prepareStatus.type == TMQ_OFFSET__LOG) { + // int64_t fetchVer = pTaskInfo->streamInfo.prepareStatus.version + 1; + // + // while(1){ + // if (tqFetchLog(pInfo->tqReader->pWalReader, pInfo->sContext->withMeta, &fetchVer, &pInfo->pCkHead) < 0) { + // qDebug("tmqsnap tmq poll: consumer log end. offset %" PRId64, fetchVer); + // pTaskInfo->streamInfo.lastStatus.version = fetchVer; + // pTaskInfo->streamInfo.lastStatus.type = TMQ_OFFSET__LOG; + // return NULL; + // } + // SWalCont* pHead = &pInfo->pCkHead->head; + // qDebug("tmqsnap tmq poll: consumer log offset %" PRId64 " msgType %d", fetchVer, pHead->msgType); + // + // if (pHead->msgType == TDMT_VND_SUBMIT) { + // SSubmitReq* pCont = (SSubmitReq*)&pHead->body; + // tqReaderSetDataMsg(pInfo->tqReader, pCont, 0); + // SSDataBlock* block = tqLogScanExec(pInfo->sContext->subType, pInfo->tqReader, pInfo->pFilterOutTbUid, + // &pInfo->pRes); if(block){ + // pTaskInfo->streamInfo.lastStatus.type = TMQ_OFFSET__LOG; + // pTaskInfo->streamInfo.lastStatus.version = fetchVer; + // qDebug("tmqsnap fetch data msg, ver:%" PRId64 ", type:%d", pHead->version, pHead->msgType); + // return block; + // }else{ + // fetchVer++; + // } + // } else{ + // ASSERT(pInfo->sContext->withMeta); + // ASSERT(IS_META_MSG(pHead->msgType)); + // qDebug("tmqsnap fetch meta msg, ver:%" PRId64 ", type:%d", pHead->version, pHead->msgType); + // pTaskInfo->streamInfo.metaRsp.rspOffset.version = fetchVer; + // pTaskInfo->streamInfo.metaRsp.rspOffset.type = TMQ_OFFSET__LOG; + // pTaskInfo->streamInfo.metaRsp.resMsgType = pHead->msgType; + // pTaskInfo->streamInfo.metaRsp.metaRspLen = pHead->bodyLen; + // pTaskInfo->streamInfo.metaRsp.metaRsp = taosMemoryMalloc(pHead->bodyLen); + // memcpy(pTaskInfo->streamInfo.metaRsp.metaRsp, pHead->body, pHead->bodyLen); + // return NULL; + // } + // } return NULL; } @@ -1689,7 +1685,7 @@ SOperatorInfo* createRawScanOperatorInfo(SReadHandle* pHandle, SExecTaskInfo* pT // create tq reader SStreamRawScanInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamRawScanInfo)); - SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); + SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); if (pInfo == NULL || pOperator == NULL) { terrno = TSDB_CODE_QRY_OUT_OF_MEMORY; return NULL; @@ -1699,13 +1695,12 @@ SOperatorInfo* createRawScanOperatorInfo(SReadHandle* pHandle, SExecTaskInfo* pT pInfo->sContext = pHandle->sContext; pOperator->name = "RawStreamScanOperator"; -// pOperator->blocking = false; -// pOperator->status = OP_NOT_OPENED; + // pOperator->blocking = false; + // pOperator->status = OP_NOT_OPENED; pOperator->info = pInfo; pOperator->pTaskInfo = pTaskInfo; - pOperator->fpSet = createOperatorFpSet(NULL, doRawScan, NULL, NULL, destroyRawScanOperatorInfo, - NULL, NULL, NULL); + pOperator->fpSet = createOperatorFpSet(NULL, doRawScan, NULL, NULL, destroyRawScanOperatorInfo, NULL, NULL, NULL); return pOperator; } @@ -1724,7 +1719,7 @@ static void destroyStreamScanOperatorInfo(void* param) { } if (pStreamScan->pPseudoExpr) { destroyExprInfo(pStreamScan->pPseudoExpr, pStreamScan->numOfPseudoExpr); - taosMemoryFreeClear(pStreamScan->pPseudoExpr); + taosMemoryFree(pStreamScan->pPseudoExpr); } updateInfoDestroy(pStreamScan->pUpdateInfo);