enh(stream): refine tqRetrieveDataBlock api
This commit is contained in:
parent
996f995644
commit
0c1a51bad6
|
@ -617,12 +617,12 @@ int32_t scheduleAsyncQuery(SRequestObj* pRequest, SQueryPlan* pDag, SArray* pNod
|
||||||
.requestId = pRequest->requestId,
|
.requestId = pRequest->requestId,
|
||||||
.requestObjRefId = pRequest->self};
|
.requestObjRefId = pRequest->self};
|
||||||
SSchedulerReq req = {.pConn = &conn,
|
SSchedulerReq req = {.pConn = &conn,
|
||||||
.pNodeList = pNodeList,
|
.pNodeList = pNodeList,
|
||||||
.pDag = pDag,
|
.pDag = pDag,
|
||||||
.sql = pRequest->sqlstr,
|
.sql = pRequest->sqlstr,
|
||||||
.startTs = pRequest->metric.start,
|
.startTs = pRequest->metric.start,
|
||||||
.fp = schdExecCallback,
|
.fp = schdExecCallback,
|
||||||
.cbParam = &res};
|
.cbParam = &res};
|
||||||
|
|
||||||
int32_t code = schedulerAsyncExecJob(&req, &pRequest->body.queryJob);
|
int32_t code = schedulerAsyncExecJob(&req, &pRequest->body.queryJob);
|
||||||
|
|
||||||
|
@ -669,13 +669,13 @@ int32_t scheduleQuery(SRequestObj* pRequest, SQueryPlan* pDag, SArray* pNodeList
|
||||||
.requestId = pRequest->requestId,
|
.requestId = pRequest->requestId,
|
||||||
.requestObjRefId = pRequest->self};
|
.requestObjRefId = pRequest->self};
|
||||||
SSchedulerReq req = {.pConn = &conn,
|
SSchedulerReq req = {.pConn = &conn,
|
||||||
.pNodeList = pNodeList,
|
.pNodeList = pNodeList,
|
||||||
.pDag = pDag,
|
.pDag = pDag,
|
||||||
.sql = pRequest->sqlstr,
|
.sql = pRequest->sqlstr,
|
||||||
.startTs = pRequest->metric.start,
|
.startTs = pRequest->metric.start,
|
||||||
.fp = NULL,
|
.fp = NULL,
|
||||||
.cbParam = NULL,
|
.cbParam = NULL,
|
||||||
.reqKilled = &pRequest->killed};
|
.reqKilled = &pRequest->killed};
|
||||||
|
|
||||||
int32_t code = schedulerExecJob(&req, &pRequest->body.queryJob, &res);
|
int32_t code = schedulerExecJob(&req, &pRequest->body.queryJob, &res);
|
||||||
pRequest->body.resInfo.execRes = res.res;
|
pRequest->body.resInfo.execRes = res.res;
|
||||||
|
|
|
@ -199,10 +199,7 @@ TAOS_FIELD *taos_fetch_fields(TAOS_RES *res) {
|
||||||
return pResInfo->userFields;
|
return pResInfo->userFields;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
TAOS_RES *taos_query(TAOS *taos, const char *sql) { return taosQueryImpl(taos, sql, false); }
|
||||||
TAOS_RES *taos_query(TAOS *taos, const char *sql) {
|
|
||||||
return taosQueryImpl(taos, sql, false);
|
|
||||||
}
|
|
||||||
|
|
||||||
TAOS_ROW taos_fetch_row(TAOS_RES *res) {
|
TAOS_ROW taos_fetch_row(TAOS_RES *res) {
|
||||||
if (res == NULL) {
|
if (res == NULL) {
|
||||||
|
@ -593,11 +590,11 @@ int *taos_get_column_data_offset(TAOS_RES *res, int columnIndex) {
|
||||||
return pResInfo->pCol[columnIndex].offset;
|
return pResInfo->pCol[columnIndex].offset;
|
||||||
}
|
}
|
||||||
|
|
||||||
int taos_validate_sql(TAOS *taos, const char *sql) {
|
int taos_validate_sql(TAOS *taos, const char *sql) {
|
||||||
TAOS_RES* pObj = taosQueryImpl(taos, sql, true);
|
TAOS_RES *pObj = taosQueryImpl(taos, sql, true);
|
||||||
|
|
||||||
int code = taos_errno(pObj);
|
int code = taos_errno(pObj);
|
||||||
|
|
||||||
taos_free_result(pObj);
|
taos_free_result(pObj);
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
@ -884,10 +881,10 @@ void taos_unsubscribe(TAOS_SUB *tsub, int keepProgress) {
|
||||||
|
|
||||||
int taos_load_table_info(TAOS *taos, const char *tableNameList) {
|
int taos_load_table_info(TAOS *taos, const char *tableNameList) {
|
||||||
const int32_t MAX_TABLE_NAME_LENGTH = 12 * 1024 * 1024; // 12MB list
|
const int32_t MAX_TABLE_NAME_LENGTH = 12 * 1024 * 1024; // 12MB list
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
SRequestObj *pRequest = NULL;
|
SRequestObj *pRequest = NULL;
|
||||||
SCatalogReq catalogReq = {0};
|
SCatalogReq catalogReq = {0};
|
||||||
|
|
||||||
if (NULL == tableNameList) {
|
if (NULL == tableNameList) {
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
@ -911,26 +908,25 @@ int taos_load_table_info(TAOS *taos, const char *tableNameList) {
|
||||||
goto _return;
|
goto _return;
|
||||||
}
|
}
|
||||||
|
|
||||||
SCatalog* pCtg = NULL;
|
SCatalog *pCtg = NULL;
|
||||||
code = catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCtg);
|
code = catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCtg);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
goto _return;
|
goto _return;
|
||||||
}
|
}
|
||||||
|
|
||||||
char* sql = "taos_load_table_info";
|
char *sql = "taos_load_table_info";
|
||||||
code = buildRequest(pTscObj, sql, strlen(sql), &pRequest);
|
code = buildRequest(pTscObj, sql, strlen(sql), &pRequest);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
terrno = code;
|
terrno = code;
|
||||||
goto _return;
|
goto _return;
|
||||||
}
|
}
|
||||||
|
|
||||||
SSyncQueryParam param = {0};
|
SSyncQueryParam param = {0};
|
||||||
tsem_init(¶m.sem, 0, 0);
|
tsem_init(¶m.sem, 0, 0);
|
||||||
param.pRequest = pRequest;
|
param.pRequest = pRequest;
|
||||||
|
|
||||||
SRequestConnInfo conn = {.pTrans = pTscObj->pAppInfo->pTransporter,
|
SRequestConnInfo conn = {
|
||||||
.requestId = pRequest->requestId,
|
.pTrans = pTscObj->pAppInfo->pTransporter, .requestId = pRequest->requestId, .requestObjRefId = pRequest->self};
|
||||||
.requestObjRefId = pRequest->self};
|
|
||||||
|
|
||||||
conn.mgmtEps = getEpSet_s(&pTscObj->pAppInfo->mgmtEp);
|
conn.mgmtEps = getEpSet_s(&pTscObj->pAppInfo->mgmtEp);
|
||||||
|
|
||||||
|
@ -951,7 +947,6 @@ _return:
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
TAOS_STMT *taos_stmt_init(TAOS *taos) {
|
TAOS_STMT *taos_stmt_init(TAOS *taos) {
|
||||||
STscObj *pObj = acquireTscObj(*(int64_t *)taos);
|
STscObj *pObj = acquireTscObj(*(int64_t *)taos);
|
||||||
if (NULL == pObj) {
|
if (NULL == pObj) {
|
||||||
|
|
|
@ -1164,7 +1164,7 @@ int32_t colInfoDataEnsureCapacity(SColumnInfoData* pColumn, uint32_t numOfRows)
|
||||||
|
|
||||||
int32_t blockDataEnsureCapacity(SSDataBlock* pDataBlock, uint32_t numOfRows) {
|
int32_t blockDataEnsureCapacity(SSDataBlock* pDataBlock, uint32_t numOfRows) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
//ASSERT(numOfRows > 0);
|
// ASSERT(numOfRows > 0);
|
||||||
|
|
||||||
if (numOfRows == 0) {
|
if (numOfRows == 0) {
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
|
@ -1657,12 +1657,13 @@ void blockDebugShowData(const SArray* dataBlocks, const char* flag) {
|
||||||
char* dumpBlockData(SSDataBlock* pDataBlock, const char* flag, char** pDataBuf) {
|
char* dumpBlockData(SSDataBlock* pDataBlock, const char* flag, char** pDataBuf) {
|
||||||
int32_t size = 2048;
|
int32_t size = 2048;
|
||||||
*pDataBuf = taosMemoryCalloc(size, 1);
|
*pDataBuf = taosMemoryCalloc(size, 1);
|
||||||
char* dumpBuf = *pDataBuf;
|
char* dumpBuf = *pDataBuf;
|
||||||
char pBuf[128] = {0};
|
char pBuf[128] = {0};
|
||||||
int32_t colNum = taosArrayGetSize(pDataBlock->pDataBlock);
|
int32_t colNum = taosArrayGetSize(pDataBlock->pDataBlock);
|
||||||
int32_t rows = pDataBlock->info.rows;
|
int32_t rows = pDataBlock->info.rows;
|
||||||
int32_t len = 0;
|
int32_t len = 0;
|
||||||
len += snprintf(dumpBuf + len, size - len, "\n%s |block type %d |child id %d|\n", flag, (int32_t)pDataBlock->info.type, pDataBlock->info.childId);
|
len += snprintf(dumpBuf + len, size - len, "\n%s |block type %d |child id %d|\n", flag,
|
||||||
|
(int32_t)pDataBlock->info.type, pDataBlock->info.childId);
|
||||||
for (int32_t j = 0; j < rows; j++) {
|
for (int32_t j = 0; j < rows; j++) {
|
||||||
len += snprintf(dumpBuf + len, size - len, "%s |", flag);
|
len += snprintf(dumpBuf + len, size - len, "%s |", flag);
|
||||||
for (int32_t k = 0; k < colNum; k++) {
|
for (int32_t k = 0; k < colNum; k++) {
|
||||||
|
|
|
@ -116,18 +116,18 @@ typedef void *tsdbReaderT;
|
||||||
#define BLOCK_LOAD_TABLE_SEQ_ORDER 2
|
#define BLOCK_LOAD_TABLE_SEQ_ORDER 2
|
||||||
#define BLOCK_LOAD_TABLE_RR_ORDER 3
|
#define BLOCK_LOAD_TABLE_RR_ORDER 3
|
||||||
|
|
||||||
int32_t tsdbSetTableList(tsdbReaderT reader, SArray* tableList);
|
int32_t tsdbSetTableList(tsdbReaderT reader, SArray *tableList);
|
||||||
tsdbReaderT tsdbReaderOpen(SVnode *pVnode, SQueryTableDataCond *pCond, SArray *tableList, uint64_t qId,
|
tsdbReaderT tsdbReaderOpen(SVnode *pVnode, SQueryTableDataCond *pCond, SArray *tableList, uint64_t qId,
|
||||||
uint64_t taskId);
|
uint64_t taskId);
|
||||||
tsdbReaderT tsdbQueryCacheLast(SVnode *pVnode, SQueryTableDataCond *pCond, STableListInfo *groupList, uint64_t qId,
|
tsdbReaderT tsdbQueryCacheLast(SVnode *pVnode, SQueryTableDataCond *pCond, STableListInfo *groupList, uint64_t qId,
|
||||||
void *pMemRef);
|
void *pMemRef);
|
||||||
int32_t tsdbGetFileBlocksDistInfo(tsdbReaderT *pReader, STableBlockDistInfo *pTableBlockInfo);
|
int32_t tsdbGetFileBlocksDistInfo(tsdbReaderT *pReader, STableBlockDistInfo *pTableBlockInfo);
|
||||||
bool isTsdbCacheLastRow(tsdbReaderT *pReader);
|
bool isTsdbCacheLastRow(tsdbReaderT *pReader);
|
||||||
int32_t tsdbGetAllTableList(SMeta *pMeta, uint64_t uid, SArray *list);
|
int32_t tsdbGetAllTableList(SMeta *pMeta, uint64_t uid, SArray *list);
|
||||||
int32_t tsdbGetCtbIdList(SMeta *pMeta, int64_t suid, SArray *list);
|
int32_t tsdbGetCtbIdList(SMeta *pMeta, int64_t suid, SArray *list);
|
||||||
void *tsdbGetIdx(SMeta *pMeta);
|
void *tsdbGetIdx(SMeta *pMeta);
|
||||||
void *tsdbGetIvtIdx(SMeta *pMeta);
|
void *tsdbGetIvtIdx(SMeta *pMeta);
|
||||||
int64_t tsdbGetNumOfRowsInMemTable(tsdbReaderT *pHandle);
|
int64_t tsdbGetNumOfRowsInMemTable(tsdbReaderT *pHandle);
|
||||||
|
|
||||||
bool tsdbNextDataBlock(tsdbReaderT pTsdbReadHandle);
|
bool tsdbNextDataBlock(tsdbReaderT pTsdbReadHandle);
|
||||||
void tsdbRetrieveDataBlockInfo(tsdbReaderT *pTsdbReadHandle, SDataBlockInfo *pBlockInfo);
|
void tsdbRetrieveDataBlockInfo(tsdbReaderT *pTsdbReadHandle, SDataBlockInfo *pBlockInfo);
|
||||||
|
@ -150,8 +150,7 @@ int32_t tqReadHandleRemoveTbUidList(STqReadHandle *pHandle, const SArray *tbUidL
|
||||||
int32_t tqReadHandleSetMsg(STqReadHandle *pHandle, SSubmitReq *pMsg, int64_t ver);
|
int32_t tqReadHandleSetMsg(STqReadHandle *pHandle, SSubmitReq *pMsg, int64_t ver);
|
||||||
bool tqNextDataBlock(STqReadHandle *pHandle);
|
bool tqNextDataBlock(STqReadHandle *pHandle);
|
||||||
bool tqNextDataBlockFilterOut(STqReadHandle *pHandle, SHashObj *filterOutUids);
|
bool tqNextDataBlockFilterOut(STqReadHandle *pHandle, SHashObj *filterOutUids);
|
||||||
int32_t tqRetrieveDataBlock(SSDataBlock* pBlock, STqReadHandle *pHandle, uint64_t *pGroupId, uint64_t *pUid,
|
int32_t tqRetrieveDataBlock(SSDataBlock *pBlock, STqReadHandle *pHandle);
|
||||||
int32_t *pNumOfRows);
|
|
||||||
|
|
||||||
// sma
|
// sma
|
||||||
int32_t smaGetTSmaDays(SVnodeCfg *pCfg, void *pCont, uint32_t contLen, int32_t *days);
|
int32_t smaGetTSmaDays(SVnodeCfg *pCfg, void *pCont, uint32_t contLen, int32_t *days);
|
||||||
|
|
|
@ -112,7 +112,7 @@ int32_t tqDataExec(STQ* pTq, STqExecHandle* pExec, SSubmitReq* pReq, SMqDataBlkR
|
||||||
tqReadHandleSetMsg(pReader, pReq, 0);
|
tqReadHandleSetMsg(pReader, pReq, 0);
|
||||||
while (tqNextDataBlock(pReader)) {
|
while (tqNextDataBlock(pReader)) {
|
||||||
SSDataBlock block = {0};
|
SSDataBlock block = {0};
|
||||||
if (tqRetrieveDataBlock(&block, pReader, &block.info.groupId, &block.info.uid, &block.info.rows) < 0) {
|
if (tqRetrieveDataBlock(&block, pReader) < 0) {
|
||||||
if (terrno == TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND) continue;
|
if (terrno == TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND) continue;
|
||||||
ASSERT(0);
|
ASSERT(0);
|
||||||
}
|
}
|
||||||
|
@ -129,7 +129,7 @@ int32_t tqDataExec(STQ* pTq, STqExecHandle* pExec, SSubmitReq* pReq, SMqDataBlkR
|
||||||
tqReadHandleSetMsg(pReader, pReq, 0);
|
tqReadHandleSetMsg(pReader, pReq, 0);
|
||||||
while (tqNextDataBlockFilterOut(pReader, pExec->execDb.pFilterOutTbUid)) {
|
while (tqNextDataBlockFilterOut(pReader, pExec->execDb.pFilterOutTbUid)) {
|
||||||
SSDataBlock block = {0};
|
SSDataBlock block = {0};
|
||||||
if (tqRetrieveDataBlock(&block, pReader, &block.info.groupId, &block.info.uid, &block.info.rows) < 0) {
|
if (tqRetrieveDataBlock(&block, pReader) < 0) {
|
||||||
if (terrno == TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND) continue;
|
if (terrno == TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND) continue;
|
||||||
ASSERT(0);
|
ASSERT(0);
|
||||||
}
|
}
|
||||||
|
|
|
@ -146,10 +146,7 @@ bool tqNextDataBlockFilterOut(STqReadHandle* pHandle, SHashObj* filterOutUids) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tqRetrieveDataBlock(SSDataBlock* pBlock, STqReadHandle* pHandle, uint64_t* pGroupId, uint64_t* pUid,
|
int32_t tqRetrieveDataBlock(SSDataBlock* pBlock, STqReadHandle* pHandle) {
|
||||||
int32_t* pNumOfRows) {
|
|
||||||
*pUid = 0;
|
|
||||||
|
|
||||||
// TODO: cache multiple schema
|
// TODO: cache multiple schema
|
||||||
int32_t sversion = htonl(pHandle->pBlock->sversion);
|
int32_t sversion = htonl(pHandle->pBlock->sversion);
|
||||||
if (pHandle->cachedSchemaSuid == 0 || pHandle->cachedSchemaVer != sversion ||
|
if (pHandle->cachedSchemaSuid == 0 || pHandle->cachedSchemaVer != sversion ||
|
||||||
|
@ -180,7 +177,6 @@ int32_t tqRetrieveDataBlock(SSDataBlock* pBlock, STqReadHandle* pHandle, uint64_
|
||||||
STSchema* pTschema = pHandle->pSchema;
|
STSchema* pTschema = pHandle->pSchema;
|
||||||
SSchemaWrapper* pSchemaWrapper = pHandle->pSchemaWrapper;
|
SSchemaWrapper* pSchemaWrapper = pHandle->pSchemaWrapper;
|
||||||
|
|
||||||
*pNumOfRows = pHandle->msgIter.numOfRows;
|
|
||||||
int32_t colNumNeed = taosArrayGetSize(pHandle->pColIdList);
|
int32_t colNumNeed = taosArrayGetSize(pHandle->pColIdList);
|
||||||
|
|
||||||
if (colNumNeed == 0) {
|
if (colNumNeed == 0) {
|
||||||
|
@ -221,22 +217,22 @@ int32_t tqRetrieveDataBlock(SSDataBlock* pBlock, STqReadHandle* pHandle, uint64_
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (blockDataEnsureCapacity(pBlock, *pNumOfRows) < 0) {
|
if (blockDataEnsureCapacity(pBlock, pHandle->msgIter.numOfRows) < 0) {
|
||||||
goto FAIL;
|
goto FAIL;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t colActual = blockDataGetNumOfCols(pBlock);
|
int32_t colActual = blockDataGetNumOfCols(pBlock);
|
||||||
|
|
||||||
// TODO in stream shuffle case, fetch groupId
|
|
||||||
*pGroupId = 0;
|
|
||||||
|
|
||||||
STSRowIter iter = {0};
|
STSRowIter iter = {0};
|
||||||
tdSTSRowIterInit(&iter, pTschema);
|
tdSTSRowIterInit(&iter, pTschema);
|
||||||
STSRow* row;
|
STSRow* row;
|
||||||
int32_t curRow = 0;
|
int32_t curRow = 0;
|
||||||
|
|
||||||
tInitSubmitBlkIter(&pHandle->msgIter, pHandle->pBlock, &pHandle->blkIter);
|
tInitSubmitBlkIter(&pHandle->msgIter, pHandle->pBlock, &pHandle->blkIter);
|
||||||
*pUid = pHandle->msgIter.uid; // set the uid of table for submit block
|
|
||||||
|
pBlock->info.groupId = 0;
|
||||||
|
pBlock->info.uid = pHandle->msgIter.uid; // set the uid of table for submit block
|
||||||
|
pBlock->info.rows = pHandle->msgIter.numOfRows;
|
||||||
|
|
||||||
while ((row = tGetSubmitBlkNext(&pHandle->blkIter)) != NULL) {
|
while ((row = tGetSubmitBlkNext(&pHandle->blkIter)) != NULL) {
|
||||||
tdSTSRowIterReset(&iter, row);
|
tdSTSRowIterReset(&iter, row);
|
||||||
|
|
|
@ -507,20 +507,21 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) {
|
||||||
STableScanInfo* pInfo = pOperator->info;
|
STableScanInfo* pInfo = pOperator->info;
|
||||||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||||
|
|
||||||
if(pInfo->currentGroupId == -1){
|
if (pInfo->currentGroupId == -1) {
|
||||||
pInfo->currentGroupId++;
|
pInfo->currentGroupId++;
|
||||||
if (pInfo->currentGroupId >= taosArrayGetSize(pTaskInfo->tableqinfoList.pGroupList)) {
|
if (pInfo->currentGroupId >= taosArrayGetSize(pTaskInfo->tableqinfoList.pGroupList)) {
|
||||||
setTaskStatus(pTaskInfo, TASK_COMPLETED);
|
setTaskStatus(pTaskInfo, TASK_COMPLETED);
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
SArray *tableList = taosArrayGetP(pTaskInfo->tableqinfoList.pGroupList, pInfo->currentGroupId);
|
SArray* tableList = taosArrayGetP(pTaskInfo->tableqinfoList.pGroupList, pInfo->currentGroupId);
|
||||||
tsdbCleanupReadHandle(pInfo->dataReader);
|
tsdbCleanupReadHandle(pInfo->dataReader);
|
||||||
tsdbReaderT* pReader = tsdbReaderOpen(pInfo->readHandle.vnode, &pInfo->cond, tableList, pInfo->queryId, pInfo->taskId);
|
tsdbReaderT* pReader =
|
||||||
|
tsdbReaderOpen(pInfo->readHandle.vnode, &pInfo->cond, tableList, pInfo->queryId, pInfo->taskId);
|
||||||
pInfo->dataReader = pReader;
|
pInfo->dataReader = pReader;
|
||||||
}
|
}
|
||||||
|
|
||||||
SSDataBlock* result = doTableScanGroup(pOperator);
|
SSDataBlock* result = doTableScanGroup(pOperator);
|
||||||
if(result){
|
if (result) {
|
||||||
return result;
|
return result;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -530,7 +531,7 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) {
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
SArray *tableList = taosArrayGetP(pTaskInfo->tableqinfoList.pGroupList, pInfo->currentGroupId);
|
SArray* tableList = taosArrayGetP(pTaskInfo->tableqinfoList.pGroupList, pInfo->currentGroupId);
|
||||||
tsdbSetTableList(pInfo->dataReader, tableList);
|
tsdbSetTableList(pInfo->dataReader, tableList);
|
||||||
|
|
||||||
tsdbResetReadHandle(pInfo->dataReader, &pInfo->cond, 0);
|
tsdbResetReadHandle(pInfo->dataReader, &pInfo->cond, 0);
|
||||||
|
@ -538,7 +539,7 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) {
|
||||||
pInfo->scanTimes = 0;
|
pInfo->scanTimes = 0;
|
||||||
|
|
||||||
result = doTableScanGroup(pOperator);
|
result = doTableScanGroup(pOperator);
|
||||||
if(result){
|
if (result) {
|
||||||
return result;
|
return result;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -777,9 +778,9 @@ static bool isStateWindow(SStreamBlockScanInfo* pInfo) {
|
||||||
}
|
}
|
||||||
|
|
||||||
static bool prepareDataScan(SStreamBlockScanInfo* pInfo, SSDataBlock* pSDB, int32_t tsColIndex, int32_t* pRowIndex) {
|
static bool prepareDataScan(SStreamBlockScanInfo* pInfo, SSDataBlock* pSDB, int32_t tsColIndex, int32_t* pRowIndex) {
|
||||||
STimeWindow win = {
|
STimeWindow win = {
|
||||||
.skey = INT64_MIN,
|
.skey = INT64_MIN,
|
||||||
.ekey = INT64_MAX,
|
.ekey = INT64_MAX,
|
||||||
};
|
};
|
||||||
bool needRead = false;
|
bool needRead = false;
|
||||||
if (!isStateWindow(pInfo) && (*pRowIndex) < pSDB->info.rows) {
|
if (!isStateWindow(pInfo) && (*pRowIndex) < pSDB->info.rows) {
|
||||||
|
@ -794,13 +795,12 @@ static bool prepareDataScan(SStreamBlockScanInfo* pInfo, SSDataBlock* pSDB, int3
|
||||||
SResultWindowInfo* pCurWin =
|
SResultWindowInfo* pCurWin =
|
||||||
getSessionTimeWindow(pAggSup, tsCols[(*pRowIndex)], INT64_MIN, pSDB->info.groupId, gap, &winIndex);
|
getSessionTimeWindow(pAggSup, tsCols[(*pRowIndex)], INT64_MIN, pSDB->info.groupId, gap, &winIndex);
|
||||||
win = pCurWin->win;
|
win = pCurWin->win;
|
||||||
(*pRowIndex) +=
|
(*pRowIndex) += updateSessionWindowInfo(pCurWin, tsCols, NULL, pSDB->info.rows, (*pRowIndex), gap, NULL);
|
||||||
updateSessionWindowInfo(pCurWin, tsCols, NULL, pSDB->info.rows, (*pRowIndex), gap, NULL);
|
|
||||||
} else {
|
} else {
|
||||||
win = getActiveTimeWindow(NULL, &dumyInfo, tsCols[(*pRowIndex)], &pInfo->interval,
|
win =
|
||||||
pInfo->interval.precision, NULL);
|
getActiveTimeWindow(NULL, &dumyInfo, tsCols[(*pRowIndex)], &pInfo->interval, pInfo->interval.precision, NULL);
|
||||||
(*pRowIndex) += getNumOfRowsInTimeWindow(&pSDB->info, tsCols, (*pRowIndex), win.ekey,
|
(*pRowIndex) += getNumOfRowsInTimeWindow(&pSDB->info, tsCols, (*pRowIndex), win.ekey, binarySearchForKey, NULL,
|
||||||
binarySearchForKey, NULL, TSDB_ORDER_ASC);
|
TSDB_ORDER_ASC);
|
||||||
}
|
}
|
||||||
needRead = true;
|
needRead = true;
|
||||||
} else if (isStateWindow(pInfo)) {
|
} else if (isStateWindow(pInfo)) {
|
||||||
|
@ -821,7 +821,7 @@ static bool prepareDataScan(SStreamBlockScanInfo* pInfo, SSDataBlock* pSDB, int3
|
||||||
STableScanInfo* pTableScanInfo = pInfo->pSnapshotReadOp->info;
|
STableScanInfo* pTableScanInfo = pInfo->pSnapshotReadOp->info;
|
||||||
pTableScanInfo->cond.twindows[0] = win;
|
pTableScanInfo->cond.twindows[0] = win;
|
||||||
pTableScanInfo->curTWinIdx = 0;
|
pTableScanInfo->curTWinIdx = 0;
|
||||||
// tsdbResetReadHandle(pTableScanInfo->dataReader, &pTableScanInfo->cond, 0);
|
// tsdbResetReadHandle(pTableScanInfo->dataReader, &pTableScanInfo->cond, 0);
|
||||||
// if (!pTableScanInfo->dataReader) {
|
// if (!pTableScanInfo->dataReader) {
|
||||||
// return false;
|
// return false;
|
||||||
// }
|
// }
|
||||||
|
@ -1033,12 +1033,13 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) {
|
||||||
|
|
||||||
while (tqNextDataBlock(pInfo->streamBlockReader)) {
|
while (tqNextDataBlock(pInfo->streamBlockReader)) {
|
||||||
SSDataBlock block = {0};
|
SSDataBlock block = {0};
|
||||||
uint64_t groupId = 0;
|
|
||||||
uint64_t uid = 0;
|
|
||||||
int32_t numOfRows = 0;
|
|
||||||
|
|
||||||
// todo refactor
|
// todo refactor
|
||||||
int32_t code = tqRetrieveDataBlock(&block, pInfo->streamBlockReader, &groupId, &uid, &numOfRows);
|
int32_t code = tqRetrieveDataBlock(&block, pInfo->streamBlockReader);
|
||||||
|
|
||||||
|
uint64_t groupId = block.info.groupId;
|
||||||
|
uint64_t uid = block.info.uid;
|
||||||
|
int32_t numOfRows = block.info.rows;
|
||||||
|
|
||||||
if (code != TSDB_CODE_SUCCESS || numOfRows == 0) {
|
if (code != TSDB_CODE_SUCCESS || numOfRows == 0) {
|
||||||
pTaskInfo->code = code;
|
pTaskInfo->code = code;
|
||||||
|
@ -1154,9 +1155,9 @@ static SArray* extractTableIdList(const STableListInfo* pTableGroupInfo) {
|
||||||
return tableIdList;
|
return tableIdList;
|
||||||
}
|
}
|
||||||
|
|
||||||
SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle,
|
SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhysiNode* pTableScanNode,
|
||||||
STableScanPhysiNode* pTableScanNode, SExecTaskInfo* pTaskInfo,
|
SExecTaskInfo* pTaskInfo, STimeWindowAggSupp* pTwSup, uint64_t queryId,
|
||||||
STimeWindowAggSupp* pTwSup, uint64_t queryId, uint64_t taskId) {
|
uint64_t taskId) {
|
||||||
SStreamBlockScanInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamBlockScanInfo));
|
SStreamBlockScanInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamBlockScanInfo));
|
||||||
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
|
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
|
||||||
|
|
||||||
|
@ -1743,14 +1744,14 @@ SOperatorInfo* createSysTableScanOperatorInfo(void* readHandle, SSystemTableScan
|
||||||
SSDataBlock* pResBlock = createResDataBlock(pDescNode);
|
SSDataBlock* pResBlock = createResDataBlock(pDescNode);
|
||||||
|
|
||||||
int32_t num = 0;
|
int32_t num = 0;
|
||||||
SArray* colList = extractColMatchInfo(pScanNode->pScanCols, pDescNode, &num, COL_MATCH_FROM_COL_ID);
|
SArray* colList = extractColMatchInfo(pScanNode->pScanCols, pDescNode, &num, COL_MATCH_FROM_COL_ID);
|
||||||
|
|
||||||
pInfo->accountId = pScanPhyNode->accountId;
|
pInfo->accountId = pScanPhyNode->accountId;
|
||||||
pInfo->pUser = taosMemoryStrDup((void*) pUser);
|
pInfo->pUser = taosMemoryStrDup((void*)pUser);
|
||||||
pInfo->showRewrite = pScanPhyNode->showRewrite;
|
pInfo->showRewrite = pScanPhyNode->showRewrite;
|
||||||
pInfo->pRes = pResBlock;
|
pInfo->pRes = pResBlock;
|
||||||
pInfo->pCondition = pScanNode->node.pConditions;
|
pInfo->pCondition = pScanNode->node.pConditions;
|
||||||
pInfo->scanCols = colList;
|
pInfo->scanCols = colList;
|
||||||
|
|
||||||
initResultSizeInfo(pOperator, 4096);
|
initResultSizeInfo(pOperator, 4096);
|
||||||
|
|
||||||
|
@ -1766,13 +1767,13 @@ SOperatorInfo* createSysTableScanOperatorInfo(void* readHandle, SSystemTableScan
|
||||||
pInfo->readHandle = *(SReadHandle*)readHandle;
|
pInfo->readHandle = *(SReadHandle*)readHandle;
|
||||||
}
|
}
|
||||||
|
|
||||||
pOperator->name = "SysTableScanOperator";
|
pOperator->name = "SysTableScanOperator";
|
||||||
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN;
|
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN;
|
||||||
pOperator->blocking = false;
|
pOperator->blocking = false;
|
||||||
pOperator->status = OP_NOT_OPENED;
|
pOperator->status = OP_NOT_OPENED;
|
||||||
pOperator->info = pInfo;
|
pOperator->info = pInfo;
|
||||||
pOperator->exprSupp.numOfExprs = taosArrayGetSize(pResBlock->pDataBlock);
|
pOperator->exprSupp.numOfExprs = taosArrayGetSize(pResBlock->pDataBlock);
|
||||||
pOperator->pTaskInfo = pTaskInfo;
|
pOperator->pTaskInfo = pTaskInfo;
|
||||||
|
|
||||||
pOperator->fpSet =
|
pOperator->fpSet =
|
||||||
createOperatorFpSet(operatorDummyOpenFn, doSysTableScan, NULL, NULL, destroySysScanOperator, NULL, NULL, NULL);
|
createOperatorFpSet(operatorDummyOpenFn, doSysTableScan, NULL, NULL, destroySysScanOperator, NULL, NULL, NULL);
|
||||||
|
@ -1959,11 +1960,11 @@ SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysi
|
||||||
goto _error;
|
goto _error;
|
||||||
}
|
}
|
||||||
|
|
||||||
pInfo->pTableList = pTableListInfo;
|
pInfo->pTableList = pTableListInfo;
|
||||||
pInfo->pColMatchInfo = colList;
|
pInfo->pColMatchInfo = colList;
|
||||||
pInfo->pRes = createResDataBlock(pDescNode);
|
pInfo->pRes = createResDataBlock(pDescNode);
|
||||||
pInfo->readHandle = *pReadHandle;
|
pInfo->readHandle = *pReadHandle;
|
||||||
pInfo->curPos = 0;
|
pInfo->curPos = 0;
|
||||||
|
|
||||||
pOperator->name = "TagScanOperator";
|
pOperator->name = "TagScanOperator";
|
||||||
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN;
|
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN;
|
||||||
|
@ -2034,7 +2035,7 @@ typedef struct STableMergeScanInfo {
|
||||||
int32_t scanFlag; // table scan flag to denote if it is a repeat/reverse/main scan
|
int32_t scanFlag; // table scan flag to denote if it is a repeat/reverse/main scan
|
||||||
int32_t dataBlockLoadFlag;
|
int32_t dataBlockLoadFlag;
|
||||||
SInterval interval; // if the upstream is an interval operator, the interval info is also kept here to get the time
|
SInterval interval; // if the upstream is an interval operator, the interval info is also kept here to get the time
|
||||||
// window to check if current data block needs to be loaded.
|
// window to check if current data block needs to be loaded.
|
||||||
|
|
||||||
SSampleExecInfo sample; // sample execution info
|
SSampleExecInfo sample; // sample execution info
|
||||||
} STableMergeScanInfo;
|
} STableMergeScanInfo;
|
||||||
|
|
Loading…
Reference in New Issue