Merge pull request #10569 from taosdata/feature/3.0_liaohj

Feature/3.0 liaohj
This commit is contained in:
Haojun Liao 2022-03-12 15:47:52 +08:00 committed by GitHub
commit 02981347e4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 261 additions and 256 deletions

View File

@ -31,27 +31,27 @@ typedef void TAOS_SUB;
typedef void **TAOS_ROW;
// Data type definition
#define TSDB_DATA_TYPE_NULL 0 // 1 bytes
#define TSDB_DATA_TYPE_BOOL 1 // 1 bytes
#define TSDB_DATA_TYPE_TINYINT 2 // 1 byte
#define TSDB_DATA_TYPE_SMALLINT 3 // 2 bytes
#define TSDB_DATA_TYPE_INT 4 // 4 bytes
#define TSDB_DATA_TYPE_BIGINT 5 // 8 bytes
#define TSDB_DATA_TYPE_FLOAT 6 // 4 bytes
#define TSDB_DATA_TYPE_DOUBLE 7 // 8 bytes
#define TSDB_DATA_TYPE_BINARY 8 // string, alias for varchar
#define TSDB_DATA_TYPE_TIMESTAMP 9 // 8 bytes
#define TSDB_DATA_TYPE_NCHAR 10 // unicode string
#define TSDB_DATA_TYPE_UTINYINT 11 // 1 byte
#define TSDB_DATA_TYPE_USMALLINT 12 // 2 bytes
#define TSDB_DATA_TYPE_UINT 13 // 4 bytes
#define TSDB_DATA_TYPE_UBIGINT 14 // 8 bytes
#define TSDB_DATA_TYPE_VARCHAR 15 // string
#define TSDB_DATA_TYPE_VARBINARY 16 // binary
#define TSDB_DATA_TYPE_JSON 17 // json
#define TSDB_DATA_TYPE_DECIMAL 18 // decimal
#define TSDB_DATA_TYPE_BLOB 19 // binary
#define TSDB_DATA_TYPE_MEDIUMBLOB 20
#define TSDB_DATA_TYPE_NULL 0 // 1 bytes
#define TSDB_DATA_TYPE_BOOL 1 // 1 bytes
#define TSDB_DATA_TYPE_TINYINT 2 // 1 byte
#define TSDB_DATA_TYPE_SMALLINT 3 // 2 bytes
#define TSDB_DATA_TYPE_INT 4 // 4 bytes
#define TSDB_DATA_TYPE_BIGINT 5 // 8 bytes
#define TSDB_DATA_TYPE_FLOAT 6 // 4 bytes
#define TSDB_DATA_TYPE_DOUBLE 7 // 8 bytes
#define TSDB_DATA_TYPE_VARCHAR 8 // string, alias for varchar
#define TSDB_DATA_TYPE_TIMESTAMP 9 // 8 bytes
#define TSDB_DATA_TYPE_NCHAR 10 // unicode string
#define TSDB_DATA_TYPE_UTINYINT 11 // 1 byte
#define TSDB_DATA_TYPE_USMALLINT 12 // 2 bytes
#define TSDB_DATA_TYPE_UINT 13 // 4 bytes
#define TSDB_DATA_TYPE_UBIGINT 14 // 8 bytes
#define TSDB_DATA_TYPE_JSON 15 // json string
#define TSDB_DATA_TYPE_VARBINARY 16 // binary
#define TSDB_DATA_TYPE_DECIMAL 17 // decimal
#define TSDB_DATA_TYPE_BLOB 18 // binary
#define TSDB_DATA_TYPE_MEDIUMBLOB 19
#define TSDB_DATA_TYPE_BINARY TSDB_DATA_TYPE_VARCHAR // string
typedef enum {
TSDB_OPTION_LOCALE,

View File

@ -41,7 +41,6 @@ int32_t dsDataSinkMgtInit(SDataSinkMgtCfg *cfg);
typedef struct SInputData {
const struct SSDataBlock* pData;
SHashObj* pTableRetrieveTsMap;
} SInputData;
typedef struct SOutputData {

View File

@ -253,6 +253,11 @@ typedef struct STaskIdInfo {
char* str;
} STaskIdInfo;
typedef struct STaskBufInfo {
int32_t bufSize; // total available buffer size in bytes
int32_t remainBuf; // remain buffer size
} STaskBufInfo;
typedef struct SExecTaskInfo {
STaskIdInfo id;
char* content;
@ -264,7 +269,8 @@ typedef struct SExecTaskInfo {
uint64_t totalRows; // total number of rows
STableGroupInfo tableqinfoGroupInfo; // this is a group array list, including SArray<STableQueryInfo*> structure
char* sql; // query sql string
jmp_buf env; //
jmp_buf env; // when error occurs, abort
STaskBufInfo bufInfo; // available buffer info this task
struct SOperatorInfo* pRoot;
} SExecTaskInfo;
@ -307,9 +313,11 @@ typedef struct STaskRuntimeEnv {
} STaskRuntimeEnv;
enum {
OP_IN_EXECUTING = 1,
OP_RES_TO_RETURN = 2,
OP_EXEC_DONE = 3,
OP_NOT_OPENED = 0x0,
OP_OPENED = 0x1,
OP_IN_EXECUTING = 0x3,
OP_RES_TO_RETURN = 0x5,
OP_EXEC_DONE = 0x9,
};
typedef struct SOperatorInfo {
@ -322,12 +330,14 @@ typedef struct SOperatorInfo {
SExprInfo* pExpr;
STaskRuntimeEnv* pRuntimeEnv; // todo remove it
SExecTaskInfo* pTaskInfo;
SOperatorCostInfo cost;
struct SOperatorInfo** pDownstream; // downstram pointer list
int32_t numOfDownstream; // number of downstream. The value is always ONE expect for join operator
__optr_open_fn_t openFn;
__optr_fn_t nextDataFn;
__optr_fn_t getNextFn;
__optr_fn_t cleanupFn;
__optr_close_fn_t closeFn;
__optr_open_fn_t _openFn; // DO NOT invoke this function directly
} SOperatorInfo;
typedef struct {
@ -378,9 +388,9 @@ typedef struct STaskParam {
} STaskParam;
enum {
DATA_NOT_READY = 0x1,
DATA_READY = 0x2,
DATA_EXHAUSTED = 0x3,
EX_SOURCE_DATA_NOT_READY = 0x1,
EX_SOURCE_DATA_READY = 0x2,
EX_SOURCE_DATA_EXHAUSTED = 0x3,
};
typedef struct SSourceDataInfo {
@ -639,12 +649,16 @@ SOperatorInfo* createTableSeqScanOperatorInfo(void* pTsdbReadHandle, STaskRuntim
SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SArray* pExprInfo, SSDataBlock* pResultBlock, SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo);
SOperatorInfo* createMultiTableAggOperatorInfo(SOperatorInfo* downstream, SArray* pExprInfo, SSDataBlock* pResultBlock, SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo);
SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SArray* pExprInfo, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createOrderOperatorInfo(SOperatorInfo* downstream, SArray* pExprInfo, SArray* pOrderVal, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createSortedMergeOperatorInfo(SOperatorInfo** downstream, int32_t numOfDownstream, SArray* pExprInfo, SArray* pOrderVal, SArray* pGroupInfo, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createSysTableScanOperatorInfo(void* pSysTableReadHandle, const SArray* pExprInfo, const SSchema* pSchema,
int32_t tableType, SEpSet epset, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createLimitOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream);
SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SArray* pExprInfo, SInterval* pInterval, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createLimitOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream);
SOperatorInfo* createAllTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream,
SExprInfo* pExpr, int32_t numOfOutput);
SOperatorInfo* createSWindowOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SExprInfo* pExpr,
@ -674,10 +688,6 @@ SOperatorInfo* createSLimitOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorI
SOperatorInfo* createJoinOperatorInfo(SOperatorInfo** pdownstream, int32_t numOfDownstream, SSchema* pSchema,
int32_t numOfOutput);
SOperatorInfo* createOrderOperatorInfo(SOperatorInfo* downstream, SArray* pExprInfo, SArray* pOrderVal, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createSortedMergeOperatorInfo(SOperatorInfo** downstream, int32_t numOfDownstream, SArray* pExprInfo, SArray* pOrderVal, SArray* pGroupInfo, SExecTaskInfo* pTaskInfo);
// SSDataBlock* doSLimit(void* param, bool* newgroup);
// int32_t doCreateFilterInfo(SColumnInfo* pCols, int32_t numOfCols, int32_t numOfFilterCols, SSingleColumnFilterInfo** pFilterInfo, uint64_t qId);
void doSetFilterColumnInfo(SSingleColumnFilterInfo* pFilterInfo, int32_t numOfFilterCols, SSDataBlock* pBlock);
@ -691,7 +701,6 @@ void* doDestroyFilterInfo(SSingleColumnFilterInfo* pFilterInfo, int32_t numOfFil
void setInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t order);
void finalizeQueryResult(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx, SResultRowInfo* pResultRowInfo,
int32_t* rowCellInfoOffset);
void updateOutputBuf(SOptrBasicInfo* pBInfo, int32_t* bufCapacity, int32_t numOfInputRows);
void clearOutputBuf(SOptrBasicInfo* pBInfo, int32_t* bufCapacity);
void copyTsColoum(SSDataBlock* pRes, SqlFunctionCtx* pCtx, int32_t numOfOutput);

View File

@ -21,8 +21,6 @@
#include "tqueue.h"
#include "executorimpl.h"
#define DATA_META_LENGTH(tables) (sizeof(int32_t) + sizeof(STableIdInfo) * taosHashGetSize(tables) + sizeof(SRetrieveTableRsp))
typedef struct SDataDispatchBuf {
int32_t useSize;
int32_t allocSize;
@ -90,19 +88,6 @@ static void copyData(const SInputData* pInput, const SDataBlockDescNode* pSchema
data += pColRes->info.bytes * pInput->pData->info.rows;
}
}
int32_t numOfTables = (int32_t) taosHashGetSize(pInput->pTableRetrieveTsMap);
*(int32_t*)data = htonl(numOfTables);
data += sizeof(int32_t);
STableIdInfo* item = taosHashIterate(pInput->pTableRetrieveTsMap, NULL);
while (item) {
STableIdInfo* pDst = (STableIdInfo*)data;
pDst->uid = htobe64(item->uid);
pDst->key = htobe64(item->key);
data += sizeof(STableIdInfo);
item = taosHashIterate(pInput->pTableRetrieveTsMap, item);
}
}
// data format with compress: SDataCacheEntry | cols_data_offset | col1_data col2_data ... | numOfTables | STableIdInfo STableIdInfo ...
@ -113,7 +98,7 @@ static void toDataCacheEntry(const SDataDispatchHandle* pHandle, const SInputDat
pEntry->numOfRows = pInput->pData->info.rows;
pEntry->dataLen = 0;
pBuf->useSize = DATA_META_LENGTH(pInput->pTableRetrieveTsMap);
pBuf->useSize = sizeof(SRetrieveTableRsp);
copyData(pInput, pHandle->pSchema, pEntry->data, pEntry->compressed, &pEntry->dataLen);
if (0 == pEntry->compressed) {
pEntry->dataLen = pHandle->pSchema->resultRowSize * pInput->pData->info.rows;
@ -130,7 +115,7 @@ static bool allocBuf(SDataDispatchHandle* pDispatcher, const SInputData* pInput,
return false;
}
pBuf->allocSize = DATA_META_LENGTH(pInput->pTableRetrieveTsMap) + pDispatcher->pSchema->resultRowSize * pInput->pData->info.rows;
pBuf->allocSize = sizeof(SRetrieveTableRsp) + pDispatcher->pSchema->resultRowSize * pInput->pData->info.rows;
pBuf->pData = malloc(pBuf->allocSize);
if (pBuf->pData == NULL) {
qError("SinkNode failed to malloc memory, size:%d, code:%d", pBuf->allocSize, TAOS_SYSTEM_ERROR(errno));

View File

@ -158,7 +158,7 @@ int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pRes, uint64_t *useconds) {
int64_t st = 0;
st = taosGetTimestampUs();
*pRes = pTaskInfo->pRoot->nextDataFn(pTaskInfo->pRoot, &newgroup);
*pRes = pTaskInfo->pRoot->getNextFn(pTaskInfo->pRoot, &newgroup);
uint64_t el = (taosGetTimestampUs() - st);
pTaskInfo->cost.elapsedTime += el;

View File

@ -211,6 +211,9 @@ static void destroyOrderOperatorInfo(void* param, int32_t numOfOutput);
static void destroySWindowOperatorInfo(void* param, int32_t numOfOutput);
static void destroyStateWindowOperatorInfo(void* param, int32_t numOfOutput);
static void destroyAggOperatorInfo(void* param, int32_t numOfOutput);
static void destroyExchangeOperatorInfo(void* param, int32_t numOfOutput);
static void destroyConditionOperatorInfo(void* param, int32_t numOfOutput);
static void destroyOperatorInfo(SOperatorInfo* pOperator);
static void destroySysTableScannerOperatorInfo(void* param, int32_t numOfOutput);
@ -221,6 +224,17 @@ static void doSetOperatorCompleted(SOperatorInfo* pOperator) {
}
}
#define OPTR_IS_OPENED(_optr) (((_optr)->status & OP_OPENED) == OP_OPENED)
#define OPTR_SET_OPENED(_optr) ((_optr)->status |= OP_OPENED)
static int32_t operatorDummyOpenFn(void* param) {
SOperatorInfo* pOperator = (SOperatorInfo*) param;
OPTR_SET_OPENED(pOperator);
return TSDB_CODE_SUCCESS;
}
static void operatorDummyCloseFn(void* param, int32_t numOfCols) {}
static int32_t doCopyToSDataBlock(SDiskbasedBuf *pBuf, SGroupResInfo* pGroupResInfo, int32_t orderType, SSDataBlock* pBlock, int32_t rowCapacity);
static int32_t getGroupbyColumnIndex(SGroupbyExpr *pGroupbyExpr, SSDataBlock* pDataBlock);
@ -4723,6 +4737,11 @@ static SSDataBlock* doTableScan(void* param, bool *newgroup) {
STableScanInfo *pTableScanInfo = pOperator->info;
SExecTaskInfo *pTaskInfo = pOperator->pTaskInfo;
pTaskInfo->code = pOperator->_openFn(pOperator);
if (pTaskInfo->code != TSDB_CODE_SUCCESS) {
return NULL;
}
// The read handle is not initialized yet, since no qualified tables exists
if (pTableScanInfo->pTsdbReadHandle == NULL) {
return NULL;
@ -4840,6 +4859,11 @@ static SSDataBlock* doStreamBlockScan(void* param, bool* newgroup) {
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SStreamBlockScanInfo* pInfo = pOperator->info;
pTaskInfo->code = pOperator->_openFn(pOperator);
if (pTaskInfo->code != TSDB_CODE_SUCCESS) {
return NULL;
}
SDataBlockInfo* pBlockInfo = &pInfo->pRes->info;
pBlockInfo->rows = 0;
@ -4880,7 +4904,7 @@ int32_t loadRemoteDataCallback(void* param, const SDataBuf* pMsg, int32_t code)
pRsp->useconds = htobe64(pRsp->useconds);
pRsp->compLen = htonl(pRsp->compLen);
pSourceDataInfo->status = DATA_READY;
pSourceDataInfo->status = EX_SOURCE_DATA_READY;
tsem_post(&pSourceDataInfo->pEx->ready);
}
@ -5008,12 +5032,12 @@ static SSDataBlock* concurrentlyLoadRemoteDataImpl(SOperatorInfo *pOperator, SEx
for (int32_t i = 0; i < totalSources; ++i) {
SSourceDataInfo* pDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, i);
if (pDataInfo->status == DATA_EXHAUSTED) {
if (pDataInfo->status == EX_SOURCE_DATA_EXHAUSTED) {
completed += 1;
continue;
}
if (pDataInfo->status != DATA_READY) {
if (pDataInfo->status != EX_SOURCE_DATA_READY) {
continue;
}
@ -5026,7 +5050,7 @@ static SSDataBlock* concurrentlyLoadRemoteDataImpl(SOperatorInfo *pOperator, SEx
qDebug("%s vgId:%d, taskID:0x%" PRIx64 " index:%d completed, rowsOfSource:%" PRIu64 ", totalRows:%" PRIu64 " try next",
GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, i + 1, pDataInfo->totalRows,
pExchangeInfo->totalRows);
pDataInfo->status = DATA_EXHAUSTED;
pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED;
completed += 1;
continue;
}
@ -5042,15 +5066,15 @@ static SSDataBlock* concurrentlyLoadRemoteDataImpl(SOperatorInfo *pOperator, SEx
GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pRes->info.rows,
pDataInfo->totalRows, pExchangeInfo->totalRows, pExchangeInfo->totalSize, i + 1,
totalSources);
pDataInfo->status = DATA_EXHAUSTED;
pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED;
} else {
qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64 " numOfRows:%d, totalRows:%" PRIu64 ", totalBytes:%" PRIu64,
GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pRes->info.rows, pExchangeInfo->totalRows,
pExchangeInfo->totalSize);
}
if (pDataInfo->status != DATA_EXHAUSTED) {
pDataInfo->status = DATA_NOT_READY;
if (pDataInfo->status != EX_SOURCE_DATA_EXHAUSTED) {
pDataInfo->status = EX_SOURCE_DATA_NOT_READY;
code = doSendFetchDataRequest(pExchangeInfo, pTaskInfo, i);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
@ -5070,13 +5094,9 @@ _error:
return NULL;
}
static SSDataBlock* concurrentlyLoadRemoteData(SOperatorInfo *pOperator) {
static int32_t prepareConcurrentlyLoad(SOperatorInfo *pOperator) {
SExchangeInfo *pExchangeInfo = pOperator->info;
SExecTaskInfo *pTaskInfo = pOperator->pTaskInfo;
if (pOperator->status == OP_RES_TO_RETURN) {
return concurrentlyLoadRemoteDataImpl(pOperator, pExchangeInfo, pTaskInfo);
}
size_t totalSources = taosArrayGetSize(pExchangeInfo->pSources);
int64_t startTs = taosGetTimestampUs();
@ -5085,7 +5105,8 @@ static SSDataBlock* concurrentlyLoadRemoteData(SOperatorInfo *pOperator) {
for(int32_t i = 0; i < totalSources; ++i) {
int32_t code = doSendFetchDataRequest(pExchangeInfo, pTaskInfo, i);
if (code != TSDB_CODE_SUCCESS) {
return NULL;
pTaskInfo->code = code;
return code;
}
}
@ -5093,9 +5114,9 @@ static SSDataBlock* concurrentlyLoadRemoteData(SOperatorInfo *pOperator) {
qDebug("%s send all fetch request to %"PRIzu" sources completed, elapsed:%"PRId64, GET_TASKID(pTaskInfo), totalSources, endTs - startTs);
tsem_wait(&pExchangeInfo->ready);
pOperator->cost.openCost = taosGetTimestampUs() - startTs;
pOperator->status = OP_RES_TO_RETURN;
return concurrentlyLoadRemoteDataImpl(pOperator, pExchangeInfo, pTaskInfo);
return TSDB_CODE_SUCCESS;
}
static SSDataBlock* seqLoadRemoteData(SOperatorInfo *pOperator) {
@ -5123,7 +5144,7 @@ static SSDataBlock* seqLoadRemoteData(SOperatorInfo *pOperator) {
GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pExchangeInfo->current + 1,
pDataInfo->totalRows, pExchangeInfo->totalRows);
pDataInfo->status = DATA_EXHAUSTED;
pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED;
pExchangeInfo->current += 1;
continue;
}
@ -5138,7 +5159,7 @@ static SSDataBlock* seqLoadRemoteData(SOperatorInfo *pOperator) {
pDataInfo->totalRows, pExchangeInfo->totalRows, pExchangeInfo->totalSize, pExchangeInfo->current + 1,
totalSources);
pDataInfo->status = DATA_EXHAUSTED;
pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED;
pExchangeInfo->current += 1;
} else {
qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64 " numOfRows:%d, totalRows:%" PRIu64 ", totalBytes:%" PRIu64,
@ -5149,14 +5170,39 @@ static SSDataBlock* seqLoadRemoteData(SOperatorInfo *pOperator) {
}
}
static int32_t prepareLoadRemoteData(void* param) {
SOperatorInfo *pOperator = (SOperatorInfo*) param;
if (OPTR_IS_OPENED(pOperator)) {
return TSDB_CODE_SUCCESS;
}
SExchangeInfo *pExchangeInfo = pOperator->info;
if (pExchangeInfo->seqLoadData) {
// do nothing for sequentially load data
} else {
int32_t code = prepareConcurrentlyLoad(pOperator);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
}
OPTR_SET_OPENED(pOperator);
return TSDB_CODE_SUCCESS;
}
static SSDataBlock* doLoadRemoteData(void* param, bool* newgroup) {
SOperatorInfo *pOperator = (SOperatorInfo*) param;
SExchangeInfo *pExchangeInfo = pOperator->info;
SExecTaskInfo *pTaskInfo = pOperator->pTaskInfo;
size_t totalSources = taosArrayGetSize(pExchangeInfo->pSources);
int32_t code = pOperator->_openFn(pOperator);
if (code != TSDB_CODE_SUCCESS) {
pTaskInfo->code = code;
return NULL;
}
size_t totalSources = taosArrayGetSize(pExchangeInfo->pSources);
if (pOperator->status == OP_EXEC_DONE) {
qDebug("%s all %"PRIzu" source(s) are exhausted, total rows:%"PRIu64" bytes:%"PRIu64", elapsed:%.2f ms", GET_TASKID(pTaskInfo), totalSources,
pExchangeInfo->totalRows, pExchangeInfo->totalSize, pExchangeInfo->totalElapsed/1000.0);
@ -5164,11 +5210,10 @@ static SSDataBlock* doLoadRemoteData(void* param, bool* newgroup) {
}
*newgroup = false;
if (pExchangeInfo->seqLoadData) {
return seqLoadRemoteData(pOperator);
} else {
return concurrentlyLoadRemoteData(pOperator);
return concurrentlyLoadRemoteDataImpl(pOperator, pExchangeInfo, pTaskInfo);
}
#if 0
@ -5181,16 +5226,35 @@ static SSDataBlock* doLoadRemoteData(void* param, bool* newgroup) {
#endif
}
static int32_t initDataSource(int32_t numOfSources, SExchangeInfo* pInfo) {
pInfo->pSourceDataInfo = taosArrayInit(numOfSources, sizeof(SSourceDataInfo));
if (pInfo->pSourceDataInfo == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
for(int32_t i = 0; i < numOfSources; ++i) {
SSourceDataInfo dataInfo = {0};
dataInfo.status = EX_SOURCE_DATA_NOT_READY;
dataInfo.pEx = pInfo;
dataInfo.index = i;
void* ret = taosArrayPush(pInfo->pSourceDataInfo, &dataInfo);
if (ret == NULL) {
taosArrayDestroy(pInfo->pSourceDataInfo);
return TSDB_CODE_OUT_OF_MEMORY;
}
}
return TSDB_CODE_SUCCESS;
}
// TODO handle the error
SOperatorInfo* createExchangeOperatorInfo(const SNodeList* pSources, SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo) {
SExchangeInfo* pInfo = calloc(1, sizeof(SExchangeInfo));
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
if (pInfo == NULL || pOperator == NULL) {
tfree(pInfo);
tfree(pOperator);
terrno = TSDB_CODE_QRY_OUT_OF_MEMORY;
return NULL;
goto _error;
}
size_t numOfSources = LIST_LENGTH(pSources);
@ -5217,29 +5281,28 @@ SOperatorInfo* createExchangeOperatorInfo(const SNodeList* pSources, SSDataBlock
return NULL;
}
for(int32_t i = 0; i < numOfSources; ++i) {
SSourceDataInfo dataInfo = {0};
dataInfo.status = DATA_NOT_READY;
dataInfo.pEx = pInfo;
dataInfo.index = i;
taosArrayPush(pInfo->pSourceDataInfo, &dataInfo);
int32_t code = initDataSource(numOfSources, pInfo);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
size_t size = pBlock->info.numOfCols;
pInfo->pResult = pBlock;
pInfo->seqLoadData = true;
pInfo->seqLoadData = true; // sequentially load data from the source node
tsem_init(&pInfo->ready, 0, 0);
pOperator->name = "ExchangeOperator";
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_EXCHANGE;
pOperator->blockingOptr = false;
pOperator->status = OP_IN_EXECUTING;
pOperator->status = OP_NOT_OPENED;
pOperator->info = pInfo;
pOperator->numOfOutput = size;
pOperator->nextDataFn = doLoadRemoteData;
pOperator->pTaskInfo = pTaskInfo;
pOperator->_openFn = prepareLoadRemoteData; // assign a dummy function.
pOperator->getNextFn = doLoadRemoteData;
pOperator->closeFn = destroyExchangeOperatorInfo;
#if 1
{ // todo refactor
@ -5265,6 +5328,16 @@ SOperatorInfo* createExchangeOperatorInfo(const SNodeList* pSources, SSDataBlock
#endif
return pOperator;
_error:
if (pInfo != NULL) {
destroyExchangeOperatorInfo(pInfo, 0);
}
tfree(pInfo);
tfree(pOperator);
terrno = TSDB_CODE_QRY_OUT_OF_MEMORY;
return NULL;
}
SSDataBlock* createResultDataBlock(const SArray* pExprInfo) {
@ -5315,10 +5388,12 @@ SOperatorInfo* createTableScanOperatorInfo(void* pTsdbReadHandle, int32_t order,
pOperator->name = "TableScanOperator";
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN;
pOperator->blockingOptr = false;
pOperator->status = OP_IN_EXECUTING;
pOperator->status = OP_NOT_OPENED;
pOperator->info = pInfo;
pOperator->numOfOutput = numOfOutput;
pOperator->nextDataFn = doTableScan;
pOperator->_openFn = operatorDummyOpenFn;
pOperator->getNextFn = doTableScan;
pOperator->closeFn = operatorDummyCloseFn;
pOperator->pTaskInfo = pTaskInfo;
return pOperator;
@ -5339,11 +5414,11 @@ SOperatorInfo* createTableSeqScanOperatorInfo(void* pTsdbReadHandle, STaskRuntim
pOperator->name = "TableSeqScanOperator";
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_TABLE_SEQ_SCAN;
pOperator->blockingOptr = false;
pOperator->status = OP_IN_EXECUTING;
pOperator->status = OP_NOT_OPENED;
pOperator->info = pInfo;
pOperator->numOfOutput = pRuntimeEnv->pQueryAttr->numOfCols;
pOperator->pRuntimeEnv = pRuntimeEnv;
pOperator->nextDataFn = doTableScanImpl;
pOperator->getNextFn = doTableScanImpl;
return pOperator;
}
@ -5364,10 +5439,10 @@ SOperatorInfo* createTableBlockInfoScanOperator(void* pTsdbReadHandle, STaskRunt
pOperator->name = "TableBlockInfoScanOperator";
// pOperator->operatorType = OP_TableBlockInfoScan;
pOperator->blockingOptr = false;
pOperator->status = OP_IN_EXECUTING;
pOperator->status = OP_NOT_OPENED;
pOperator->info = pInfo;
// pOperator->numOfOutput = pRuntimeEnv->pQueryAttr->numOfCols;
pOperator->nextDataFn = doBlockInfoScan;
pOperator->getNextFn = doBlockInfoScan;
return pOperator;
}
@ -5397,10 +5472,13 @@ SOperatorInfo* createStreamScanOperatorInfo(void *streamReadHandle, SSDataBlock*
pOperator->name = "StreamBlockScanOperator";
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN;
pOperator->blockingOptr = false;
pOperator->status = OP_IN_EXECUTING;
pOperator->status = OP_NOT_OPENED;
pOperator->info = pInfo;
pOperator->numOfOutput = pResBlock->info.numOfCols;
pOperator->nextDataFn = doStreamBlockScan;
pOperator->_openFn = operatorDummyOpenFn;
pOperator->getNextFn = doStreamBlockScan;
pOperator->closeFn = operatorDummyCloseFn;
pOperator->pTaskInfo = pTaskInfo;
return pOperator;
}
@ -5415,7 +5493,7 @@ static int32_t loadSysTableContentCb(void* param, const SDataBuf* pMsg, int32_t
pRsp->useconds = htobe64(pRsp->useconds);
pRsp->compLen = htonl(pRsp->compLen);
pSourceDataInfo->status = DATA_READY;
pSourceDataInfo->status = EX_SOURCE_DATA_READY;
tsem_post(&pSourceDataInfo->pEx->ready);
}
@ -5513,7 +5591,7 @@ SOperatorInfo* createSysTableScanOperatorInfo(void* pSysTableReadHandle, const S
pOperator->status = OP_IN_EXECUTING;
pOperator->info = pInfo;
pOperator->numOfOutput = taosArrayGetSize(pExprInfo);
pOperator->nextDataFn = doSysTableScan;
pOperator->getNextFn = doSysTableScan;
pOperator->closeFn = destroySysTableScannerOperatorInfo;
pOperator->pTaskInfo = pTaskInfo;
@ -5741,7 +5819,7 @@ SSDataBlock* loadNextDataBlock(void* param) {
SOperatorInfo* pOperator = (SOperatorInfo*) param;
bool newgroup = false;
return pOperator->nextDataFn(pOperator, &newgroup);
return pOperator->getNextFn(pOperator, &newgroup);
}
static bool needToMerge(SSDataBlock* pBlock, SArray* groupInfo, char **buf, int32_t rowIndex) {
@ -6055,13 +6133,13 @@ SOperatorInfo* createSortedMergeOperatorInfo(SOperatorInfo** downstream, int32_t
pOperator->name = "SortedMerge";
// pOperator->operatorType = OP_SortedMerge;
pOperator->blockingOptr = true;
pOperator->status = OP_IN_EXECUTING;
pOperator->status = OP_NOT_OPENED;
pOperator->info = pInfo;
pOperator->numOfOutput = numOfOutput;
pOperator->pExpr = exprArrayDup(pExprInfo);
pOperator->pTaskInfo = pTaskInfo;
pOperator->nextDataFn = doSortedMerge;
pOperator->getNextFn = doSortedMerge;
pOperator->closeFn = destroySortedMergeOperatorInfo;
code = appendDownstream(pOperator, downstream, numOfDownstream);
@ -6153,11 +6231,11 @@ SOperatorInfo *createOrderOperatorInfo(SOperatorInfo* downstream, SArray* pExprI
pOperator->name = "Order";
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_SORT;
pOperator->blockingOptr = true;
pOperator->status = OP_IN_EXECUTING;
pOperator->status = OP_NOT_OPENED;
pOperator->info = pInfo;
pOperator->pTaskInfo = pTaskInfo;
pOperator->nextDataFn = doSort;
pOperator->getNextFn = doSort;
pOperator->closeFn = destroyOrderOperatorInfo;
int32_t code = appendDownstream(pOperator, &downstream, 1);
@ -6183,7 +6261,7 @@ static SSDataBlock* doAggregate(void* param, bool* newgroup) {
while(1) {
publishOperatorProfEvent(downstream, QUERY_PROF_BEFORE_OPERATOR_EXEC);
SSDataBlock* pBlock = downstream->nextDataFn(downstream, newgroup);
SSDataBlock* pBlock = downstream->getNextFn(downstream, newgroup);
publishOperatorProfEvent(downstream, QUERY_PROF_AFTER_OPERATOR_EXEC);
if (pBlock == NULL) {
@ -6233,7 +6311,7 @@ static SSDataBlock* doMultiTableAggregate(void* param, bool* newgroup) {
while(1) {
publishOperatorProfEvent(downstream, QUERY_PROF_BEFORE_OPERATOR_EXEC);
SSDataBlock* pBlock = downstream->nextDataFn(downstream, newgroup);
SSDataBlock* pBlock = downstream->getNextFn(downstream, newgroup);
publishOperatorProfEvent(downstream, QUERY_PROF_AFTER_OPERATOR_EXEC);
if (pBlock == NULL) {
@ -6319,7 +6397,7 @@ static SSDataBlock* doProjectOperation(void* param, bool* newgroup) {
// The downstream exec may change the value of the newgroup, so use a local variable instead.
publishOperatorProfEvent(pOperator->pDownstream[0], QUERY_PROF_BEFORE_OPERATOR_EXEC);
SSDataBlock* pBlock = pOperator->pDownstream[0]->nextDataFn(pOperator->pDownstream[0], newgroup);
SSDataBlock* pBlock = pOperator->pDownstream[0]->getNextFn(pOperator->pDownstream[0], newgroup);
publishOperatorProfEvent(pOperator->pDownstream[0], QUERY_PROF_AFTER_OPERATOR_EXEC);
if (pBlock == NULL) {
@ -6377,7 +6455,7 @@ static SSDataBlock* doLimit(void* param, bool* newgroup) {
SSDataBlock* pBlock = NULL;
while (1) {
publishOperatorProfEvent(pOperator->pDownstream[0], QUERY_PROF_BEFORE_OPERATOR_EXEC);
pBlock = pOperator->pDownstream[0]->nextDataFn(pOperator->pDownstream[0], newgroup);
pBlock = pOperator->pDownstream[0]->getNextFn(pOperator->pDownstream[0], newgroup);
publishOperatorProfEvent(pOperator->pDownstream[0], QUERY_PROF_AFTER_OPERATOR_EXEC);
if (pBlock == NULL) {
@ -6428,7 +6506,7 @@ static SSDataBlock* doFilter(void* param, bool* newgroup) {
while (1) {
publishOperatorProfEvent(pOperator->pDownstream[0], QUERY_PROF_BEFORE_OPERATOR_EXEC);
SSDataBlock *pBlock = pOperator->pDownstream[0]->nextDataFn(pOperator->pDownstream[0], newgroup);
SSDataBlock *pBlock = pOperator->pDownstream[0]->getNextFn(pOperator->pDownstream[0], newgroup);
publishOperatorProfEvent(pOperator->pDownstream[0], QUERY_PROF_AFTER_OPERATOR_EXEC);
if (pBlock == NULL) {
@ -6471,7 +6549,7 @@ static SSDataBlock* doIntervalAgg(void* param, bool* newgroup) {
while(1) {
publishOperatorProfEvent(downstream, QUERY_PROF_BEFORE_OPERATOR_EXEC);
SSDataBlock* pBlock = downstream->nextDataFn(downstream, newgroup);
SSDataBlock* pBlock = downstream->getNextFn(downstream, newgroup);
publishOperatorProfEvent(downstream, QUERY_PROF_AFTER_OPERATOR_EXEC);
if (pBlock == NULL) {
@ -6531,7 +6609,7 @@ static SSDataBlock* doAllIntervalAgg(void* param, bool* newgroup) {
while(1) {
publishOperatorProfEvent(downstream, QUERY_PROF_BEFORE_OPERATOR_EXEC);
SSDataBlock* pBlock = downstream->nextDataFn(downstream, newgroup);
SSDataBlock* pBlock = downstream->getNextFn(downstream, newgroup);
publishOperatorProfEvent(downstream, QUERY_PROF_AFTER_OPERATOR_EXEC);
if (pBlock == NULL) {
@ -6594,7 +6672,7 @@ static SSDataBlock* doSTableIntervalAgg(void* param, bool* newgroup) {
while(1) {
publishOperatorProfEvent(downstream, QUERY_PROF_BEFORE_OPERATOR_EXEC);
SSDataBlock* pBlock = downstream->nextDataFn(downstream, newgroup);
SSDataBlock* pBlock = downstream->getNextFn(downstream, newgroup);
publishOperatorProfEvent(downstream, QUERY_PROF_AFTER_OPERATOR_EXEC);
if (pBlock == NULL) {
@ -6649,7 +6727,7 @@ static SSDataBlock* doAllSTableIntervalAgg(void* param, bool* newgroup) {
while(1) {
publishOperatorProfEvent(downstream, QUERY_PROF_BEFORE_OPERATOR_EXEC);
SSDataBlock* pBlock = downstream->nextDataFn(downstream, newgroup);
SSDataBlock* pBlock = downstream->getNextFn(downstream, newgroup);
publishOperatorProfEvent(downstream, QUERY_PROF_AFTER_OPERATOR_EXEC);
if (pBlock == NULL) {
@ -6784,7 +6862,7 @@ static SSDataBlock* doStateWindowAgg(void *param, bool* newgroup) {
SOperatorInfo* downstream = pOperator->pDownstream[0];
while (1) {
publishOperatorProfEvent(downstream, QUERY_PROF_BEFORE_OPERATOR_EXEC);
SSDataBlock* pBlock = downstream->nextDataFn(downstream, newgroup);
SSDataBlock* pBlock = downstream->getNextFn(downstream, newgroup);
publishOperatorProfEvent(downstream, QUERY_PROF_AFTER_OPERATOR_EXEC);
if (pBlock == NULL) {
@ -6846,7 +6924,7 @@ static SSDataBlock* doSessionWindowAgg(void* param, bool* newgroup) {
while(1) {
publishOperatorProfEvent(downstream, QUERY_PROF_BEFORE_OPERATOR_EXEC);
SSDataBlock* pBlock = downstream->nextDataFn(downstream, newgroup);
SSDataBlock* pBlock = downstream->getNextFn(downstream, newgroup);
publishOperatorProfEvent(downstream, QUERY_PROF_AFTER_OPERATOR_EXEC);
if (pBlock == NULL) {
break;
@ -6899,7 +6977,7 @@ static SSDataBlock* hashGroupbyAggregate(void* param, bool* newgroup) {
while(1) {
publishOperatorProfEvent(downstream, QUERY_PROF_BEFORE_OPERATOR_EXEC);
SSDataBlock* pBlock = downstream->nextDataFn(downstream, newgroup);
SSDataBlock* pBlock = downstream->getNextFn(downstream, newgroup);
publishOperatorProfEvent(downstream, QUERY_PROF_AFTER_OPERATOR_EXEC);
if (pBlock == NULL) {
break;
@ -6984,7 +7062,7 @@ static SSDataBlock* doFill(void* param, bool* newgroup) {
while(1) {
publishOperatorProfEvent(pOperator->pDownstream[0], QUERY_PROF_BEFORE_OPERATOR_EXEC);
SSDataBlock* pBlock = pOperator->pDownstream[0]->nextDataFn(pOperator->pDownstream[0], newgroup);
SSDataBlock* pBlock = pOperator->pDownstream[0]->getNextFn(pOperator->pDownstream[0], newgroup);
publishOperatorProfEvent(pOperator->pDownstream[0], QUERY_PROF_AFTER_OPERATOR_EXEC);
if (*newgroup) {
@ -7141,13 +7219,13 @@ SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SArray* pE
pOperator->name = "TableAggregate";
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_AGG;
pOperator->blockingOptr = true;
pOperator->status = OP_IN_EXECUTING;
pOperator->status = OP_NOT_OPENED;
pOperator->info = pInfo;
pOperator->pExpr = exprArrayDup(pExprInfo);
pOperator->numOfOutput = taosArrayGetSize(pExprInfo);
pOperator->pTaskInfo = pTaskInfo;
pOperator->nextDataFn = doAggregate;
pOperator->getNextFn = doAggregate;
pOperator->closeFn = destroyAggOperatorInfo;
int32_t code = appendDownstream(pOperator, &downstream, 1);
@ -7196,17 +7274,17 @@ static void destroyGroupbyOperatorInfo(void* param, int32_t numOfOutput) {
tfree(pInfo->prevData);
}
static void destroyProjectOperatorInfo(void* param, int32_t numOfOutput) {
void destroyProjectOperatorInfo(void* param, int32_t numOfOutput) {
SProjectOperatorInfo* pInfo = (SProjectOperatorInfo*) param;
doDestroyBasicInfo(&pInfo->binfo, numOfOutput);
}
static void destroyTagScanOperatorInfo(void* param, int32_t numOfOutput) {
void destroyTagScanOperatorInfo(void* param, int32_t numOfOutput) {
STagScanInfo* pInfo = (STagScanInfo*) param;
pInfo->pRes = blockDataDestroy(pInfo->pRes);
}
static void destroyOrderOperatorInfo(void* param, int32_t numOfOutput) {
void destroyOrderOperatorInfo(void* param, int32_t numOfOutput) {
SOrderOperatorInfo* pInfo = (SOrderOperatorInfo*) param;
pInfo->pDataBlock = blockDataDestroy(pInfo->pDataBlock);
@ -7236,6 +7314,17 @@ static void destroySysTableScannerOperatorInfo(void* param, int32_t numOfOutput)
}
}
void destroyExchangeOperatorInfo(void* param, int32_t numOfOutput) {
SExchangeInfo* pExInfo = (SExchangeInfo*) param;
taosArrayDestroy(pExInfo->pSources);
taosArrayDestroy(pExInfo->pSourceDataInfo);
if (pExInfo->pResult != NULL) {
blockDataDestroy(pExInfo->pResult);
}
tsem_destroy(&pExInfo->ready);
}
SOperatorInfo* createMultiTableAggOperatorInfo(SOperatorInfo* downstream, SArray* pExprInfo, SSDataBlock* pResBlock, SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo) {
SAggOperatorInfo* pInfo = calloc(1, sizeof(SAggOperatorInfo));
@ -7250,13 +7339,13 @@ SOperatorInfo* createMultiTableAggOperatorInfo(SOperatorInfo* downstream, SArray
pOperator->name = "MultiTableAggregate";
// pOperator->operatorType = OP_MultiTableAggregate;
pOperator->blockingOptr = true;
pOperator->status = OP_IN_EXECUTING;
pOperator->status = OP_NOT_OPENED;
pOperator->info = pInfo;
pOperator->pExpr = exprArrayDup(pExprInfo);
pOperator->numOfOutput = numOfOutput;
pOperator->pTaskInfo = pTaskInfo;
pOperator->nextDataFn = doMultiTableAggregate;
pOperator->getNextFn = doMultiTableAggregate;
pOperator->closeFn = destroyAggOperatorInfo;
int32_t code = appendDownstream(pOperator, &downstream, 1);
@ -7277,12 +7366,12 @@ SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SArray* pExp
pOperator->name = "ProjectOperator";
// pOperator->operatorType = OP_Project;
pOperator->blockingOptr = false;
pOperator->status = OP_IN_EXECUTING;
pOperator->status = OP_NOT_OPENED;
pOperator->info = pInfo;
pOperator->pExpr = exprArrayDup(pExprInfo);
pOperator->numOfOutput = taosArrayGetSize(pExprInfo);
pOperator->nextDataFn = doProjectOperation;
pOperator->getNextFn = doProjectOperation;
pOperator->closeFn = destroyProjectOperatorInfo;
int32_t code = appendDownstream(pOperator, &downstream, 1);
@ -7331,11 +7420,11 @@ SOperatorInfo* createLimitOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorIn
pOperator->name = "LimitOperator";
// pOperator->operatorType = OP_Limit;
pOperator->blockingOptr = false;
pOperator->status = OP_IN_EXECUTING;
pOperator->nextDataFn = doLimit;
pOperator->status = OP_NOT_OPENED;
pOperator->getNextFn = doLimit;
pOperator->info = pInfo;
pOperator->pRuntimeEnv = pRuntimeEnv;
int32_t code = appendDownstream(pOperator, &downstream, 1);
int32_t code = appendDownstream(pOperator, &downstream, 1);
return pOperator;
}
@ -7363,13 +7452,13 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SArray* pEx
pOperator->name = "TimeIntervalAggOperator";
// pOperator->operatorType = OP_TimeWindow;
pOperator->blockingOptr = true;
pOperator->status = OP_IN_EXECUTING;
pOperator->status = OP_NOT_OPENED;
pOperator->pExpr = exprArrayDup(pExprInfo);
pOperator->pTaskInfo = pTaskInfo;
pOperator->numOfOutput = taosArrayGetSize(pExprInfo);
pOperator->info = pInfo;
pOperator->nextDataFn = doIntervalAgg;
pOperator->getNextFn = doIntervalAgg;
pOperator->closeFn = destroyBasicOperatorInfo;
code = appendDownstream(pOperator, &downstream, 1);
@ -7388,12 +7477,12 @@ SOperatorInfo* createAllTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, S
pOperator->name = "AllTimeIntervalAggOperator";
// pOperator->operatorType = OP_AllTimeWindow;
pOperator->blockingOptr = true;
pOperator->status = OP_IN_EXECUTING;
pOperator->status = OP_NOT_OPENED;
pOperator->pExpr = pExpr;
pOperator->numOfOutput = numOfOutput;
pOperator->info = pInfo;
pOperator->pRuntimeEnv = pRuntimeEnv;
pOperator->nextDataFn = doAllIntervalAgg;
pOperator->getNextFn = doAllIntervalAgg;
pOperator->closeFn = destroyBasicOperatorInfo;
int32_t code = appendDownstream(pOperator, &downstream, 1);
@ -7412,12 +7501,12 @@ SOperatorInfo* createStatewindowOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOper
pOperator->name = "StateWindowOperator";
// pOperator->operatorType = OP_StateWindow;
pOperator->blockingOptr = true;
pOperator->status = OP_IN_EXECUTING;
pOperator->status = OP_NOT_OPENED;
pOperator->pExpr = pExpr;
pOperator->numOfOutput = numOfOutput;
pOperator->info = pInfo;
pOperator->pRuntimeEnv = pRuntimeEnv;
pOperator->nextDataFn = doStateWindowAgg;
pOperator->getNextFn = doStateWindowAgg;
pOperator->closeFn = destroyStateWindowOperatorInfo;
int32_t code = appendDownstream(pOperator, &downstream, 1);
@ -7437,12 +7526,12 @@ SOperatorInfo* createSWindowOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperator
pOperator->name = "SessionWindowAggOperator";
// pOperator->operatorType = OP_SessionWindow;
pOperator->blockingOptr = true;
pOperator->status = OP_IN_EXECUTING;
pOperator->status = OP_NOT_OPENED;
pOperator->pExpr = pExpr;
pOperator->numOfOutput = numOfOutput;
pOperator->info = pInfo;
pOperator->pRuntimeEnv = pRuntimeEnv;
pOperator->nextDataFn = doSessionWindowAgg;
pOperator->getNextFn = doSessionWindowAgg;
pOperator->closeFn = destroySWindowOperatorInfo;
int32_t code = appendDownstream(pOperator, &downstream, 1);
@ -7460,13 +7549,13 @@ SOperatorInfo* createMultiTableTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntim
pOperator->name = "MultiTableTimeIntervalOperator";
// pOperator->operatorType = OP_MultiTableTimeInterval;
pOperator->blockingOptr = true;
pOperator->status = OP_IN_EXECUTING;
pOperator->status = OP_NOT_OPENED;
pOperator->pExpr = pExpr;
pOperator->numOfOutput = numOfOutput;
pOperator->info = pInfo;
pOperator->pRuntimeEnv = pRuntimeEnv;
pOperator->nextDataFn = doSTableIntervalAgg;
pOperator->getNextFn = doSTableIntervalAgg;
pOperator->closeFn = destroyBasicOperatorInfo;
int32_t code = appendDownstream(pOperator, &downstream, 1);
@ -7484,13 +7573,13 @@ SOperatorInfo* createAllMultiTableTimeIntervalOperatorInfo(STaskRuntimeEnv* pRun
pOperator->name = "AllMultiTableTimeIntervalOperator";
// pOperator->operatorType = OP_AllMultiTableTimeInterval;
pOperator->blockingOptr = true;
pOperator->status = OP_IN_EXECUTING;
pOperator->status = OP_NOT_OPENED;
pOperator->pExpr = pExpr;
pOperator->numOfOutput = numOfOutput;
pOperator->info = pInfo;
pOperator->pRuntimeEnv = pRuntimeEnv;
pOperator->nextDataFn = doAllSTableIntervalAgg;
pOperator->getNextFn = doAllSTableIntervalAgg;
pOperator->closeFn = destroyBasicOperatorInfo;
int32_t code = appendDownstream(pOperator, &downstream, 1);
@ -7516,13 +7605,13 @@ SOperatorInfo* createGroupbyOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperator
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
pOperator->name = "GroupbyAggOperator";
pOperator->blockingOptr = true;
pOperator->status = OP_IN_EXECUTING;
pOperator->status = OP_NOT_OPENED;
// pOperator->operatorType = OP_Groupby;
pOperator->pExpr = pExpr;
pOperator->numOfOutput = numOfOutput;
pOperator->info = pInfo;
pOperator->pRuntimeEnv = pRuntimeEnv;
pOperator->nextDataFn = hashGroupbyAggregate;
pOperator->getNextFn = hashGroupbyAggregate;
pOperator->closeFn = destroyGroupbyOperatorInfo;
int32_t code = appendDownstream(pOperator, &downstream, 1);
@ -7555,13 +7644,13 @@ SOperatorInfo* createFillOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInf
pOperator->name = "FillOperator";
pOperator->blockingOptr = false;
pOperator->status = OP_IN_EXECUTING;
pOperator->status = OP_NOT_OPENED;
// pOperator->operatorType = OP_Fill;
pOperator->pExpr = pExpr;
pOperator->numOfOutput = numOfOutput;
pOperator->info = pInfo;
pOperator->pRuntimeEnv = pRuntimeEnv;
pOperator->nextDataFn = doFill;
pOperator->getNextFn = doFill;
pOperator->closeFn = destroySFillOperatorInfo;
int32_t code = appendDownstream(pOperator, &downstream, 1);
@ -7606,13 +7695,12 @@ SOperatorInfo* createSLimitOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorI
pOperator->name = "SLimitOperator";
// pOperator->operatorType = OP_SLimit;
pOperator->blockingOptr = false;
pOperator->status = OP_IN_EXECUTING;
pOperator->status = OP_NOT_OPENED;
// pOperator->exec = doSLimit;
pOperator->info = pInfo;
pOperator->pRuntimeEnv = pRuntimeEnv;
pOperator->closeFn = destroySlimitOperatorInfo;
int32_t code = appendDownstream(pOperator, &downstream, 1);
int32_t code = appendDownstream(pOperator, &downstream, 1);
return pOperator;
}
@ -7746,6 +7834,7 @@ static SSDataBlock* doTagScan(void* param, bool* newgroup) {
return (pRes->info.rows == 0)? NULL:pInfo->pRes;
#endif
return 0;
}
SOperatorInfo* createTagScanOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SExprInfo* pExpr, int32_t numOfOutput) {
@ -7762,9 +7851,9 @@ SOperatorInfo* createTagScanOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SExprInfo
pOperator->name = "SeqTableTagScan";
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN;
pOperator->blockingOptr = false;
pOperator->status = OP_IN_EXECUTING;
pOperator->status = OP_NOT_OPENED;
pOperator->info = pInfo;
pOperator->nextDataFn = doTagScan;
pOperator->getNextFn = doTagScan;
pOperator->pExpr = pExpr;
pOperator->numOfOutput = numOfOutput;
pOperator->pRuntimeEnv = pRuntimeEnv;
@ -7834,7 +7923,7 @@ static SSDataBlock* hashDistinct(void* param, bool* newgroup) {
while(1) {
publishOperatorProfEvent(pOperator->pDownstream[0], QUERY_PROF_BEFORE_OPERATOR_EXEC);
pBlock = pOperator->pDownstream[0]->nextDataFn(pOperator->pDownstream[0], newgroup);
pBlock = pOperator->pDownstream[0]->getNextFn(pOperator->pDownstream[0], newgroup);
publishOperatorProfEvent(pOperator->pDownstream[0], QUERY_PROF_AFTER_OPERATOR_EXEC);
if (pBlock == NULL) {
@ -7900,13 +7989,13 @@ SOperatorInfo* createDistinctOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperato
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
pOperator->name = "DistinctOperator";
pOperator->blockingOptr = false;
pOperator->status = OP_IN_EXECUTING;
pOperator->status = OP_NOT_OPENED;
// pOperator->operatorType = OP_Distinct;
pOperator->pExpr = pExpr;
pOperator->numOfOutput = numOfOutput;
pOperator->info = pInfo;
pOperator->pRuntimeEnv = pRuntimeEnv;
pOperator->nextDataFn = hashDistinct;
pOperator->getNextFn = hashDistinct;
pOperator->pExpr = pExpr;
pOperator->closeFn = destroyDistinctOperatorInfo;
@ -8500,75 +8589,6 @@ void setResultBufSize(STaskAttr* pQueryAttr, SRspResultInfo* pResultInfo) {
pResultInfo->total = 0;
}
FORCE_INLINE bool checkQIdEqual(void *qHandle, uint64_t qId) {
return ((SQInfo *)qHandle)->qId == qId;
}
int32_t initQInfo(STsBufInfo* pTsBufInfo, void* tsdb, void* sourceOptr, SQInfo* pQInfo, STaskParam* param, char* start,
int32_t prevResultLen, void* merger) {
int32_t code = TSDB_CODE_SUCCESS;
STaskRuntimeEnv* pRuntimeEnv = &pQInfo->runtimeEnv;
pRuntimeEnv->qinfo = pQInfo;
STaskAttr *pQueryAttr = pRuntimeEnv->pQueryAttr;
STSBuf *pTsBuf = NULL;
if (pTsBufInfo->tsLen > 0) { // open new file to save the result
char* tsBlock = start + pTsBufInfo->tsOffset;
pTsBuf = tsBufCreateFromCompBlocks(tsBlock, pTsBufInfo->tsNumOfBlocks, pTsBufInfo->tsLen, pTsBufInfo->tsOrder,
pQueryAttr->vgId);
if (pTsBuf == NULL) {
code = TSDB_CODE_QRY_NO_DISKSPACE;
goto _error;
}
tsBufResetPos(pTsBuf);
bool ret = tsBufNextPos(pTsBuf);
UNUSED(ret);
}
SArray* prevResult = NULL;
if (prevResultLen > 0) {
prevResult = interResFromBinary(param->prevResult, prevResultLen);
pRuntimeEnv->prevResult = prevResult;
}
pRuntimeEnv->currentOffset = pQueryAttr->limit.offset;
if (tsdb != NULL) {
// pQueryAttr->precision = tsdbGetCfg(tsdb)->precision;
}
if ((QUERY_IS_ASC_QUERY(pQueryAttr) && (pQueryAttr->window.skey > pQueryAttr->window.ekey)) ||
(!QUERY_IS_ASC_QUERY(pQueryAttr) && (pQueryAttr->window.ekey > pQueryAttr->window.skey))) {
//qDebug("QInfo:0x%"PRIx64" no result in time range %" PRId64 "-%" PRId64 ", order %d", pQInfo->qId, pQueryAttr->window.skey,
// pQueryAttr->window.ekey, pQueryAttr->order.order);
// setTaskStatus(pOperator->pTaskInfo, QUERY_COMPLETED);
pRuntimeEnv->tableqinfoGroupInfo.numOfTables = 0;
// todo free memory
return TSDB_CODE_SUCCESS;
}
if (pRuntimeEnv->tableqinfoGroupInfo.numOfTables == 0) {
//qDebug("QInfo:0x%"PRIx64" no table qualified for tag filter, abort query", pQInfo->qId);
// setTaskStatus(pOperator->pTaskInfo, QUERY_COMPLETED);
return TSDB_CODE_SUCCESS;
}
// filter the qualified
if ((code = doInitQInfo(pQInfo, pTsBuf, tsdb, sourceOptr, param->tableScanOperator, param->pOperator, merger)) != TSDB_CODE_SUCCESS) {
goto _error;
}
return code;
_error:
// table query ref will be decrease during error handling
// doDestroyTask(pQInfo);
return code;
}
//TODO refactor
void freeColumnFilterInfo(SColumnFilterInfo* pFilter, int32_t numOfFilters) {
if (pFilter == NULL || numOfFilters == 0) {

View File

@ -201,9 +201,9 @@ SOperatorInfo* createDummyOperator(int32_t startVal, int32_t numOfBlocks, int32_
pOperator->name = "dummyInputOpertor4Test";
if (numOfCols == 1) {
pOperator->nextDataFn = getDummyBlock;
pOperator->getNextFn = getDummyBlock;
} else {
pOperator->nextDataFn = get2ColsDummyBlock;
pOperator->getNextFn = get2ColsDummyBlock;
}
SDummyInputInfo *pInfo = (SDummyInputInfo*) calloc(1, sizeof(SDummyInputInfo));
@ -971,7 +971,7 @@ TEST(testCase, inMem_sort_Test) {
SOperatorInfo* pOperator = createOrderOperatorInfo(createDummyOperator(10000, 5, 1000, data_asc, 1), pExprInfo, pOrderVal, NULL);
bool newgroup = false;
SSDataBlock* pRes = pOperator->nextDataFn(pOperator, &newgroup);
SSDataBlock* pRes = pOperator->getNextFn(pOperator, &newgroup);
SColumnInfoData* pCol1 = static_cast<SColumnInfoData*>(taosArrayGet(pRes->pDataBlock, 0));
SColumnInfoData* pCol2 = static_cast<SColumnInfoData*>(taosArrayGet(pRes->pDataBlock, 1));
@ -1049,7 +1049,7 @@ TEST(testCase, external_sort_Test) {
while(1) {
int64_t s = taosGetTimestampUs();
pRes = pOperator->nextDataFn(pOperator, &newgroup);
pRes = pOperator->getNextFn(pOperator, &newgroup);
int64_t e = taosGetTimestampUs();
if (t++ == 1) {
@ -1121,7 +1121,7 @@ TEST(testCase, sorted_merge_Test) {
while(1) {
int64_t s = taosGetTimestampUs();
pRes = pOperator->nextDataFn(pOperator, &newgroup);
pRes = pOperator->getNextFn(pOperator, &newgroup);
int64_t e = taosGetTimestampUs();
if (t++ == 1) {
@ -1199,7 +1199,7 @@ TEST(testCase, time_interval_Operator_Test) {
while(1) {
int64_t s = taosGetTimestampUs();
pRes = pOperator->nextDataFn(pOperator, &newgroup);
pRes = pOperator->getNextFn(pOperator, &newgroup);
int64_t e = taosGetTimestampUs();
if (t++ == 1) {

View File

@ -131,7 +131,6 @@ static SNode* valueNodeCopy(const SValueNode* pSrc, SValueNode* pDst) {
case TSDB_DATA_TYPE_DOUBLE:
COPY_SCALAR_FIELD(datum.d);
break;
case TSDB_DATA_TYPE_BINARY:
case TSDB_DATA_TYPE_NCHAR:
case TSDB_DATA_TYPE_VARCHAR:
case TSDB_DATA_TYPE_VARBINARY:

View File

@ -978,7 +978,6 @@ static int32_t datumToJson(const void* pObj, SJson* pJson) {
case TSDB_DATA_TYPE_DOUBLE:
code = tjsonAddDoubleToObject(pJson, jkValueDatum, pNode->datum.d);
break;
case TSDB_DATA_TYPE_BINARY:
case TSDB_DATA_TYPE_NCHAR:
case TSDB_DATA_TYPE_VARCHAR:
case TSDB_DATA_TYPE_VARBINARY:
@ -1042,7 +1041,6 @@ static int32_t jsonToDatum(const SJson* pJson, void* pObj) {
case TSDB_DATA_TYPE_DOUBLE:
code = tjsonGetDoubleValue(pJson, jkValueDatum, &pNode->datum.d);
break;
case TSDB_DATA_TYPE_BINARY:
case TSDB_DATA_TYPE_NCHAR:
case TSDB_DATA_TYPE_VARCHAR:
case TSDB_DATA_TYPE_VARBINARY:

View File

@ -386,7 +386,6 @@ void* nodesGetValueFromNode(SValueNode *pNode) {
case TSDB_DATA_TYPE_FLOAT:
case TSDB_DATA_TYPE_DOUBLE:
return (void*)&pNode->datum.d;
case TSDB_DATA_TYPE_BINARY:
case TSDB_DATA_TYPE_NCHAR:
case TSDB_DATA_TYPE_VARCHAR:
case TSDB_DATA_TYPE_VARBINARY:

View File

@ -287,7 +287,6 @@ static EDealRes translateValue(STranslateContext* pCxt, SValueNode* pVal) {
pVal->datum.d = strtold(pVal->literal, &endPtr);
break;
}
case TSDB_DATA_TYPE_BINARY:
case TSDB_DATA_TYPE_NCHAR:
case TSDB_DATA_TYPE_VARCHAR:
case TSDB_DATA_TYPE_VARBINARY: {
@ -601,7 +600,6 @@ static int32_t translateStar(STranslateContext* pCxt, SSelectStmt* pSelect, bool
static int32_t getPositionValue(const SValueNode* pVal) {
switch (pVal->node.resType.type) {
case TSDB_DATA_TYPE_NULL:
case TSDB_DATA_TYPE_BINARY:
case TSDB_DATA_TYPE_TIMESTAMP:
case TSDB_DATA_TYPE_NCHAR:
case TSDB_DATA_TYPE_VARCHAR:
@ -1438,7 +1436,6 @@ static void valueNodeToVariant(const SValueNode* pNode, SVariant* pVal) {
case TSDB_DATA_TYPE_DOUBLE:
pVal->d = pNode->datum.d;
break;
case TSDB_DATA_TYPE_BINARY:
case TSDB_DATA_TYPE_NCHAR:
case TSDB_DATA_TYPE_VARCHAR:
case TSDB_DATA_TYPE_VARBINARY:

View File

@ -56,7 +56,7 @@ protected:
const string syntaxTreeStr = toString(query_->pRoot, false);
SLogicNode* pLogicPlan = nullptr;
SPlanContext cxt = { .queryId = 1, .pAstRoot = query_->pRoot };
SPlanContext cxt = { .queryId = 1, .acctId = 0, .pAstRoot = query_->pRoot };
code = createLogicPlan(&cxt, &pLogicPlan);
if (code != TSDB_CODE_SUCCESS) {
cout << "sql:[" << cxt_.pSql << "] logic plan code:" << code << ", strerror:" << tstrerror(code) << endl;

View File

@ -495,7 +495,7 @@ int32_t qwExecTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *queryEnd) {
ASSERT(pRes->info.rows > 0);
SInputData inputData = {.pData = pRes, .pTableRetrieveTsMap = NULL};
SInputData inputData = {.pData = pRes};
code = dsPutDataBlock(sinkHandle, &inputData, &qcontinue);
if (code) {
QW_TASK_ELOG("dsPutDataBlock failed, code:%s", tstrerror(code));

View File

@ -411,27 +411,26 @@ int32_t vectorConvertImpl(SScalarParam* pIn, SScalarParam* pOut) {
}
int8_t gConvertTypes[TSDB_DATA_TYPE_BLOB+1][TSDB_DATA_TYPE_BLOB+1] = {
/* NULL BOOL TINY SMAL INT BIG FLOA DOUB BINA TIME NCHA UTIN USMA UINT UBIG VARC VARB JSON DECI BLOB */
/*NULL*/ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
/*BOOL*/ 0, 0, 0, 3, 4, 5, 6, 7, 7, 9, 7, 0, 12, 13, 14, 7, 7, 0, 0, 0,
/*TINY*/ 0, 0, 0, 3, 4, 5, 6, 7, 7, 9, 7, 3, 4, 5, 7, 7, 7, 0, 0, 0,
/*SMAL*/ 0, 0, 0, 0, 4, 5, 6, 7, 7, 9, 7, 3, 4, 5, 7, 7, 7, 0, 0, 0,
/*INT */ 0, 0, 0, 0, 0, 5, 6, 7, 7, 9, 7, 4, 4, 5, 7, 7, 7, 0, 0, 0,
/*BIGI*/ 0, 0, 0, 0, 0, 0, 6, 7, 7, 0, 7, 5, 5, 5, 7, 7, 7, 0, 0, 0,
/*FLOA*/ 0, 0, 0, 0, 0, 0, 0, 7, 7, 6, 7, 6, 6, 6, 6, 7, 7, 0, 0, 0,
/*DOUB*/ 0, 0, 0, 0, 0, 0, 0, 0, 7, 7, 7, 7, 7, 7, 7, 7, 7, 0, 0, 0,
/*BINA*/ 0, 0, 0, 0, 0, 0, 0, 0, 0, 7, 0, 7, 7, 7, 7, 0, 0, 0, 0, 0,
/*TIME*/ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 7, 9, 9, 9, 7, 7, 7, 0, 0, 0,
/*NCHA*/ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 7, 7, 7, 7, 0, 0, 0, 0, 0,
/*UTIN*/ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 12, 13, 14, 7, 7, 0, 0, 0,
/*USMA*/ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 13, 14, 7, 7, 0, 0, 0,
/*UINT*/ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 14, 7, 7, 0, 0, 0,
/*UBIG*/ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 7, 7, 0, 0, 0,
/*VARC*/ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
/*VARB*/ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
/*JSON*/ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
/*DECI*/ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
/*BLOB*/ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0
/* NULL BOOL TINY SMAL INT BIG FLOA DOUB VARC TIME NCHA UTIN USMA UINT UBIG VARB JSON DECI BLOB */
/*NULL*/ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
/*BOOL*/ 0, 0, 0, 3, 4, 5, 6, 7, 7, 9, 7, 0, 12, 13, 14, 7, 0, 0, 0,
/*TINY*/ 0, 0, 0, 3, 4, 5, 6, 7, 7, 9, 7, 3, 4, 5, 7, 7, 0, 0, 0,
/*SMAL*/ 0, 0, 0, 0, 4, 5, 6, 7, 7, 9, 7, 3, 4, 5, 7, 7, 0, 0, 0,
/*INT */ 0, 0, 0, 0, 0, 5, 6, 7, 7, 9, 7, 4, 4, 5, 7, 7, 0, 0, 0,
/*BIGI*/ 0, 0, 0, 0, 0, 0, 6, 7, 7, 0, 7, 5, 5, 5, 7, 7, 0, 0, 0,
/*FLOA*/ 0, 0, 0, 0, 0, 0, 0, 7, 7, 6, 7, 6, 6, 6, 6, 7, 0, 0, 0,
/*DOUB*/ 0, 0, 0, 0, 0, 0, 0, 0, 7, 7, 7, 7, 7, 7, 7, 7, 0, 0, 0,
/*VARC*/ 0, 0, 0, 0, 0, 0, 0, 0, 0, 7, 0, 7, 7, 7, 7, 0, 0, 0, 0,
/*TIME*/ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 7, 9, 9, 9, 7, 7, 0, 0, 0,
/*NCHA*/ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 7, 7, 7, 7, 0, 0, 0, 0,
/*UTIN*/ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 12, 13, 14, 7, 0, 0, 0,
/*USMA*/ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 13, 14, 7, 0, 0, 0,
/*UINT*/ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 14, 7, 0, 0, 0,
/*UBIG*/ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 7, 0, 0, 0,
/*VARB*/ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
/*JSON*/ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
/*DECI*/ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
/*BLOB*/ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0
};
int32_t vectorGetConvertType(int32_t type1, int32_t type2) {