homework-jianmu/source/dnode/vnode/src/tq/tqRead.c

177 lines
6.1 KiB
C

/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "tdatablock.h"
#include "vnode.h"
STqReadHandle* tqInitSubmitMsgScanner(SMeta* pMeta) {
STqReadHandle* pReadHandle = taosMemoryMalloc(sizeof(STqReadHandle));
if (pReadHandle == NULL) {
return NULL;
}
pReadHandle->pVnodeMeta = pMeta;
pReadHandle->pMsg = NULL;
pReadHandle->ver = -1;
pReadHandle->pColIdList = NULL;
pReadHandle->sver = -1;
pReadHandle->pSchema = NULL;
pReadHandle->pSchemaWrapper = NULL;
pReadHandle->tbIdHash = NULL;
return pReadHandle;
}
int32_t tqReadHandleSetMsg(STqReadHandle* pReadHandle, SSubmitReq* pMsg, int64_t ver) {
pReadHandle->pMsg = pMsg;
pMsg->length = htonl(pMsg->length);
pMsg->numOfBlocks = htonl(pMsg->numOfBlocks);
// iterate and convert
if (tInitSubmitMsgIter(pMsg, &pReadHandle->msgIter) < 0) return -1;
while (true) {
if (tGetSubmitMsgNext(&pReadHandle->msgIter, &pReadHandle->pBlock) < 0) return -1;
if (pReadHandle->pBlock == NULL) break;
pReadHandle->pBlock->uid = htobe64(pReadHandle->pBlock->uid);
pReadHandle->pBlock->suid = htobe64(pReadHandle->pBlock->suid);
pReadHandle->pBlock->sversion = htonl(pReadHandle->pBlock->sversion);
pReadHandle->pBlock->dataLen = htonl(pReadHandle->pBlock->dataLen);
pReadHandle->pBlock->schemaLen = htonl(pReadHandle->pBlock->schemaLen);
pReadHandle->pBlock->numOfRows = htons(pReadHandle->pBlock->numOfRows);
}
if (tInitSubmitMsgIter(pMsg, &pReadHandle->msgIter) < 0) return -1;
pReadHandle->ver = ver;
memset(&pReadHandle->blkIter, 0, sizeof(SSubmitBlkIter));
return 0;
}
bool tqNextDataBlock(STqReadHandle* pHandle) {
while (1) {
if (tGetSubmitMsgNext(&pHandle->msgIter, &pHandle->pBlock) < 0) {
return false;
}
if (pHandle->pBlock == NULL) return false;
/*pHandle->pBlock->uid = htobe64(pHandle->pBlock->uid);*/
/*if (pHandle->tbUid == pHandle->pBlock->uid) {*/
ASSERT(pHandle->tbIdHash);
void* ret = taosHashGet(pHandle->tbIdHash, &pHandle->pBlock->uid, sizeof(int64_t));
if (ret != NULL) {
/*printf("retrieve one tb %ld\n", pHandle->pBlock->uid);*/
/*pHandle->pBlock->tid = htonl(pHandle->pBlock->tid);*/
/*pHandle->pBlock->sversion = htonl(pHandle->pBlock->sversion);*/
/*pHandle->pBlock->dataLen = htonl(pHandle->pBlock->dataLen);*/
/*pHandle->pBlock->schemaLen = htonl(pHandle->pBlock->schemaLen);*/
/*pHandle->pBlock->numOfRows = htons(pHandle->pBlock->numOfRows);*/
return true;
/*} else {*/
/*printf("skip one tb %ld\n", pHandle->pBlock->uid);*/
}
}
return false;
}
int tqRetrieveDataBlockInfo(STqReadHandle* pHandle, SDataBlockInfo* pBlockInfo) {
// currently only rows are used
pBlockInfo->numOfCols = taosArrayGetSize(pHandle->pColIdList);
pBlockInfo->rows = pHandle->pBlock->numOfRows;
pBlockInfo->uid = pHandle->pBlock->uid;
return 0;
}
SArray* tqRetrieveDataBlock(STqReadHandle* pHandle) {
/*int32_t sversion = pHandle->pBlock->sversion;*/
// TODO set to real sversion
int32_t sversion = 0;
if (pHandle->sver != sversion) {
pHandle->pSchema = metaGetTbTSchema(pHandle->pVnodeMeta, pHandle->pBlock->uid, sversion);
tb_uid_t quid;
STbCfg* pTbCfg = metaGetTbInfoByUid(pHandle->pVnodeMeta, pHandle->pBlock->uid);
if (pTbCfg->type == META_CHILD_TABLE) {
quid = pTbCfg->ctbCfg.suid;
} else {
quid = pHandle->pBlock->uid;
}
pHandle->pSchemaWrapper = metaGetTableSchema(pHandle->pVnodeMeta, quid, sversion, true);
pHandle->sver = sversion;
}
STSchema* pTschema = pHandle->pSchema;
SSchemaWrapper* pSchemaWrapper = pHandle->pSchemaWrapper;
int32_t numOfRows = pHandle->pBlock->numOfRows;
/*int32_t numOfCols = pHandle->pSchema->numOfCols;*/
int32_t colNumNeed = taosArrayGetSize(pHandle->pColIdList);
if (colNumNeed > pSchemaWrapper->nCols) {
colNumNeed = pSchemaWrapper->nCols;
}
SArray* pArray = taosArrayInit(colNumNeed, sizeof(SColumnInfoData));
if (pArray == NULL) {
return NULL;
}
int32_t colMeta = 0;
int32_t colNeed = 0;
while (colMeta < pSchemaWrapper->nCols && colNeed < colNumNeed) {
SSchema* pColSchema = &pSchemaWrapper->pSchema[colMeta];
col_id_t colIdSchema = pColSchema->colId;
col_id_t colIdNeed = *(col_id_t*)taosArrayGet(pHandle->pColIdList, colNeed);
if (colIdSchema < colIdNeed) {
colMeta++;
} else if (colIdSchema > colIdNeed) {
colNeed++;
} else {
SColumnInfoData colInfo = {0};
/*int sz = numOfRows * pColSchema->bytes;*/
colInfo.info.bytes = pColSchema->bytes;
colInfo.info.colId = pColSchema->colId;
colInfo.info.type = pColSchema->type;
if (blockDataEnsureColumnCapacity(&colInfo, numOfRows) < 0) {
taosArrayDestroyEx(pArray, (void (*)(void*))tDeleteSSDataBlock);
return NULL;
}
taosArrayPush(pArray, &colInfo);
colMeta++;
colNeed++;
}
}
STSRowIter iter = {0};
tdSTSRowIterInit(&iter, pTschema);
STSRow* row;
int32_t curRow = 0;
tInitSubmitBlkIter(pHandle->pBlock, &pHandle->blkIter);
while ((row = tGetSubmitBlkNext(&pHandle->blkIter)) != NULL) {
tdSTSRowIterReset(&iter, row);
// get all wanted col of that block
int32_t colTot = taosArrayGetSize(pArray);
for (int32_t i = 0; i < colTot; i++) {
SColumnInfoData* pColData = taosArrayGet(pArray, i);
SCellVal sVal = {0};
if (!tdSTSRowIterNext(&iter, pColData->info.colId, pColData->info.type, &sVal)) {
break;
}
// TODO handle null
colDataAppend(pColData, curRow, sVal.val, sVal.valType == TD_VTYPE_NULL);
}
curRow++;
}
return pArray;
}