refactor: do some internal refactor.
This commit is contained in:
parent
0d60f24db5
commit
50a64b1ee8
|
@ -32,18 +32,9 @@
|
||||||
#include "ttypes.h"
|
#include "ttypes.h"
|
||||||
#include "vnode.h"
|
#include "vnode.h"
|
||||||
|
|
||||||
typedef struct SBlockDistInfo {
|
|
||||||
SSDataBlock* pResBlock;
|
|
||||||
STsdbReader* pHandle;
|
|
||||||
SReadHandle readHandle;
|
|
||||||
uint64_t uid; // table uid
|
|
||||||
} SBlockDistInfo;
|
|
||||||
|
|
||||||
#define SET_REVERSE_SCAN_FLAG(_info) ((_info)->scanFlag = REVERSE_SCAN)
|
#define SET_REVERSE_SCAN_FLAG(_info) ((_info)->scanFlag = REVERSE_SCAN)
|
||||||
#define SWITCH_ORDER(n) (((n) = ((n) == TSDB_ORDER_ASC) ? TSDB_ORDER_DESC : TSDB_ORDER_ASC))
|
#define SWITCH_ORDER(n) (((n) = ((n) == TSDB_ORDER_ASC) ? TSDB_ORDER_DESC : TSDB_ORDER_ASC))
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
typedef struct STableMergeScanExecInfo {
|
typedef struct STableMergeScanExecInfo {
|
||||||
SFileBlockLoadRecorder blockRecorder;
|
SFileBlockLoadRecorder blockRecorder;
|
||||||
SSortExecInfo sortExecInfo;
|
SSortExecInfo sortExecInfo;
|
||||||
|
@ -946,169 +937,6 @@ SOperatorInfo* createTableSeqScanOperatorInfo(void* pReadHandle, SExecTaskInfo*
|
||||||
return pOperator;
|
return pOperator;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t doGetTableRowSize(void* pMeta, uint64_t uid, int32_t* rowLen, const char* idstr) {
|
|
||||||
*rowLen = 0;
|
|
||||||
|
|
||||||
SMetaReader mr = {0};
|
|
||||||
metaReaderInit(&mr, pMeta, 0);
|
|
||||||
int32_t code = metaGetTableEntryByUid(&mr, uid);
|
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
|
||||||
qError("failed to get table meta, uid:0x%" PRIx64 ", code:%s, %s", uid, tstrerror(terrno), idstr);
|
|
||||||
metaReaderClear(&mr);
|
|
||||||
return terrno;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (mr.me.type == TSDB_SUPER_TABLE) {
|
|
||||||
int32_t numOfCols = mr.me.stbEntry.schemaRow.nCols;
|
|
||||||
for (int32_t i = 0; i < numOfCols; ++i) {
|
|
||||||
(*rowLen) += mr.me.stbEntry.schemaRow.pSchema[i].bytes;
|
|
||||||
}
|
|
||||||
} else if (mr.me.type == TSDB_CHILD_TABLE) {
|
|
||||||
uint64_t suid = mr.me.ctbEntry.suid;
|
|
||||||
tDecoderClear(&mr.coder);
|
|
||||||
code = metaGetTableEntryByUid(&mr, suid);
|
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
|
||||||
qError("failed to get table meta, uid:0x%" PRIx64 ", code:%s, %s", suid, tstrerror(terrno), idstr);
|
|
||||||
metaReaderClear(&mr);
|
|
||||||
return terrno;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t numOfCols = mr.me.stbEntry.schemaRow.nCols;
|
|
||||||
|
|
||||||
for (int32_t i = 0; i < numOfCols; ++i) {
|
|
||||||
(*rowLen) += mr.me.stbEntry.schemaRow.pSchema[i].bytes;
|
|
||||||
}
|
|
||||||
} else if (mr.me.type == TSDB_NORMAL_TABLE) {
|
|
||||||
int32_t numOfCols = mr.me.ntbEntry.schemaRow.nCols;
|
|
||||||
for (int32_t i = 0; i < numOfCols; ++i) {
|
|
||||||
(*rowLen) += mr.me.ntbEntry.schemaRow.pSchema[i].bytes;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
metaReaderClear(&mr);
|
|
||||||
return TSDB_CODE_SUCCESS;
|
|
||||||
}
|
|
||||||
|
|
||||||
static SSDataBlock* doBlockInfoScan(SOperatorInfo* pOperator) {
|
|
||||||
if (pOperator->status == OP_EXEC_DONE) {
|
|
||||||
return NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
SBlockDistInfo* pBlockScanInfo = pOperator->info;
|
|
||||||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
|
||||||
|
|
||||||
STableBlockDistInfo blockDistInfo = {.minRows = INT_MAX, .maxRows = INT_MIN};
|
|
||||||
int32_t code = doGetTableRowSize(pBlockScanInfo->readHandle.meta, pBlockScanInfo->uid,
|
|
||||||
(int32_t*)&blockDistInfo.rowSize, GET_TASKID(pTaskInfo));
|
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
|
||||||
T_LONG_JMP(pTaskInfo->env, code);
|
|
||||||
}
|
|
||||||
|
|
||||||
tsdbGetFileBlocksDistInfo(pBlockScanInfo->pHandle, &blockDistInfo);
|
|
||||||
blockDistInfo.numOfInmemRows = (int32_t)tsdbGetNumOfRowsInMemTable(pBlockScanInfo->pHandle);
|
|
||||||
|
|
||||||
SSDataBlock* pBlock = pBlockScanInfo->pResBlock;
|
|
||||||
|
|
||||||
int32_t slotId = pOperator->exprSupp.pExprInfo->base.resSchema.slotId;
|
|
||||||
SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, slotId);
|
|
||||||
|
|
||||||
int32_t len = tSerializeBlockDistInfo(NULL, 0, &blockDistInfo);
|
|
||||||
char* p = taosMemoryCalloc(1, len + VARSTR_HEADER_SIZE);
|
|
||||||
tSerializeBlockDistInfo(varDataVal(p), len, &blockDistInfo);
|
|
||||||
varDataSetLen(p, len);
|
|
||||||
|
|
||||||
colDataAppend(pColInfo, 0, p, false);
|
|
||||||
taosMemoryFree(p);
|
|
||||||
|
|
||||||
pBlock->info.rows = 1;
|
|
||||||
pOperator->status = OP_EXEC_DONE;
|
|
||||||
return pBlock;
|
|
||||||
}
|
|
||||||
|
|
||||||
static void destroyBlockDistScanOperatorInfo(void* param) {
|
|
||||||
SBlockDistInfo* pDistInfo = (SBlockDistInfo*)param;
|
|
||||||
blockDataDestroy(pDistInfo->pResBlock);
|
|
||||||
tsdbReaderClose(pDistInfo->pHandle);
|
|
||||||
taosMemoryFreeClear(param);
|
|
||||||
}
|
|
||||||
|
|
||||||
static int32_t initTableblockDistQueryCond(uint64_t uid, SQueryTableDataCond* pCond) {
|
|
||||||
memset(pCond, 0, sizeof(SQueryTableDataCond));
|
|
||||||
|
|
||||||
pCond->order = TSDB_ORDER_ASC;
|
|
||||||
pCond->numOfCols = 1;
|
|
||||||
pCond->colList = taosMemoryCalloc(1, sizeof(SColumnInfo));
|
|
||||||
if (pCond->colList == NULL) {
|
|
||||||
terrno = TSDB_CODE_QRY_OUT_OF_MEMORY;
|
|
||||||
return terrno;
|
|
||||||
}
|
|
||||||
|
|
||||||
pCond->colList->colId = 1;
|
|
||||||
pCond->colList->type = TSDB_DATA_TYPE_TIMESTAMP;
|
|
||||||
pCond->colList->bytes = sizeof(TSKEY);
|
|
||||||
|
|
||||||
pCond->twindows = (STimeWindow){.skey = INT64_MIN, .ekey = INT64_MAX};
|
|
||||||
pCond->suid = uid;
|
|
||||||
pCond->type = TIMEWINDOW_RANGE_CONTAINED;
|
|
||||||
pCond->startVersion = -1;
|
|
||||||
pCond->endVersion = -1;
|
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
|
||||||
}
|
|
||||||
|
|
||||||
SOperatorInfo* createDataBlockInfoScanOperator(SReadHandle* readHandle, SBlockDistScanPhysiNode* pBlockScanNode,
|
|
||||||
SExecTaskInfo* pTaskInfo) {
|
|
||||||
SBlockDistInfo* pInfo = taosMemoryCalloc(1, sizeof(SBlockDistInfo));
|
|
||||||
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
|
|
||||||
if (pInfo == NULL || pOperator == NULL) {
|
|
||||||
pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY;
|
|
||||||
goto _error;
|
|
||||||
}
|
|
||||||
|
|
||||||
{
|
|
||||||
SQueryTableDataCond cond = {0};
|
|
||||||
|
|
||||||
int32_t code = initTableblockDistQueryCond(pBlockScanNode->suid, &cond);
|
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
|
||||||
goto _error;
|
|
||||||
}
|
|
||||||
|
|
||||||
STableListInfo* pTableListInfo = pTaskInfo->pTableInfoList;
|
|
||||||
size_t num = tableListGetSize(pTableListInfo);
|
|
||||||
void* pList = tableListGetInfo(pTableListInfo, 0);
|
|
||||||
|
|
||||||
code = tsdbReaderOpen(readHandle->vnode, &cond, pList, num, &pInfo->pHandle, pTaskInfo->id.str);
|
|
||||||
cleanupQueryTableDataCond(&cond);
|
|
||||||
if (code != 0) {
|
|
||||||
goto _error;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pInfo->readHandle = *readHandle;
|
|
||||||
pInfo->uid = pBlockScanNode->suid;
|
|
||||||
|
|
||||||
pInfo->pResBlock = createResDataBlock(pBlockScanNode->node.pOutputDataBlockDesc);
|
|
||||||
blockDataEnsureCapacity(pInfo->pResBlock, 1);
|
|
||||||
|
|
||||||
int32_t numOfCols = 0;
|
|
||||||
SExprInfo* pExprInfo = createExprInfo(pBlockScanNode->pScanPseudoCols, NULL, &numOfCols);
|
|
||||||
int32_t code = initExprSupp(&pOperator->exprSupp, pExprInfo, numOfCols);
|
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
|
||||||
goto _error;
|
|
||||||
}
|
|
||||||
|
|
||||||
setOperatorInfo(pOperator, "DataBlockDistScanOperator", QUERY_NODE_PHYSICAL_PLAN_BLOCK_DIST_SCAN, false,
|
|
||||||
OP_NOT_OPENED, pInfo, pTaskInfo);
|
|
||||||
pOperator->fpSet =
|
|
||||||
createOperatorFpSet(operatorDummyOpenFn, doBlockInfoScan, NULL, destroyBlockDistScanOperatorInfo, NULL);
|
|
||||||
return pOperator;
|
|
||||||
|
|
||||||
_error:
|
|
||||||
taosMemoryFreeClear(pInfo);
|
|
||||||
taosMemoryFreeClear(pOperator);
|
|
||||||
return NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
static FORCE_INLINE void doClearBufferedBlocks(SStreamScanInfo* pInfo) {
|
static FORCE_INLINE void doClearBufferedBlocks(SStreamScanInfo* pInfo) {
|
||||||
taosArrayClear(pInfo->pBlockLists);
|
taosArrayClear(pInfo->pBlockLists);
|
||||||
pInfo->validBlockIndex = 0;
|
pInfo->validBlockIndex = 0;
|
||||||
|
|
|
@ -78,6 +78,13 @@ typedef struct MergeIndex {
|
||||||
int len;
|
int len;
|
||||||
} MergeIndex;
|
} MergeIndex;
|
||||||
|
|
||||||
|
typedef struct SBlockDistInfo {
|
||||||
|
SSDataBlock* pResBlock;
|
||||||
|
STsdbReader* pHandle;
|
||||||
|
SReadHandle readHandle;
|
||||||
|
uint64_t uid; // table uid
|
||||||
|
} SBlockDistInfo;
|
||||||
|
|
||||||
static int32_t sysChkFilter__Comm(SNode* pNode);
|
static int32_t sysChkFilter__Comm(SNode* pNode);
|
||||||
static int32_t sysChkFilter__DBName(SNode* pNode);
|
static int32_t sysChkFilter__DBName(SNode* pNode);
|
||||||
static int32_t sysChkFilter__VgroupId(SNode* pNode);
|
static int32_t sysChkFilter__VgroupId(SNode* pNode);
|
||||||
|
@ -1772,3 +1779,166 @@ static int32_t optSysTabFilte(void* arg, SNode* cond, SArray* result) {
|
||||||
}
|
}
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static int32_t doGetTableRowSize(void* pMeta, uint64_t uid, int32_t* rowLen, const char* idstr) {
|
||||||
|
*rowLen = 0;
|
||||||
|
|
||||||
|
SMetaReader mr = {0};
|
||||||
|
metaReaderInit(&mr, pMeta, 0);
|
||||||
|
int32_t code = metaGetTableEntryByUid(&mr, uid);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
qError("failed to get table meta, uid:0x%" PRIx64 ", code:%s, %s", uid, tstrerror(terrno), idstr);
|
||||||
|
metaReaderClear(&mr);
|
||||||
|
return terrno;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (mr.me.type == TSDB_SUPER_TABLE) {
|
||||||
|
int32_t numOfCols = mr.me.stbEntry.schemaRow.nCols;
|
||||||
|
for (int32_t i = 0; i < numOfCols; ++i) {
|
||||||
|
(*rowLen) += mr.me.stbEntry.schemaRow.pSchema[i].bytes;
|
||||||
|
}
|
||||||
|
} else if (mr.me.type == TSDB_CHILD_TABLE) {
|
||||||
|
uint64_t suid = mr.me.ctbEntry.suid;
|
||||||
|
tDecoderClear(&mr.coder);
|
||||||
|
code = metaGetTableEntryByUid(&mr, suid);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
qError("failed to get table meta, uid:0x%" PRIx64 ", code:%s, %s", suid, tstrerror(terrno), idstr);
|
||||||
|
metaReaderClear(&mr);
|
||||||
|
return terrno;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t numOfCols = mr.me.stbEntry.schemaRow.nCols;
|
||||||
|
|
||||||
|
for (int32_t i = 0; i < numOfCols; ++i) {
|
||||||
|
(*rowLen) += mr.me.stbEntry.schemaRow.pSchema[i].bytes;
|
||||||
|
}
|
||||||
|
} else if (mr.me.type == TSDB_NORMAL_TABLE) {
|
||||||
|
int32_t numOfCols = mr.me.ntbEntry.schemaRow.nCols;
|
||||||
|
for (int32_t i = 0; i < numOfCols; ++i) {
|
||||||
|
(*rowLen) += mr.me.ntbEntry.schemaRow.pSchema[i].bytes;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
metaReaderClear(&mr);
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
static SSDataBlock* doBlockInfoScan(SOperatorInfo* pOperator) {
|
||||||
|
if (pOperator->status == OP_EXEC_DONE) {
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
SBlockDistInfo* pBlockScanInfo = pOperator->info;
|
||||||
|
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||||
|
|
||||||
|
STableBlockDistInfo blockDistInfo = {.minRows = INT_MAX, .maxRows = INT_MIN};
|
||||||
|
int32_t code = doGetTableRowSize(pBlockScanInfo->readHandle.meta, pBlockScanInfo->uid,
|
||||||
|
(int32_t*)&blockDistInfo.rowSize, GET_TASKID(pTaskInfo));
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
T_LONG_JMP(pTaskInfo->env, code);
|
||||||
|
}
|
||||||
|
|
||||||
|
tsdbGetFileBlocksDistInfo(pBlockScanInfo->pHandle, &blockDistInfo);
|
||||||
|
blockDistInfo.numOfInmemRows = (int32_t)tsdbGetNumOfRowsInMemTable(pBlockScanInfo->pHandle);
|
||||||
|
|
||||||
|
SSDataBlock* pBlock = pBlockScanInfo->pResBlock;
|
||||||
|
|
||||||
|
int32_t slotId = pOperator->exprSupp.pExprInfo->base.resSchema.slotId;
|
||||||
|
SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, slotId);
|
||||||
|
|
||||||
|
int32_t len = tSerializeBlockDistInfo(NULL, 0, &blockDistInfo);
|
||||||
|
char* p = taosMemoryCalloc(1, len + VARSTR_HEADER_SIZE);
|
||||||
|
tSerializeBlockDistInfo(varDataVal(p), len, &blockDistInfo);
|
||||||
|
varDataSetLen(p, len);
|
||||||
|
|
||||||
|
colDataAppend(pColInfo, 0, p, false);
|
||||||
|
taosMemoryFree(p);
|
||||||
|
|
||||||
|
pBlock->info.rows = 1;
|
||||||
|
pOperator->status = OP_EXEC_DONE;
|
||||||
|
return pBlock;
|
||||||
|
}
|
||||||
|
|
||||||
|
static void destroyBlockDistScanOperatorInfo(void* param) {
|
||||||
|
SBlockDistInfo* pDistInfo = (SBlockDistInfo*)param;
|
||||||
|
blockDataDestroy(pDistInfo->pResBlock);
|
||||||
|
tsdbReaderClose(pDistInfo->pHandle);
|
||||||
|
taosMemoryFreeClear(param);
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t initTableblockDistQueryCond(uint64_t uid, SQueryTableDataCond* pCond) {
|
||||||
|
memset(pCond, 0, sizeof(SQueryTableDataCond));
|
||||||
|
|
||||||
|
pCond->order = TSDB_ORDER_ASC;
|
||||||
|
pCond->numOfCols = 1;
|
||||||
|
pCond->colList = taosMemoryCalloc(1, sizeof(SColumnInfo));
|
||||||
|
if (pCond->colList == NULL) {
|
||||||
|
terrno = TSDB_CODE_QRY_OUT_OF_MEMORY;
|
||||||
|
return terrno;
|
||||||
|
}
|
||||||
|
|
||||||
|
pCond->colList->colId = 1;
|
||||||
|
pCond->colList->type = TSDB_DATA_TYPE_TIMESTAMP;
|
||||||
|
pCond->colList->bytes = sizeof(TSKEY);
|
||||||
|
|
||||||
|
pCond->twindows = (STimeWindow){.skey = INT64_MIN, .ekey = INT64_MAX};
|
||||||
|
pCond->suid = uid;
|
||||||
|
pCond->type = TIMEWINDOW_RANGE_CONTAINED;
|
||||||
|
pCond->startVersion = -1;
|
||||||
|
pCond->endVersion = -1;
|
||||||
|
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
SOperatorInfo* createDataBlockInfoScanOperator(SReadHandle* readHandle, SBlockDistScanPhysiNode* pBlockScanNode,
|
||||||
|
SExecTaskInfo* pTaskInfo) {
|
||||||
|
SBlockDistInfo* pInfo = taosMemoryCalloc(1, sizeof(SBlockDistInfo));
|
||||||
|
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
|
||||||
|
if (pInfo == NULL || pOperator == NULL) {
|
||||||
|
pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
goto _error;
|
||||||
|
}
|
||||||
|
|
||||||
|
{
|
||||||
|
SQueryTableDataCond cond = {0};
|
||||||
|
|
||||||
|
int32_t code = initTableblockDistQueryCond(pBlockScanNode->suid, &cond);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
goto _error;
|
||||||
|
}
|
||||||
|
|
||||||
|
STableListInfo* pTableListInfo = pTaskInfo->pTableInfoList;
|
||||||
|
size_t num = tableListGetSize(pTableListInfo);
|
||||||
|
void* pList = tableListGetInfo(pTableListInfo, 0);
|
||||||
|
|
||||||
|
code = tsdbReaderOpen(readHandle->vnode, &cond, pList, num, &pInfo->pHandle, pTaskInfo->id.str);
|
||||||
|
cleanupQueryTableDataCond(&cond);
|
||||||
|
if (code != 0) {
|
||||||
|
goto _error;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pInfo->readHandle = *readHandle;
|
||||||
|
pInfo->uid = pBlockScanNode->suid;
|
||||||
|
|
||||||
|
pInfo->pResBlock = createResDataBlock(pBlockScanNode->node.pOutputDataBlockDesc);
|
||||||
|
blockDataEnsureCapacity(pInfo->pResBlock, 1);
|
||||||
|
|
||||||
|
int32_t numOfCols = 0;
|
||||||
|
SExprInfo* pExprInfo = createExprInfo(pBlockScanNode->pScanPseudoCols, NULL, &numOfCols);
|
||||||
|
int32_t code = initExprSupp(&pOperator->exprSupp, pExprInfo, numOfCols);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
goto _error;
|
||||||
|
}
|
||||||
|
|
||||||
|
setOperatorInfo(pOperator, "DataBlockDistScanOperator", QUERY_NODE_PHYSICAL_PLAN_BLOCK_DIST_SCAN, false,
|
||||||
|
OP_NOT_OPENED, pInfo, pTaskInfo);
|
||||||
|
pOperator->fpSet =
|
||||||
|
createOperatorFpSet(operatorDummyOpenFn, doBlockInfoScan, NULL, destroyBlockDistScanOperatorInfo, NULL);
|
||||||
|
return pOperator;
|
||||||
|
|
||||||
|
_error:
|
||||||
|
taosMemoryFreeClear(pInfo);
|
||||||
|
taosMemoryFreeClear(pOperator);
|
||||||
|
return NULL;
|
||||||
|
}
|
Loading…
Reference in New Issue