From 4f81a33889c80d51f99b31794f6dd9295f080425 Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Mon, 14 Feb 2022 14:11:32 +0800 Subject: [PATCH] extract tqRead out of tq --- source/dnode/vnode/inc/tq.h | 2 +- source/dnode/vnode/src/inc/tqInt.h | 1 + source/dnode/vnode/src/tq/tq.c | 147 +--------------------- source/dnode/vnode/src/tq/tqRead.c | 157 ++++++++++++++++++++++++ source/dnode/vnode/src/vnd/vnodeWrite.c | 24 +--- 5 files changed, 166 insertions(+), 165 deletions(-) create mode 100644 source/dnode/vnode/src/tq/tqRead.c diff --git a/source/dnode/vnode/inc/tq.h b/source/dnode/vnode/inc/tq.h index 9d396e0771..a516f423bb 100644 --- a/source/dnode/vnode/inc/tq.h +++ b/source/dnode/vnode/inc/tq.h @@ -38,7 +38,7 @@ extern "C" { typedef struct STQ STQ; // memory allocator provided by vnode -typedef struct STqMemRef { +typedef struct { SMemAllocatorFactory* pAllocatorFactory; SMemAllocator* pAllocator; } STqMemRef; diff --git a/source/dnode/vnode/src/inc/tqInt.h b/source/dnode/vnode/src/inc/tqInt.h index 13c0150cd2..b4f558bd13 100644 --- a/source/dnode/vnode/src/inc/tqInt.h +++ b/source/dnode/vnode/src/inc/tqInt.h @@ -20,6 +20,7 @@ #include "tlog.h" #include "tq.h" #include "trpc.h" + #ifdef __cplusplus extern "C" { #endif diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 8ffedfebe4..412849c13d 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -268,15 +268,12 @@ int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg) { } void* abuf = buf; tEncodeSMqConsumeRsp(&abuf, &rsp); + if (rsp.pBlockData) { taosArrayDestroyEx(rsp.pBlockData, (void (*)(void*))tDeleteSSDataBlock); rsp.pBlockData = NULL; - /*for (int i = 0; i < taosArrayGetSize(rsp.pBlockData); i++) {*/ - /*SSDataBlock* pBlock = taosArrayGet(rsp.pBlockData, i);*/ - /*tDeleteSSDataBlock(pBlock);*/ - /*}*/ - /*taosArrayDestroy(rsp.pBlockData);*/ } + pMsg->pCont = buf; pMsg->contLen = tlen; pMsg->code = 0; @@ -344,143 +341,3 @@ int32_t tqProcessSetConnReq(STQ* pTq, char* msg) { terrno = TSDB_CODE_SUCCESS; return 0; } - -STqReadHandle* tqInitSubmitMsgScanner(SMeta* pMeta) { - STqReadHandle* pReadHandle = malloc(sizeof(STqReadHandle)); - if (pReadHandle == NULL) { - return NULL; - } - pReadHandle->pVnodeMeta = pMeta; - pReadHandle->pMsg = NULL; - pReadHandle->ver = -1; - pReadHandle->pColIdList = NULL; - pReadHandle->sver = -1; - pReadHandle->pSchema = NULL; - pReadHandle->pSchemaWrapper = NULL; - return pReadHandle; -} - -void tqReadHandleSetMsg(STqReadHandle* pReadHandle, SSubmitMsg* pMsg, int64_t ver) { - pReadHandle->pMsg = pMsg; - pMsg->length = htonl(pMsg->length); - pMsg->numOfBlocks = htonl(pMsg->numOfBlocks); - tInitSubmitMsgIter(pMsg, &pReadHandle->msgIter); - pReadHandle->ver = ver; - memset(&pReadHandle->blkIter, 0, sizeof(SSubmitBlkIter)); -} - -bool tqNextDataBlock(STqReadHandle* pHandle) { - while (1) { - if (tGetSubmitMsgNext(&pHandle->msgIter, &pHandle->pBlock) < 0) { - return false; - } - if (pHandle->pBlock == NULL) return false; - - pHandle->pBlock->uid = htobe64(pHandle->pBlock->uid); - /*if (pHandle->tbUid == pHandle->pBlock->uid) {*/ - ASSERT(pHandle->tbIdHash); - void* ret = taosHashGet(pHandle->tbIdHash, &pHandle->pBlock->uid, sizeof(int64_t)); - if (ret != NULL) { - pHandle->pBlock->tid = htonl(pHandle->pBlock->tid); - pHandle->pBlock->sversion = htonl(pHandle->pBlock->sversion); - pHandle->pBlock->dataLen = htonl(pHandle->pBlock->dataLen); - pHandle->pBlock->schemaLen = htonl(pHandle->pBlock->schemaLen); - pHandle->pBlock->numOfRows = htons(pHandle->pBlock->numOfRows); - return true; - } - } - return false; -} - -int tqRetrieveDataBlockInfo(STqReadHandle* pHandle, SDataBlockInfo* pBlockInfo) { - /*int32_t sversion = pHandle->pBlock->sversion;*/ - /*SSchemaWrapper* pSchema = metaGetTableSchema(pHandle->pMeta, pHandle->pBlock->uid, sversion, false);*/ - pBlockInfo->numOfCols = taosArrayGetSize(pHandle->pColIdList); - pBlockInfo->rows = pHandle->pBlock->numOfRows; - pBlockInfo->uid = pHandle->pBlock->uid; - return 0; -} - -SArray* tqRetrieveDataBlock(STqReadHandle* pHandle) { - /*int32_t sversion = pHandle->pBlock->sversion;*/ - // TODO set to real sversion - int32_t sversion = 0; - if (pHandle->sver != sversion) { - pHandle->pSchema = metaGetTbTSchema(pHandle->pVnodeMeta, pHandle->pBlock->uid, sversion); - - tb_uid_t quid; - STbCfg* pTbCfg = metaGetTbInfoByUid(pHandle->pVnodeMeta, pHandle->pBlock->uid); - if (pTbCfg->type == META_CHILD_TABLE) { - quid = pTbCfg->ctbCfg.suid; - } else { - quid = pHandle->pBlock->uid; - } - pHandle->pSchemaWrapper = metaGetTableSchema(pHandle->pVnodeMeta, quid, sversion, true); - pHandle->sver = sversion; - } - - STSchema* pTschema = pHandle->pSchema; - SSchemaWrapper* pSchemaWrapper = pHandle->pSchemaWrapper; - - int32_t numOfRows = pHandle->pBlock->numOfRows; - int32_t numOfCols = pHandle->pSchema->numOfCols; - int32_t colNumNeed = taosArrayGetSize(pHandle->pColIdList); - - // TODO: stable case - if (colNumNeed > pSchemaWrapper->nCols) { - colNumNeed = pSchemaWrapper->nCols; - } - - SArray* pArray = taosArrayInit(colNumNeed, sizeof(SColumnInfoData)); - if (pArray == NULL) { - return NULL; - } - - int j = 0; - for (int32_t i = 0; i < colNumNeed; i++) { - int32_t colId = *(int32_t*)taosArrayGet(pHandle->pColIdList, i); - while (j < pSchemaWrapper->nCols && pSchemaWrapper->pSchema[j].colId < colId) { - j++; - } - SSchema* pColSchema = &pSchemaWrapper->pSchema[j]; - SColumnInfoData colInfo = {0}; - int sz = numOfRows * pColSchema->bytes; - colInfo.info.bytes = pColSchema->bytes; - colInfo.info.colId = colId; - colInfo.info.type = pColSchema->type; - - colInfo.pData = calloc(1, sz); - if (colInfo.pData == NULL) { - // TODO free - taosArrayDestroy(pArray); - return NULL; - } - taosArrayPush(pArray, &colInfo); - } - - STSRowIter iter = {0}; - tdSTSRowIterInit(&iter, pTschema); - STSRow* row; - // int32_t kvIdx = 0; - int32_t curRow = 0; - tInitSubmitBlkIter(pHandle->pBlock, &pHandle->blkIter); - while ((row = tGetSubmitBlkNext(&pHandle->blkIter)) != NULL) { - tdSTSRowIterReset(&iter, row); - // get all wanted col of that block - for (int32_t i = 0; i < colNumNeed; i++) { - SColumnInfoData* pColData = taosArrayGet(pArray, i); - STColumn* pCol = schemaColAt(pTschema, i); - // TODO - ASSERT(pCol->colId == pColData->info.colId); - // void* val = tdGetMemRowDataOfColEx(row, pCol->colId, pCol->type, TD_DATA_ROW_HEAD_SIZE + pCol->offset, &kvIdx); - SCellVal sVal = {0}; - if (!tdSTSRowIterNext(&iter, pCol->colId, pCol->type, &sVal)) { - // TODO: reach end - break; - } - memcpy(POINTER_SHIFT(pColData->pData, curRow * pCol->bytes), sVal.val, pCol->bytes); - } - curRow++; - } - return pArray; -} diff --git a/source/dnode/vnode/src/tq/tqRead.c b/source/dnode/vnode/src/tq/tqRead.c new file mode 100644 index 0000000000..a0e8e7a8f5 --- /dev/null +++ b/source/dnode/vnode/src/tq/tqRead.c @@ -0,0 +1,157 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +#define _DEFAULT_SOURCE + +#include "tqInt.h" + +STqReadHandle* tqInitSubmitMsgScanner(SMeta* pMeta) { + STqReadHandle* pReadHandle = malloc(sizeof(STqReadHandle)); + if (pReadHandle == NULL) { + return NULL; + } + pReadHandle->pVnodeMeta = pMeta; + pReadHandle->pMsg = NULL; + pReadHandle->ver = -1; + pReadHandle->pColIdList = NULL; + pReadHandle->sver = -1; + pReadHandle->pSchema = NULL; + pReadHandle->pSchemaWrapper = NULL; + return pReadHandle; +} + +void tqReadHandleSetMsg(STqReadHandle* pReadHandle, SSubmitMsg* pMsg, int64_t ver) { + pReadHandle->pMsg = pMsg; + pMsg->length = htonl(pMsg->length); + pMsg->numOfBlocks = htonl(pMsg->numOfBlocks); + tInitSubmitMsgIter(pMsg, &pReadHandle->msgIter); + pReadHandle->ver = ver; + memset(&pReadHandle->blkIter, 0, sizeof(SSubmitBlkIter)); +} + +bool tqNextDataBlock(STqReadHandle* pHandle) { + while (1) { + if (tGetSubmitMsgNext(&pHandle->msgIter, &pHandle->pBlock) < 0) { + return false; + } + if (pHandle->pBlock == NULL) return false; + + pHandle->pBlock->uid = htobe64(pHandle->pBlock->uid); + /*if (pHandle->tbUid == pHandle->pBlock->uid) {*/ + ASSERT(pHandle->tbIdHash); + void* ret = taosHashGet(pHandle->tbIdHash, &pHandle->pBlock->uid, sizeof(int64_t)); + if (ret != NULL) { + pHandle->pBlock->tid = htonl(pHandle->pBlock->tid); + pHandle->pBlock->sversion = htonl(pHandle->pBlock->sversion); + pHandle->pBlock->dataLen = htonl(pHandle->pBlock->dataLen); + pHandle->pBlock->schemaLen = htonl(pHandle->pBlock->schemaLen); + pHandle->pBlock->numOfRows = htons(pHandle->pBlock->numOfRows); + return true; + } + } + return false; +} + +int tqRetrieveDataBlockInfo(STqReadHandle* pHandle, SDataBlockInfo* pBlockInfo) { + /*int32_t sversion = pHandle->pBlock->sversion;*/ + /*SSchemaWrapper* pSchema = metaGetTableSchema(pHandle->pMeta, pHandle->pBlock->uid, sversion, false);*/ + pBlockInfo->numOfCols = taosArrayGetSize(pHandle->pColIdList); + pBlockInfo->rows = pHandle->pBlock->numOfRows; + pBlockInfo->uid = pHandle->pBlock->uid; + return 0; +} + +SArray* tqRetrieveDataBlock(STqReadHandle* pHandle) { + /*int32_t sversion = pHandle->pBlock->sversion;*/ + // TODO set to real sversion + int32_t sversion = 0; + if (pHandle->sver != sversion) { + pHandle->pSchema = metaGetTbTSchema(pHandle->pVnodeMeta, pHandle->pBlock->uid, sversion); + + tb_uid_t quid; + STbCfg* pTbCfg = metaGetTbInfoByUid(pHandle->pVnodeMeta, pHandle->pBlock->uid); + if (pTbCfg->type == META_CHILD_TABLE) { + quid = pTbCfg->ctbCfg.suid; + } else { + quid = pHandle->pBlock->uid; + } + pHandle->pSchemaWrapper = metaGetTableSchema(pHandle->pVnodeMeta, quid, sversion, true); + pHandle->sver = sversion; + } + + STSchema* pTschema = pHandle->pSchema; + SSchemaWrapper* pSchemaWrapper = pHandle->pSchemaWrapper; + + int32_t numOfRows = pHandle->pBlock->numOfRows; + int32_t numOfCols = pHandle->pSchema->numOfCols; + int32_t colNumNeed = taosArrayGetSize(pHandle->pColIdList); + + // TODO: stable case + if (colNumNeed > pSchemaWrapper->nCols) { + colNumNeed = pSchemaWrapper->nCols; + } + + SArray* pArray = taosArrayInit(colNumNeed, sizeof(SColumnInfoData)); + if (pArray == NULL) { + return NULL; + } + + int j = 0; + for (int32_t i = 0; i < colNumNeed; i++) { + int32_t colId = *(int32_t*)taosArrayGet(pHandle->pColIdList, i); + while (j < pSchemaWrapper->nCols && pSchemaWrapper->pSchema[j].colId < colId) { + j++; + } + SSchema* pColSchema = &pSchemaWrapper->pSchema[j]; + SColumnInfoData colInfo = {0}; + int sz = numOfRows * pColSchema->bytes; + colInfo.info.bytes = pColSchema->bytes; + colInfo.info.colId = colId; + colInfo.info.type = pColSchema->type; + + colInfo.pData = calloc(1, sz); + if (colInfo.pData == NULL) { + // TODO free + taosArrayDestroy(pArray); + return NULL; + } + taosArrayPush(pArray, &colInfo); + } + + STSRowIter iter = {0}; + tdSTSRowIterInit(&iter, pTschema); + STSRow* row; + // int32_t kvIdx = 0; + int32_t curRow = 0; + tInitSubmitBlkIter(pHandle->pBlock, &pHandle->blkIter); + while ((row = tGetSubmitBlkNext(&pHandle->blkIter)) != NULL) { + tdSTSRowIterReset(&iter, row); + // get all wanted col of that block + for (int32_t i = 0; i < colNumNeed; i++) { + SColumnInfoData* pColData = taosArrayGet(pArray, i); + STColumn* pCol = schemaColAt(pTschema, i); + // TODO + ASSERT(pCol->colId == pColData->info.colId); + // void* val = tdGetMemRowDataOfColEx(row, pCol->colId, pCol->type, TD_DATA_ROW_HEAD_SIZE + pCol->offset, &kvIdx); + SCellVal sVal = {0}; + if (!tdSTSRowIterNext(&iter, pCol->colId, pCol->type, &sVal)) { + // TODO: reach end + break; + } + memcpy(POINTER_SHIFT(pColData->pData, curRow * pCol->bytes), sVal.val, pCol->bytes); + } + curRow++; + } + return pArray; +} diff --git a/source/dnode/vnode/src/vnd/vnodeWrite.c b/source/dnode/vnode/src/vnd/vnodeWrite.c index 4d048bc6e2..b5b8b2b459 100644 --- a/source/dnode/vnode/src/vnd/vnodeWrite.c +++ b/source/dnode/vnode/src/vnd/vnodeWrite.c @@ -16,19 +16,6 @@ #include "tq.h" #include "vnd.h" -#if 0 -int vnodeProcessNoWalWMsgs(SVnode *pVnode, SRpcMsg *pMsg) { - switch (pMsg->msgType) { - case TDMT_VND_MQ_SET_CUR: - if (tqSetCursor(pVnode->pTq, pMsg->pCont) < 0) { - // TODO: handle error - } - break; - } - return 0; -} -#endif - int vnodeProcessWMsgs(SVnode *pVnode, SArray *pMsgs) { SRpcMsg *pMsg; @@ -36,9 +23,9 @@ int vnodeProcessWMsgs(SVnode *pVnode, SArray *pMsgs) { pMsg = *(SRpcMsg **)taosArrayGet(pMsgs, i); // ser request version - void * pBuf = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); + void *pBuf = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); int64_t ver = pVnode->state.processed++; - taosEncodeFixedU64(&pBuf, ver); + taosEncodeFixedI64(&pBuf, ver); if (walWrite(pVnode->pWal, ver, pMsg->msgType, pMsg->pCont, pMsg->contLen) < 0) { // TODO: handle error @@ -55,7 +42,7 @@ int vnodeProcessWMsgs(SVnode *pVnode, SArray *pMsgs) { int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { SVCreateTbReq vCreateTbReq; SVCreateTbBatchReq vCreateTbBatchReq; - void * ptr = vnodeMalloc(pVnode, pMsg->contLen); + void *ptr = vnodeMalloc(pVnode, pMsg->contLen); if (ptr == NULL) { // TODO: handle error } @@ -64,8 +51,8 @@ int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { memcpy(ptr, pMsg->pCont, pMsg->contLen); // todo: change the interface here - uint64_t ver; - taosDecodeFixedU64(POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)), &ver); + int64_t ver; + taosDecodeFixedI64(POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)), &ver); if (tqPushMsg(pVnode->pTq, ptr, ver) < 0) { // TODO: handle error } @@ -132,7 +119,6 @@ int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { } break; case TDMT_VND_MQ_REB: { if (tqProcessRebReq(pVnode->pTq, POINTER_SHIFT(ptr, sizeof(SMsgHead))) < 0) { - } } break; default: