diff --git a/include/client/taos.h b/include/client/taos.h index e1e85ac3a4..66e3c491c7 100644 --- a/include/client/taos.h +++ b/include/client/taos.h @@ -31,27 +31,27 @@ typedef void TAOS_SUB; typedef void **TAOS_ROW; // Data type definition -#define TSDB_DATA_TYPE_NULL 0 // 1 bytes -#define TSDB_DATA_TYPE_BOOL 1 // 1 bytes -#define TSDB_DATA_TYPE_TINYINT 2 // 1 byte -#define TSDB_DATA_TYPE_SMALLINT 3 // 2 bytes -#define TSDB_DATA_TYPE_INT 4 // 4 bytes -#define TSDB_DATA_TYPE_BIGINT 5 // 8 bytes -#define TSDB_DATA_TYPE_FLOAT 6 // 4 bytes -#define TSDB_DATA_TYPE_DOUBLE 7 // 8 bytes -#define TSDB_DATA_TYPE_BINARY 8 // string, alias for varchar -#define TSDB_DATA_TYPE_TIMESTAMP 9 // 8 bytes -#define TSDB_DATA_TYPE_NCHAR 10 // unicode string -#define TSDB_DATA_TYPE_UTINYINT 11 // 1 byte -#define TSDB_DATA_TYPE_USMALLINT 12 // 2 bytes -#define TSDB_DATA_TYPE_UINT 13 // 4 bytes -#define TSDB_DATA_TYPE_UBIGINT 14 // 8 bytes -#define TSDB_DATA_TYPE_VARCHAR 15 // string -#define TSDB_DATA_TYPE_VARBINARY 16 // binary -#define TSDB_DATA_TYPE_JSON 17 // json -#define TSDB_DATA_TYPE_DECIMAL 18 // decimal -#define TSDB_DATA_TYPE_BLOB 19 // binary -#define TSDB_DATA_TYPE_MEDIUMBLOB 20 +#define TSDB_DATA_TYPE_NULL 0 // 1 bytes +#define TSDB_DATA_TYPE_BOOL 1 // 1 bytes +#define TSDB_DATA_TYPE_TINYINT 2 // 1 byte +#define TSDB_DATA_TYPE_SMALLINT 3 // 2 bytes +#define TSDB_DATA_TYPE_INT 4 // 4 bytes +#define TSDB_DATA_TYPE_BIGINT 5 // 8 bytes +#define TSDB_DATA_TYPE_FLOAT 6 // 4 bytes +#define TSDB_DATA_TYPE_DOUBLE 7 // 8 bytes +#define TSDB_DATA_TYPE_VARCHAR 8 // string, alias for varchar +#define TSDB_DATA_TYPE_TIMESTAMP 9 // 8 bytes +#define TSDB_DATA_TYPE_NCHAR 10 // unicode string +#define TSDB_DATA_TYPE_UTINYINT 11 // 1 byte +#define TSDB_DATA_TYPE_USMALLINT 12 // 2 bytes +#define TSDB_DATA_TYPE_UINT 13 // 4 bytes +#define TSDB_DATA_TYPE_UBIGINT 14 // 8 bytes +#define TSDB_DATA_TYPE_JSON 15 // json string +#define TSDB_DATA_TYPE_VARBINARY 16 // binary +#define TSDB_DATA_TYPE_DECIMAL 17 // decimal +#define TSDB_DATA_TYPE_BLOB 18 // binary +#define TSDB_DATA_TYPE_MEDIUMBLOB 19 +#define TSDB_DATA_TYPE_BINARY TSDB_DATA_TYPE_VARCHAR // string typedef enum { TSDB_OPTION_LOCALE, diff --git a/include/libs/executor/dataSinkMgt.h b/include/libs/executor/dataSinkMgt.h index ec15291700..293e7ae52f 100644 --- a/include/libs/executor/dataSinkMgt.h +++ b/include/libs/executor/dataSinkMgt.h @@ -41,7 +41,6 @@ int32_t dsDataSinkMgtInit(SDataSinkMgtCfg *cfg); typedef struct SInputData { const struct SSDataBlock* pData; - SHashObj* pTableRetrieveTsMap; } SInputData; typedef struct SOutputData { diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index e95457b91e..ee841e3ce9 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -253,6 +253,11 @@ typedef struct STaskIdInfo { char* str; } STaskIdInfo; +typedef struct STaskBufInfo { + int32_t bufSize; // total available buffer size in bytes + int32_t remainBuf; // remain buffer size +} STaskBufInfo; + typedef struct SExecTaskInfo { STaskIdInfo id; char* content; @@ -264,7 +269,8 @@ typedef struct SExecTaskInfo { uint64_t totalRows; // total number of rows STableGroupInfo tableqinfoGroupInfo; // this is a group array list, including SArray structure char* sql; // query sql string - jmp_buf env; // + jmp_buf env; // when error occurs, abort + STaskBufInfo bufInfo; // available buffer info this task struct SOperatorInfo* pRoot; } SExecTaskInfo; @@ -307,9 +313,11 @@ typedef struct STaskRuntimeEnv { } STaskRuntimeEnv; enum { - OP_IN_EXECUTING = 1, - OP_RES_TO_RETURN = 2, - OP_EXEC_DONE = 3, + OP_NOT_OPENED = 0x0, + OP_OPENED = 0x1, + OP_IN_EXECUTING = 0x3, + OP_RES_TO_RETURN = 0x5, + OP_EXEC_DONE = 0x9, }; typedef struct SOperatorInfo { @@ -322,12 +330,14 @@ typedef struct SOperatorInfo { SExprInfo* pExpr; STaskRuntimeEnv* pRuntimeEnv; // todo remove it SExecTaskInfo* pTaskInfo; + SOperatorCostInfo cost; struct SOperatorInfo** pDownstream; // downstram pointer list int32_t numOfDownstream; // number of downstream. The value is always ONE expect for join operator - __optr_open_fn_t openFn; - __optr_fn_t nextDataFn; + __optr_fn_t getNextFn; + __optr_fn_t cleanupFn; __optr_close_fn_t closeFn; + __optr_open_fn_t _openFn; // DO NOT invoke this function directly } SOperatorInfo; typedef struct { @@ -378,9 +388,9 @@ typedef struct STaskParam { } STaskParam; enum { - DATA_NOT_READY = 0x1, - DATA_READY = 0x2, - DATA_EXHAUSTED = 0x3, + EX_SOURCE_DATA_NOT_READY = 0x1, + EX_SOURCE_DATA_READY = 0x2, + EX_SOURCE_DATA_EXHAUSTED = 0x3, }; typedef struct SSourceDataInfo { @@ -639,12 +649,16 @@ SOperatorInfo* createTableSeqScanOperatorInfo(void* pTsdbReadHandle, STaskRuntim SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SArray* pExprInfo, SSDataBlock* pResultBlock, SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo); SOperatorInfo* createMultiTableAggOperatorInfo(SOperatorInfo* downstream, SArray* pExprInfo, SSDataBlock* pResultBlock, SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo); SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SArray* pExprInfo, SExecTaskInfo* pTaskInfo); +SOperatorInfo* createOrderOperatorInfo(SOperatorInfo* downstream, SArray* pExprInfo, SArray* pOrderVal, SExecTaskInfo* pTaskInfo); +SOperatorInfo* createSortedMergeOperatorInfo(SOperatorInfo** downstream, int32_t numOfDownstream, SArray* pExprInfo, SArray* pOrderVal, SArray* pGroupInfo, SExecTaskInfo* pTaskInfo); SOperatorInfo* createSysTableScanOperatorInfo(void* pSysTableReadHandle, const SArray* pExprInfo, const SSchema* pSchema, int32_t tableType, SEpSet epset, SExecTaskInfo* pTaskInfo); SOperatorInfo* createLimitOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream); SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SArray* pExprInfo, SInterval* pInterval, SExecTaskInfo* pTaskInfo); +SOperatorInfo* createLimitOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream); + SOperatorInfo* createAllTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfOutput); SOperatorInfo* createSWindowOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SExprInfo* pExpr, @@ -674,10 +688,6 @@ SOperatorInfo* createSLimitOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorI SOperatorInfo* createJoinOperatorInfo(SOperatorInfo** pdownstream, int32_t numOfDownstream, SSchema* pSchema, int32_t numOfOutput); -SOperatorInfo* createOrderOperatorInfo(SOperatorInfo* downstream, SArray* pExprInfo, SArray* pOrderVal, SExecTaskInfo* pTaskInfo); -SOperatorInfo* createSortedMergeOperatorInfo(SOperatorInfo** downstream, int32_t numOfDownstream, SArray* pExprInfo, SArray* pOrderVal, SArray* pGroupInfo, SExecTaskInfo* pTaskInfo); - -// SSDataBlock* doSLimit(void* param, bool* newgroup); // int32_t doCreateFilterInfo(SColumnInfo* pCols, int32_t numOfCols, int32_t numOfFilterCols, SSingleColumnFilterInfo** pFilterInfo, uint64_t qId); void doSetFilterColumnInfo(SSingleColumnFilterInfo* pFilterInfo, int32_t numOfFilterCols, SSDataBlock* pBlock); @@ -691,7 +701,6 @@ void* doDestroyFilterInfo(SSingleColumnFilterInfo* pFilterInfo, int32_t numOfFil void setInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t order); void finalizeQueryResult(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx, SResultRowInfo* pResultRowInfo, int32_t* rowCellInfoOffset); -void updateOutputBuf(SOptrBasicInfo* pBInfo, int32_t* bufCapacity, int32_t numOfInputRows); void clearOutputBuf(SOptrBasicInfo* pBInfo, int32_t* bufCapacity); void copyTsColoum(SSDataBlock* pRes, SqlFunctionCtx* pCtx, int32_t numOfOutput); diff --git a/source/libs/executor/src/dataDispatcher.c b/source/libs/executor/src/dataDispatcher.c index 9c86e19166..a2e526c2bd 100644 --- a/source/libs/executor/src/dataDispatcher.c +++ b/source/libs/executor/src/dataDispatcher.c @@ -21,8 +21,6 @@ #include "tqueue.h" #include "executorimpl.h" -#define DATA_META_LENGTH(tables) (sizeof(int32_t) + sizeof(STableIdInfo) * taosHashGetSize(tables) + sizeof(SRetrieveTableRsp)) - typedef struct SDataDispatchBuf { int32_t useSize; int32_t allocSize; @@ -90,19 +88,6 @@ static void copyData(const SInputData* pInput, const SDataBlockDescNode* pSchema data += pColRes->info.bytes * pInput->pData->info.rows; } } - - int32_t numOfTables = (int32_t) taosHashGetSize(pInput->pTableRetrieveTsMap); - *(int32_t*)data = htonl(numOfTables); - data += sizeof(int32_t); - - STableIdInfo* item = taosHashIterate(pInput->pTableRetrieveTsMap, NULL); - while (item) { - STableIdInfo* pDst = (STableIdInfo*)data; - pDst->uid = htobe64(item->uid); - pDst->key = htobe64(item->key); - data += sizeof(STableIdInfo); - item = taosHashIterate(pInput->pTableRetrieveTsMap, item); - } } // data format with compress: SDataCacheEntry | cols_data_offset | col1_data col2_data ... | numOfTables | STableIdInfo STableIdInfo ... @@ -113,7 +98,7 @@ static void toDataCacheEntry(const SDataDispatchHandle* pHandle, const SInputDat pEntry->numOfRows = pInput->pData->info.rows; pEntry->dataLen = 0; - pBuf->useSize = DATA_META_LENGTH(pInput->pTableRetrieveTsMap); + pBuf->useSize = sizeof(SRetrieveTableRsp); copyData(pInput, pHandle->pSchema, pEntry->data, pEntry->compressed, &pEntry->dataLen); if (0 == pEntry->compressed) { pEntry->dataLen = pHandle->pSchema->resultRowSize * pInput->pData->info.rows; @@ -130,7 +115,7 @@ static bool allocBuf(SDataDispatchHandle* pDispatcher, const SInputData* pInput, return false; } - pBuf->allocSize = DATA_META_LENGTH(pInput->pTableRetrieveTsMap) + pDispatcher->pSchema->resultRowSize * pInput->pData->info.rows; + pBuf->allocSize = sizeof(SRetrieveTableRsp) + pDispatcher->pSchema->resultRowSize * pInput->pData->info.rows; pBuf->pData = malloc(pBuf->allocSize); if (pBuf->pData == NULL) { qError("SinkNode failed to malloc memory, size:%d, code:%d", pBuf->allocSize, TAOS_SYSTEM_ERROR(errno)); diff --git a/source/libs/executor/src/executorMain.c b/source/libs/executor/src/executorMain.c index fabaa2d31d..7e55a4b3e1 100644 --- a/source/libs/executor/src/executorMain.c +++ b/source/libs/executor/src/executorMain.c @@ -158,7 +158,7 @@ int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pRes, uint64_t *useconds) { int64_t st = 0; st = taosGetTimestampUs(); - *pRes = pTaskInfo->pRoot->nextDataFn(pTaskInfo->pRoot, &newgroup); + *pRes = pTaskInfo->pRoot->getNextFn(pTaskInfo->pRoot, &newgroup); uint64_t el = (taosGetTimestampUs() - st); pTaskInfo->cost.elapsedTime += el; diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 9b519f9004..8b0b46dbbd 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -211,6 +211,9 @@ static void destroyOrderOperatorInfo(void* param, int32_t numOfOutput); static void destroySWindowOperatorInfo(void* param, int32_t numOfOutput); static void destroyStateWindowOperatorInfo(void* param, int32_t numOfOutput); static void destroyAggOperatorInfo(void* param, int32_t numOfOutput); +static void destroyExchangeOperatorInfo(void* param, int32_t numOfOutput); +static void destroyConditionOperatorInfo(void* param, int32_t numOfOutput); + static void destroyOperatorInfo(SOperatorInfo* pOperator); static void destroySysTableScannerOperatorInfo(void* param, int32_t numOfOutput); @@ -221,6 +224,17 @@ static void doSetOperatorCompleted(SOperatorInfo* pOperator) { } } +#define OPTR_IS_OPENED(_optr) (((_optr)->status & OP_OPENED) == OP_OPENED) +#define OPTR_SET_OPENED(_optr) ((_optr)->status |= OP_OPENED) + +static int32_t operatorDummyOpenFn(void* param) { + SOperatorInfo* pOperator = (SOperatorInfo*) param; + OPTR_SET_OPENED(pOperator); + return TSDB_CODE_SUCCESS; +} + +static void operatorDummyCloseFn(void* param, int32_t numOfCols) {} + static int32_t doCopyToSDataBlock(SDiskbasedBuf *pBuf, SGroupResInfo* pGroupResInfo, int32_t orderType, SSDataBlock* pBlock, int32_t rowCapacity); static int32_t getGroupbyColumnIndex(SGroupbyExpr *pGroupbyExpr, SSDataBlock* pDataBlock); @@ -4723,6 +4737,11 @@ static SSDataBlock* doTableScan(void* param, bool *newgroup) { STableScanInfo *pTableScanInfo = pOperator->info; SExecTaskInfo *pTaskInfo = pOperator->pTaskInfo; + pTaskInfo->code = pOperator->_openFn(pOperator); + if (pTaskInfo->code != TSDB_CODE_SUCCESS) { + return NULL; + } + // The read handle is not initialized yet, since no qualified tables exists if (pTableScanInfo->pTsdbReadHandle == NULL) { return NULL; @@ -4840,6 +4859,11 @@ static SSDataBlock* doStreamBlockScan(void* param, bool* newgroup) { SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SStreamBlockScanInfo* pInfo = pOperator->info; + pTaskInfo->code = pOperator->_openFn(pOperator); + if (pTaskInfo->code != TSDB_CODE_SUCCESS) { + return NULL; + } + SDataBlockInfo* pBlockInfo = &pInfo->pRes->info; pBlockInfo->rows = 0; @@ -4880,7 +4904,7 @@ int32_t loadRemoteDataCallback(void* param, const SDataBuf* pMsg, int32_t code) pRsp->useconds = htobe64(pRsp->useconds); pRsp->compLen = htonl(pRsp->compLen); - pSourceDataInfo->status = DATA_READY; + pSourceDataInfo->status = EX_SOURCE_DATA_READY; tsem_post(&pSourceDataInfo->pEx->ready); } @@ -5008,12 +5032,12 @@ static SSDataBlock* concurrentlyLoadRemoteDataImpl(SOperatorInfo *pOperator, SEx for (int32_t i = 0; i < totalSources; ++i) { SSourceDataInfo* pDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, i); - if (pDataInfo->status == DATA_EXHAUSTED) { + if (pDataInfo->status == EX_SOURCE_DATA_EXHAUSTED) { completed += 1; continue; } - if (pDataInfo->status != DATA_READY) { + if (pDataInfo->status != EX_SOURCE_DATA_READY) { continue; } @@ -5026,7 +5050,7 @@ static SSDataBlock* concurrentlyLoadRemoteDataImpl(SOperatorInfo *pOperator, SEx qDebug("%s vgId:%d, taskID:0x%" PRIx64 " index:%d completed, rowsOfSource:%" PRIu64 ", totalRows:%" PRIu64 " try next", GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, i + 1, pDataInfo->totalRows, pExchangeInfo->totalRows); - pDataInfo->status = DATA_EXHAUSTED; + pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED; completed += 1; continue; } @@ -5042,15 +5066,15 @@ static SSDataBlock* concurrentlyLoadRemoteDataImpl(SOperatorInfo *pOperator, SEx GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pRes->info.rows, pDataInfo->totalRows, pExchangeInfo->totalRows, pExchangeInfo->totalSize, i + 1, totalSources); - pDataInfo->status = DATA_EXHAUSTED; + pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED; } else { qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64 " numOfRows:%d, totalRows:%" PRIu64 ", totalBytes:%" PRIu64, GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pRes->info.rows, pExchangeInfo->totalRows, pExchangeInfo->totalSize); } - if (pDataInfo->status != DATA_EXHAUSTED) { - pDataInfo->status = DATA_NOT_READY; + if (pDataInfo->status != EX_SOURCE_DATA_EXHAUSTED) { + pDataInfo->status = EX_SOURCE_DATA_NOT_READY; code = doSendFetchDataRequest(pExchangeInfo, pTaskInfo, i); if (code != TSDB_CODE_SUCCESS) { goto _error; @@ -5070,13 +5094,9 @@ _error: return NULL; } -static SSDataBlock* concurrentlyLoadRemoteData(SOperatorInfo *pOperator) { +static int32_t prepareConcurrentlyLoad(SOperatorInfo *pOperator) { SExchangeInfo *pExchangeInfo = pOperator->info; SExecTaskInfo *pTaskInfo = pOperator->pTaskInfo; - - if (pOperator->status == OP_RES_TO_RETURN) { - return concurrentlyLoadRemoteDataImpl(pOperator, pExchangeInfo, pTaskInfo); - } size_t totalSources = taosArrayGetSize(pExchangeInfo->pSources); int64_t startTs = taosGetTimestampUs(); @@ -5085,7 +5105,8 @@ static SSDataBlock* concurrentlyLoadRemoteData(SOperatorInfo *pOperator) { for(int32_t i = 0; i < totalSources; ++i) { int32_t code = doSendFetchDataRequest(pExchangeInfo, pTaskInfo, i); if (code != TSDB_CODE_SUCCESS) { - return NULL; + pTaskInfo->code = code; + return code; } } @@ -5093,9 +5114,9 @@ static SSDataBlock* concurrentlyLoadRemoteData(SOperatorInfo *pOperator) { qDebug("%s send all fetch request to %"PRIzu" sources completed, elapsed:%"PRId64, GET_TASKID(pTaskInfo), totalSources, endTs - startTs); tsem_wait(&pExchangeInfo->ready); + pOperator->cost.openCost = taosGetTimestampUs() - startTs; - pOperator->status = OP_RES_TO_RETURN; - return concurrentlyLoadRemoteDataImpl(pOperator, pExchangeInfo, pTaskInfo); + return TSDB_CODE_SUCCESS; } static SSDataBlock* seqLoadRemoteData(SOperatorInfo *pOperator) { @@ -5123,7 +5144,7 @@ static SSDataBlock* seqLoadRemoteData(SOperatorInfo *pOperator) { GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pExchangeInfo->current + 1, pDataInfo->totalRows, pExchangeInfo->totalRows); - pDataInfo->status = DATA_EXHAUSTED; + pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED; pExchangeInfo->current += 1; continue; } @@ -5138,7 +5159,7 @@ static SSDataBlock* seqLoadRemoteData(SOperatorInfo *pOperator) { pDataInfo->totalRows, pExchangeInfo->totalRows, pExchangeInfo->totalSize, pExchangeInfo->current + 1, totalSources); - pDataInfo->status = DATA_EXHAUSTED; + pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED; pExchangeInfo->current += 1; } else { qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64 " numOfRows:%d, totalRows:%" PRIu64 ", totalBytes:%" PRIu64, @@ -5149,14 +5170,39 @@ static SSDataBlock* seqLoadRemoteData(SOperatorInfo *pOperator) { } } +static int32_t prepareLoadRemoteData(void* param) { + SOperatorInfo *pOperator = (SOperatorInfo*) param; + if (OPTR_IS_OPENED(pOperator)) { + return TSDB_CODE_SUCCESS; + } + + SExchangeInfo *pExchangeInfo = pOperator->info; + if (pExchangeInfo->seqLoadData) { + // do nothing for sequentially load data + } else { + int32_t code = prepareConcurrentlyLoad(pOperator); + if (code != TSDB_CODE_SUCCESS) { + return code; + } + } + + OPTR_SET_OPENED(pOperator); + return TSDB_CODE_SUCCESS; +} + static SSDataBlock* doLoadRemoteData(void* param, bool* newgroup) { SOperatorInfo *pOperator = (SOperatorInfo*) param; SExchangeInfo *pExchangeInfo = pOperator->info; SExecTaskInfo *pTaskInfo = pOperator->pTaskInfo; - size_t totalSources = taosArrayGetSize(pExchangeInfo->pSources); + int32_t code = pOperator->_openFn(pOperator); + if (code != TSDB_CODE_SUCCESS) { + pTaskInfo->code = code; + return NULL; + } + size_t totalSources = taosArrayGetSize(pExchangeInfo->pSources); if (pOperator->status == OP_EXEC_DONE) { qDebug("%s all %"PRIzu" source(s) are exhausted, total rows:%"PRIu64" bytes:%"PRIu64", elapsed:%.2f ms", GET_TASKID(pTaskInfo), totalSources, pExchangeInfo->totalRows, pExchangeInfo->totalSize, pExchangeInfo->totalElapsed/1000.0); @@ -5164,11 +5210,10 @@ static SSDataBlock* doLoadRemoteData(void* param, bool* newgroup) { } *newgroup = false; - if (pExchangeInfo->seqLoadData) { return seqLoadRemoteData(pOperator); } else { - return concurrentlyLoadRemoteData(pOperator); + return concurrentlyLoadRemoteDataImpl(pOperator, pExchangeInfo, pTaskInfo); } #if 0 @@ -5181,16 +5226,35 @@ static SSDataBlock* doLoadRemoteData(void* param, bool* newgroup) { #endif } +static int32_t initDataSource(int32_t numOfSources, SExchangeInfo* pInfo) { + pInfo->pSourceDataInfo = taosArrayInit(numOfSources, sizeof(SSourceDataInfo)); + if (pInfo->pSourceDataInfo == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } + + for(int32_t i = 0; i < numOfSources; ++i) { + SSourceDataInfo dataInfo = {0}; + dataInfo.status = EX_SOURCE_DATA_NOT_READY; + dataInfo.pEx = pInfo; + dataInfo.index = i; + + void* ret = taosArrayPush(pInfo->pSourceDataInfo, &dataInfo); + if (ret == NULL) { + taosArrayDestroy(pInfo->pSourceDataInfo); + return TSDB_CODE_OUT_OF_MEMORY; + } + } + + return TSDB_CODE_SUCCESS; +} + // TODO handle the error SOperatorInfo* createExchangeOperatorInfo(const SNodeList* pSources, SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo) { SExchangeInfo* pInfo = calloc(1, sizeof(SExchangeInfo)); SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); if (pInfo == NULL || pOperator == NULL) { - tfree(pInfo); - tfree(pOperator); - terrno = TSDB_CODE_QRY_OUT_OF_MEMORY; - return NULL; + goto _error; } size_t numOfSources = LIST_LENGTH(pSources); @@ -5217,29 +5281,28 @@ SOperatorInfo* createExchangeOperatorInfo(const SNodeList* pSources, SSDataBlock return NULL; } - for(int32_t i = 0; i < numOfSources; ++i) { - SSourceDataInfo dataInfo = {0}; - dataInfo.status = DATA_NOT_READY; - dataInfo.pEx = pInfo; - dataInfo.index = i; - - taosArrayPush(pInfo->pSourceDataInfo, &dataInfo); + int32_t code = initDataSource(numOfSources, pInfo); + if (code != TSDB_CODE_SUCCESS) { + goto _error; } size_t size = pBlock->info.numOfCols; pInfo->pResult = pBlock; pInfo->seqLoadData = true; + pInfo->seqLoadData = true; // sequentially load data from the source node tsem_init(&pInfo->ready, 0, 0); pOperator->name = "ExchangeOperator"; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_EXCHANGE; pOperator->blockingOptr = false; - pOperator->status = OP_IN_EXECUTING; + pOperator->status = OP_NOT_OPENED; pOperator->info = pInfo; pOperator->numOfOutput = size; - pOperator->nextDataFn = doLoadRemoteData; pOperator->pTaskInfo = pTaskInfo; + pOperator->_openFn = prepareLoadRemoteData; // assign a dummy function. + pOperator->getNextFn = doLoadRemoteData; + pOperator->closeFn = destroyExchangeOperatorInfo; #if 1 { // todo refactor @@ -5265,6 +5328,16 @@ SOperatorInfo* createExchangeOperatorInfo(const SNodeList* pSources, SSDataBlock #endif return pOperator; + + _error: + if (pInfo != NULL) { + destroyExchangeOperatorInfo(pInfo, 0); + } + + tfree(pInfo); + tfree(pOperator); + terrno = TSDB_CODE_QRY_OUT_OF_MEMORY; + return NULL; } SSDataBlock* createResultDataBlock(const SArray* pExprInfo) { @@ -5315,10 +5388,12 @@ SOperatorInfo* createTableScanOperatorInfo(void* pTsdbReadHandle, int32_t order, pOperator->name = "TableScanOperator"; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN; pOperator->blockingOptr = false; - pOperator->status = OP_IN_EXECUTING; + pOperator->status = OP_NOT_OPENED; pOperator->info = pInfo; pOperator->numOfOutput = numOfOutput; - pOperator->nextDataFn = doTableScan; + pOperator->_openFn = operatorDummyOpenFn; + pOperator->getNextFn = doTableScan; + pOperator->closeFn = operatorDummyCloseFn; pOperator->pTaskInfo = pTaskInfo; return pOperator; @@ -5339,11 +5414,11 @@ SOperatorInfo* createTableSeqScanOperatorInfo(void* pTsdbReadHandle, STaskRuntim pOperator->name = "TableSeqScanOperator"; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_TABLE_SEQ_SCAN; pOperator->blockingOptr = false; - pOperator->status = OP_IN_EXECUTING; + pOperator->status = OP_NOT_OPENED; pOperator->info = pInfo; pOperator->numOfOutput = pRuntimeEnv->pQueryAttr->numOfCols; pOperator->pRuntimeEnv = pRuntimeEnv; - pOperator->nextDataFn = doTableScanImpl; + pOperator->getNextFn = doTableScanImpl; return pOperator; } @@ -5364,10 +5439,10 @@ SOperatorInfo* createTableBlockInfoScanOperator(void* pTsdbReadHandle, STaskRunt pOperator->name = "TableBlockInfoScanOperator"; // pOperator->operatorType = OP_TableBlockInfoScan; pOperator->blockingOptr = false; - pOperator->status = OP_IN_EXECUTING; + pOperator->status = OP_NOT_OPENED; pOperator->info = pInfo; // pOperator->numOfOutput = pRuntimeEnv->pQueryAttr->numOfCols; - pOperator->nextDataFn = doBlockInfoScan; + pOperator->getNextFn = doBlockInfoScan; return pOperator; } @@ -5397,10 +5472,13 @@ SOperatorInfo* createStreamScanOperatorInfo(void *streamReadHandle, SSDataBlock* pOperator->name = "StreamBlockScanOperator"; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN; pOperator->blockingOptr = false; - pOperator->status = OP_IN_EXECUTING; + pOperator->status = OP_NOT_OPENED; pOperator->info = pInfo; pOperator->numOfOutput = pResBlock->info.numOfCols; - pOperator->nextDataFn = doStreamBlockScan; + pOperator->_openFn = operatorDummyOpenFn; + pOperator->getNextFn = doStreamBlockScan; + pOperator->closeFn = operatorDummyCloseFn; + pOperator->pTaskInfo = pTaskInfo; return pOperator; } @@ -5415,7 +5493,7 @@ static int32_t loadSysTableContentCb(void* param, const SDataBuf* pMsg, int32_t pRsp->useconds = htobe64(pRsp->useconds); pRsp->compLen = htonl(pRsp->compLen); - pSourceDataInfo->status = DATA_READY; + pSourceDataInfo->status = EX_SOURCE_DATA_READY; tsem_post(&pSourceDataInfo->pEx->ready); } @@ -5513,7 +5591,7 @@ SOperatorInfo* createSysTableScanOperatorInfo(void* pSysTableReadHandle, const S pOperator->status = OP_IN_EXECUTING; pOperator->info = pInfo; pOperator->numOfOutput = taosArrayGetSize(pExprInfo); - pOperator->nextDataFn = doSysTableScan; + pOperator->getNextFn = doSysTableScan; pOperator->closeFn = destroySysTableScannerOperatorInfo; pOperator->pTaskInfo = pTaskInfo; @@ -5741,7 +5819,7 @@ SSDataBlock* loadNextDataBlock(void* param) { SOperatorInfo* pOperator = (SOperatorInfo*) param; bool newgroup = false; - return pOperator->nextDataFn(pOperator, &newgroup); + return pOperator->getNextFn(pOperator, &newgroup); } static bool needToMerge(SSDataBlock* pBlock, SArray* groupInfo, char **buf, int32_t rowIndex) { @@ -6055,13 +6133,13 @@ SOperatorInfo* createSortedMergeOperatorInfo(SOperatorInfo** downstream, int32_t pOperator->name = "SortedMerge"; // pOperator->operatorType = OP_SortedMerge; pOperator->blockingOptr = true; - pOperator->status = OP_IN_EXECUTING; + pOperator->status = OP_NOT_OPENED; pOperator->info = pInfo; pOperator->numOfOutput = numOfOutput; pOperator->pExpr = exprArrayDup(pExprInfo); pOperator->pTaskInfo = pTaskInfo; - pOperator->nextDataFn = doSortedMerge; + pOperator->getNextFn = doSortedMerge; pOperator->closeFn = destroySortedMergeOperatorInfo; code = appendDownstream(pOperator, downstream, numOfDownstream); @@ -6153,11 +6231,11 @@ SOperatorInfo *createOrderOperatorInfo(SOperatorInfo* downstream, SArray* pExprI pOperator->name = "Order"; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_SORT; pOperator->blockingOptr = true; - pOperator->status = OP_IN_EXECUTING; + pOperator->status = OP_NOT_OPENED; pOperator->info = pInfo; pOperator->pTaskInfo = pTaskInfo; - pOperator->nextDataFn = doSort; + pOperator->getNextFn = doSort; pOperator->closeFn = destroyOrderOperatorInfo; int32_t code = appendDownstream(pOperator, &downstream, 1); @@ -6183,7 +6261,7 @@ static SSDataBlock* doAggregate(void* param, bool* newgroup) { while(1) { publishOperatorProfEvent(downstream, QUERY_PROF_BEFORE_OPERATOR_EXEC); - SSDataBlock* pBlock = downstream->nextDataFn(downstream, newgroup); + SSDataBlock* pBlock = downstream->getNextFn(downstream, newgroup); publishOperatorProfEvent(downstream, QUERY_PROF_AFTER_OPERATOR_EXEC); if (pBlock == NULL) { @@ -6233,7 +6311,7 @@ static SSDataBlock* doMultiTableAggregate(void* param, bool* newgroup) { while(1) { publishOperatorProfEvent(downstream, QUERY_PROF_BEFORE_OPERATOR_EXEC); - SSDataBlock* pBlock = downstream->nextDataFn(downstream, newgroup); + SSDataBlock* pBlock = downstream->getNextFn(downstream, newgroup); publishOperatorProfEvent(downstream, QUERY_PROF_AFTER_OPERATOR_EXEC); if (pBlock == NULL) { @@ -6319,7 +6397,7 @@ static SSDataBlock* doProjectOperation(void* param, bool* newgroup) { // The downstream exec may change the value of the newgroup, so use a local variable instead. publishOperatorProfEvent(pOperator->pDownstream[0], QUERY_PROF_BEFORE_OPERATOR_EXEC); - SSDataBlock* pBlock = pOperator->pDownstream[0]->nextDataFn(pOperator->pDownstream[0], newgroup); + SSDataBlock* pBlock = pOperator->pDownstream[0]->getNextFn(pOperator->pDownstream[0], newgroup); publishOperatorProfEvent(pOperator->pDownstream[0], QUERY_PROF_AFTER_OPERATOR_EXEC); if (pBlock == NULL) { @@ -6377,7 +6455,7 @@ static SSDataBlock* doLimit(void* param, bool* newgroup) { SSDataBlock* pBlock = NULL; while (1) { publishOperatorProfEvent(pOperator->pDownstream[0], QUERY_PROF_BEFORE_OPERATOR_EXEC); - pBlock = pOperator->pDownstream[0]->nextDataFn(pOperator->pDownstream[0], newgroup); + pBlock = pOperator->pDownstream[0]->getNextFn(pOperator->pDownstream[0], newgroup); publishOperatorProfEvent(pOperator->pDownstream[0], QUERY_PROF_AFTER_OPERATOR_EXEC); if (pBlock == NULL) { @@ -6428,7 +6506,7 @@ static SSDataBlock* doFilter(void* param, bool* newgroup) { while (1) { publishOperatorProfEvent(pOperator->pDownstream[0], QUERY_PROF_BEFORE_OPERATOR_EXEC); - SSDataBlock *pBlock = pOperator->pDownstream[0]->nextDataFn(pOperator->pDownstream[0], newgroup); + SSDataBlock *pBlock = pOperator->pDownstream[0]->getNextFn(pOperator->pDownstream[0], newgroup); publishOperatorProfEvent(pOperator->pDownstream[0], QUERY_PROF_AFTER_OPERATOR_EXEC); if (pBlock == NULL) { @@ -6471,7 +6549,7 @@ static SSDataBlock* doIntervalAgg(void* param, bool* newgroup) { while(1) { publishOperatorProfEvent(downstream, QUERY_PROF_BEFORE_OPERATOR_EXEC); - SSDataBlock* pBlock = downstream->nextDataFn(downstream, newgroup); + SSDataBlock* pBlock = downstream->getNextFn(downstream, newgroup); publishOperatorProfEvent(downstream, QUERY_PROF_AFTER_OPERATOR_EXEC); if (pBlock == NULL) { @@ -6531,7 +6609,7 @@ static SSDataBlock* doAllIntervalAgg(void* param, bool* newgroup) { while(1) { publishOperatorProfEvent(downstream, QUERY_PROF_BEFORE_OPERATOR_EXEC); - SSDataBlock* pBlock = downstream->nextDataFn(downstream, newgroup); + SSDataBlock* pBlock = downstream->getNextFn(downstream, newgroup); publishOperatorProfEvent(downstream, QUERY_PROF_AFTER_OPERATOR_EXEC); if (pBlock == NULL) { @@ -6594,7 +6672,7 @@ static SSDataBlock* doSTableIntervalAgg(void* param, bool* newgroup) { while(1) { publishOperatorProfEvent(downstream, QUERY_PROF_BEFORE_OPERATOR_EXEC); - SSDataBlock* pBlock = downstream->nextDataFn(downstream, newgroup); + SSDataBlock* pBlock = downstream->getNextFn(downstream, newgroup); publishOperatorProfEvent(downstream, QUERY_PROF_AFTER_OPERATOR_EXEC); if (pBlock == NULL) { @@ -6649,7 +6727,7 @@ static SSDataBlock* doAllSTableIntervalAgg(void* param, bool* newgroup) { while(1) { publishOperatorProfEvent(downstream, QUERY_PROF_BEFORE_OPERATOR_EXEC); - SSDataBlock* pBlock = downstream->nextDataFn(downstream, newgroup); + SSDataBlock* pBlock = downstream->getNextFn(downstream, newgroup); publishOperatorProfEvent(downstream, QUERY_PROF_AFTER_OPERATOR_EXEC); if (pBlock == NULL) { @@ -6784,7 +6862,7 @@ static SSDataBlock* doStateWindowAgg(void *param, bool* newgroup) { SOperatorInfo* downstream = pOperator->pDownstream[0]; while (1) { publishOperatorProfEvent(downstream, QUERY_PROF_BEFORE_OPERATOR_EXEC); - SSDataBlock* pBlock = downstream->nextDataFn(downstream, newgroup); + SSDataBlock* pBlock = downstream->getNextFn(downstream, newgroup); publishOperatorProfEvent(downstream, QUERY_PROF_AFTER_OPERATOR_EXEC); if (pBlock == NULL) { @@ -6846,7 +6924,7 @@ static SSDataBlock* doSessionWindowAgg(void* param, bool* newgroup) { while(1) { publishOperatorProfEvent(downstream, QUERY_PROF_BEFORE_OPERATOR_EXEC); - SSDataBlock* pBlock = downstream->nextDataFn(downstream, newgroup); + SSDataBlock* pBlock = downstream->getNextFn(downstream, newgroup); publishOperatorProfEvent(downstream, QUERY_PROF_AFTER_OPERATOR_EXEC); if (pBlock == NULL) { break; @@ -6899,7 +6977,7 @@ static SSDataBlock* hashGroupbyAggregate(void* param, bool* newgroup) { while(1) { publishOperatorProfEvent(downstream, QUERY_PROF_BEFORE_OPERATOR_EXEC); - SSDataBlock* pBlock = downstream->nextDataFn(downstream, newgroup); + SSDataBlock* pBlock = downstream->getNextFn(downstream, newgroup); publishOperatorProfEvent(downstream, QUERY_PROF_AFTER_OPERATOR_EXEC); if (pBlock == NULL) { break; @@ -6984,7 +7062,7 @@ static SSDataBlock* doFill(void* param, bool* newgroup) { while(1) { publishOperatorProfEvent(pOperator->pDownstream[0], QUERY_PROF_BEFORE_OPERATOR_EXEC); - SSDataBlock* pBlock = pOperator->pDownstream[0]->nextDataFn(pOperator->pDownstream[0], newgroup); + SSDataBlock* pBlock = pOperator->pDownstream[0]->getNextFn(pOperator->pDownstream[0], newgroup); publishOperatorProfEvent(pOperator->pDownstream[0], QUERY_PROF_AFTER_OPERATOR_EXEC); if (*newgroup) { @@ -7141,13 +7219,13 @@ SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SArray* pE pOperator->name = "TableAggregate"; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_AGG; pOperator->blockingOptr = true; - pOperator->status = OP_IN_EXECUTING; + pOperator->status = OP_NOT_OPENED; pOperator->info = pInfo; pOperator->pExpr = exprArrayDup(pExprInfo); pOperator->numOfOutput = taosArrayGetSize(pExprInfo); pOperator->pTaskInfo = pTaskInfo; - pOperator->nextDataFn = doAggregate; + pOperator->getNextFn = doAggregate; pOperator->closeFn = destroyAggOperatorInfo; int32_t code = appendDownstream(pOperator, &downstream, 1); @@ -7196,17 +7274,17 @@ static void destroyGroupbyOperatorInfo(void* param, int32_t numOfOutput) { tfree(pInfo->prevData); } -static void destroyProjectOperatorInfo(void* param, int32_t numOfOutput) { +void destroyProjectOperatorInfo(void* param, int32_t numOfOutput) { SProjectOperatorInfo* pInfo = (SProjectOperatorInfo*) param; doDestroyBasicInfo(&pInfo->binfo, numOfOutput); } -static void destroyTagScanOperatorInfo(void* param, int32_t numOfOutput) { +void destroyTagScanOperatorInfo(void* param, int32_t numOfOutput) { STagScanInfo* pInfo = (STagScanInfo*) param; pInfo->pRes = blockDataDestroy(pInfo->pRes); } -static void destroyOrderOperatorInfo(void* param, int32_t numOfOutput) { +void destroyOrderOperatorInfo(void* param, int32_t numOfOutput) { SOrderOperatorInfo* pInfo = (SOrderOperatorInfo*) param; pInfo->pDataBlock = blockDataDestroy(pInfo->pDataBlock); @@ -7236,6 +7314,17 @@ static void destroySysTableScannerOperatorInfo(void* param, int32_t numOfOutput) } } +void destroyExchangeOperatorInfo(void* param, int32_t numOfOutput) { + SExchangeInfo* pExInfo = (SExchangeInfo*) param; + taosArrayDestroy(pExInfo->pSources); + taosArrayDestroy(pExInfo->pSourceDataInfo); + if (pExInfo->pResult != NULL) { + blockDataDestroy(pExInfo->pResult); + } + + tsem_destroy(&pExInfo->ready); +} + SOperatorInfo* createMultiTableAggOperatorInfo(SOperatorInfo* downstream, SArray* pExprInfo, SSDataBlock* pResBlock, SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo) { SAggOperatorInfo* pInfo = calloc(1, sizeof(SAggOperatorInfo)); @@ -7250,13 +7339,13 @@ SOperatorInfo* createMultiTableAggOperatorInfo(SOperatorInfo* downstream, SArray pOperator->name = "MultiTableAggregate"; // pOperator->operatorType = OP_MultiTableAggregate; pOperator->blockingOptr = true; - pOperator->status = OP_IN_EXECUTING; + pOperator->status = OP_NOT_OPENED; pOperator->info = pInfo; pOperator->pExpr = exprArrayDup(pExprInfo); pOperator->numOfOutput = numOfOutput; pOperator->pTaskInfo = pTaskInfo; - pOperator->nextDataFn = doMultiTableAggregate; + pOperator->getNextFn = doMultiTableAggregate; pOperator->closeFn = destroyAggOperatorInfo; int32_t code = appendDownstream(pOperator, &downstream, 1); @@ -7277,12 +7366,12 @@ SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SArray* pExp pOperator->name = "ProjectOperator"; // pOperator->operatorType = OP_Project; pOperator->blockingOptr = false; - pOperator->status = OP_IN_EXECUTING; + pOperator->status = OP_NOT_OPENED; pOperator->info = pInfo; pOperator->pExpr = exprArrayDup(pExprInfo); pOperator->numOfOutput = taosArrayGetSize(pExprInfo); - pOperator->nextDataFn = doProjectOperation; + pOperator->getNextFn = doProjectOperation; pOperator->closeFn = destroyProjectOperatorInfo; int32_t code = appendDownstream(pOperator, &downstream, 1); @@ -7331,11 +7420,11 @@ SOperatorInfo* createLimitOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorIn pOperator->name = "LimitOperator"; // pOperator->operatorType = OP_Limit; pOperator->blockingOptr = false; - pOperator->status = OP_IN_EXECUTING; - pOperator->nextDataFn = doLimit; + pOperator->status = OP_NOT_OPENED; + pOperator->getNextFn = doLimit; pOperator->info = pInfo; pOperator->pRuntimeEnv = pRuntimeEnv; - int32_t code = appendDownstream(pOperator, &downstream, 1); + int32_t code = appendDownstream(pOperator, &downstream, 1); return pOperator; } @@ -7363,13 +7452,13 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SArray* pEx pOperator->name = "TimeIntervalAggOperator"; // pOperator->operatorType = OP_TimeWindow; pOperator->blockingOptr = true; - pOperator->status = OP_IN_EXECUTING; + pOperator->status = OP_NOT_OPENED; pOperator->pExpr = exprArrayDup(pExprInfo); pOperator->pTaskInfo = pTaskInfo; pOperator->numOfOutput = taosArrayGetSize(pExprInfo); pOperator->info = pInfo; - pOperator->nextDataFn = doIntervalAgg; + pOperator->getNextFn = doIntervalAgg; pOperator->closeFn = destroyBasicOperatorInfo; code = appendDownstream(pOperator, &downstream, 1); @@ -7388,12 +7477,12 @@ SOperatorInfo* createAllTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, S pOperator->name = "AllTimeIntervalAggOperator"; // pOperator->operatorType = OP_AllTimeWindow; pOperator->blockingOptr = true; - pOperator->status = OP_IN_EXECUTING; + pOperator->status = OP_NOT_OPENED; pOperator->pExpr = pExpr; pOperator->numOfOutput = numOfOutput; pOperator->info = pInfo; pOperator->pRuntimeEnv = pRuntimeEnv; - pOperator->nextDataFn = doAllIntervalAgg; + pOperator->getNextFn = doAllIntervalAgg; pOperator->closeFn = destroyBasicOperatorInfo; int32_t code = appendDownstream(pOperator, &downstream, 1); @@ -7412,12 +7501,12 @@ SOperatorInfo* createStatewindowOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOper pOperator->name = "StateWindowOperator"; // pOperator->operatorType = OP_StateWindow; pOperator->blockingOptr = true; - pOperator->status = OP_IN_EXECUTING; + pOperator->status = OP_NOT_OPENED; pOperator->pExpr = pExpr; pOperator->numOfOutput = numOfOutput; pOperator->info = pInfo; pOperator->pRuntimeEnv = pRuntimeEnv; - pOperator->nextDataFn = doStateWindowAgg; + pOperator->getNextFn = doStateWindowAgg; pOperator->closeFn = destroyStateWindowOperatorInfo; int32_t code = appendDownstream(pOperator, &downstream, 1); @@ -7437,12 +7526,12 @@ SOperatorInfo* createSWindowOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperator pOperator->name = "SessionWindowAggOperator"; // pOperator->operatorType = OP_SessionWindow; pOperator->blockingOptr = true; - pOperator->status = OP_IN_EXECUTING; + pOperator->status = OP_NOT_OPENED; pOperator->pExpr = pExpr; pOperator->numOfOutput = numOfOutput; pOperator->info = pInfo; pOperator->pRuntimeEnv = pRuntimeEnv; - pOperator->nextDataFn = doSessionWindowAgg; + pOperator->getNextFn = doSessionWindowAgg; pOperator->closeFn = destroySWindowOperatorInfo; int32_t code = appendDownstream(pOperator, &downstream, 1); @@ -7460,13 +7549,13 @@ SOperatorInfo* createMultiTableTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntim pOperator->name = "MultiTableTimeIntervalOperator"; // pOperator->operatorType = OP_MultiTableTimeInterval; pOperator->blockingOptr = true; - pOperator->status = OP_IN_EXECUTING; + pOperator->status = OP_NOT_OPENED; pOperator->pExpr = pExpr; pOperator->numOfOutput = numOfOutput; pOperator->info = pInfo; pOperator->pRuntimeEnv = pRuntimeEnv; - pOperator->nextDataFn = doSTableIntervalAgg; + pOperator->getNextFn = doSTableIntervalAgg; pOperator->closeFn = destroyBasicOperatorInfo; int32_t code = appendDownstream(pOperator, &downstream, 1); @@ -7484,13 +7573,13 @@ SOperatorInfo* createAllMultiTableTimeIntervalOperatorInfo(STaskRuntimeEnv* pRun pOperator->name = "AllMultiTableTimeIntervalOperator"; // pOperator->operatorType = OP_AllMultiTableTimeInterval; pOperator->blockingOptr = true; - pOperator->status = OP_IN_EXECUTING; + pOperator->status = OP_NOT_OPENED; pOperator->pExpr = pExpr; pOperator->numOfOutput = numOfOutput; pOperator->info = pInfo; pOperator->pRuntimeEnv = pRuntimeEnv; - pOperator->nextDataFn = doAllSTableIntervalAgg; + pOperator->getNextFn = doAllSTableIntervalAgg; pOperator->closeFn = destroyBasicOperatorInfo; int32_t code = appendDownstream(pOperator, &downstream, 1); @@ -7516,13 +7605,13 @@ SOperatorInfo* createGroupbyOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperator SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); pOperator->name = "GroupbyAggOperator"; pOperator->blockingOptr = true; - pOperator->status = OP_IN_EXECUTING; + pOperator->status = OP_NOT_OPENED; // pOperator->operatorType = OP_Groupby; pOperator->pExpr = pExpr; pOperator->numOfOutput = numOfOutput; pOperator->info = pInfo; pOperator->pRuntimeEnv = pRuntimeEnv; - pOperator->nextDataFn = hashGroupbyAggregate; + pOperator->getNextFn = hashGroupbyAggregate; pOperator->closeFn = destroyGroupbyOperatorInfo; int32_t code = appendDownstream(pOperator, &downstream, 1); @@ -7555,13 +7644,13 @@ SOperatorInfo* createFillOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInf pOperator->name = "FillOperator"; pOperator->blockingOptr = false; - pOperator->status = OP_IN_EXECUTING; + pOperator->status = OP_NOT_OPENED; // pOperator->operatorType = OP_Fill; pOperator->pExpr = pExpr; pOperator->numOfOutput = numOfOutput; pOperator->info = pInfo; pOperator->pRuntimeEnv = pRuntimeEnv; - pOperator->nextDataFn = doFill; + pOperator->getNextFn = doFill; pOperator->closeFn = destroySFillOperatorInfo; int32_t code = appendDownstream(pOperator, &downstream, 1); @@ -7606,13 +7695,12 @@ SOperatorInfo* createSLimitOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorI pOperator->name = "SLimitOperator"; // pOperator->operatorType = OP_SLimit; pOperator->blockingOptr = false; - pOperator->status = OP_IN_EXECUTING; + pOperator->status = OP_NOT_OPENED; // pOperator->exec = doSLimit; pOperator->info = pInfo; - pOperator->pRuntimeEnv = pRuntimeEnv; pOperator->closeFn = destroySlimitOperatorInfo; - int32_t code = appendDownstream(pOperator, &downstream, 1); + int32_t code = appendDownstream(pOperator, &downstream, 1); return pOperator; } @@ -7746,6 +7834,7 @@ static SSDataBlock* doTagScan(void* param, bool* newgroup) { return (pRes->info.rows == 0)? NULL:pInfo->pRes; #endif + return 0; } SOperatorInfo* createTagScanOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SExprInfo* pExpr, int32_t numOfOutput) { @@ -7762,9 +7851,9 @@ SOperatorInfo* createTagScanOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SExprInfo pOperator->name = "SeqTableTagScan"; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN; pOperator->blockingOptr = false; - pOperator->status = OP_IN_EXECUTING; + pOperator->status = OP_NOT_OPENED; pOperator->info = pInfo; - pOperator->nextDataFn = doTagScan; + pOperator->getNextFn = doTagScan; pOperator->pExpr = pExpr; pOperator->numOfOutput = numOfOutput; pOperator->pRuntimeEnv = pRuntimeEnv; @@ -7834,7 +7923,7 @@ static SSDataBlock* hashDistinct(void* param, bool* newgroup) { while(1) { publishOperatorProfEvent(pOperator->pDownstream[0], QUERY_PROF_BEFORE_OPERATOR_EXEC); - pBlock = pOperator->pDownstream[0]->nextDataFn(pOperator->pDownstream[0], newgroup); + pBlock = pOperator->pDownstream[0]->getNextFn(pOperator->pDownstream[0], newgroup); publishOperatorProfEvent(pOperator->pDownstream[0], QUERY_PROF_AFTER_OPERATOR_EXEC); if (pBlock == NULL) { @@ -7900,13 +7989,13 @@ SOperatorInfo* createDistinctOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperato SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); pOperator->name = "DistinctOperator"; pOperator->blockingOptr = false; - pOperator->status = OP_IN_EXECUTING; + pOperator->status = OP_NOT_OPENED; // pOperator->operatorType = OP_Distinct; pOperator->pExpr = pExpr; pOperator->numOfOutput = numOfOutput; pOperator->info = pInfo; pOperator->pRuntimeEnv = pRuntimeEnv; - pOperator->nextDataFn = hashDistinct; + pOperator->getNextFn = hashDistinct; pOperator->pExpr = pExpr; pOperator->closeFn = destroyDistinctOperatorInfo; @@ -8500,75 +8589,6 @@ void setResultBufSize(STaskAttr* pQueryAttr, SRspResultInfo* pResultInfo) { pResultInfo->total = 0; } -FORCE_INLINE bool checkQIdEqual(void *qHandle, uint64_t qId) { - return ((SQInfo *)qHandle)->qId == qId; -} - -int32_t initQInfo(STsBufInfo* pTsBufInfo, void* tsdb, void* sourceOptr, SQInfo* pQInfo, STaskParam* param, char* start, - int32_t prevResultLen, void* merger) { - int32_t code = TSDB_CODE_SUCCESS; - - STaskRuntimeEnv* pRuntimeEnv = &pQInfo->runtimeEnv; - pRuntimeEnv->qinfo = pQInfo; - - STaskAttr *pQueryAttr = pRuntimeEnv->pQueryAttr; - - STSBuf *pTsBuf = NULL; - - if (pTsBufInfo->tsLen > 0) { // open new file to save the result - char* tsBlock = start + pTsBufInfo->tsOffset; - pTsBuf = tsBufCreateFromCompBlocks(tsBlock, pTsBufInfo->tsNumOfBlocks, pTsBufInfo->tsLen, pTsBufInfo->tsOrder, - pQueryAttr->vgId); - - if (pTsBuf == NULL) { - code = TSDB_CODE_QRY_NO_DISKSPACE; - goto _error; - } - tsBufResetPos(pTsBuf); - bool ret = tsBufNextPos(pTsBuf); - UNUSED(ret); - } - - SArray* prevResult = NULL; - if (prevResultLen > 0) { - prevResult = interResFromBinary(param->prevResult, prevResultLen); - pRuntimeEnv->prevResult = prevResult; - } - - pRuntimeEnv->currentOffset = pQueryAttr->limit.offset; - if (tsdb != NULL) { -// pQueryAttr->precision = tsdbGetCfg(tsdb)->precision; - } - - if ((QUERY_IS_ASC_QUERY(pQueryAttr) && (pQueryAttr->window.skey > pQueryAttr->window.ekey)) || - (!QUERY_IS_ASC_QUERY(pQueryAttr) && (pQueryAttr->window.ekey > pQueryAttr->window.skey))) { - //qDebug("QInfo:0x%"PRIx64" no result in time range %" PRId64 "-%" PRId64 ", order %d", pQInfo->qId, pQueryAttr->window.skey, -// pQueryAttr->window.ekey, pQueryAttr->order.order); -// setTaskStatus(pOperator->pTaskInfo, QUERY_COMPLETED); - pRuntimeEnv->tableqinfoGroupInfo.numOfTables = 0; - // todo free memory - return TSDB_CODE_SUCCESS; - } - - if (pRuntimeEnv->tableqinfoGroupInfo.numOfTables == 0) { - //qDebug("QInfo:0x%"PRIx64" no table qualified for tag filter, abort query", pQInfo->qId); -// setTaskStatus(pOperator->pTaskInfo, QUERY_COMPLETED); - return TSDB_CODE_SUCCESS; - } - - // filter the qualified - if ((code = doInitQInfo(pQInfo, pTsBuf, tsdb, sourceOptr, param->tableScanOperator, param->pOperator, merger)) != TSDB_CODE_SUCCESS) { - goto _error; - } - - return code; - -_error: - // table query ref will be decrease during error handling -// doDestroyTask(pQInfo); - return code; -} - //TODO refactor void freeColumnFilterInfo(SColumnFilterInfo* pFilter, int32_t numOfFilters) { if (pFilter == NULL || numOfFilters == 0) { diff --git a/source/libs/executor/test/executorTests.cpp b/source/libs/executor/test/executorTests.cpp index 28e2c9ef75..38ceeffc20 100644 --- a/source/libs/executor/test/executorTests.cpp +++ b/source/libs/executor/test/executorTests.cpp @@ -201,9 +201,9 @@ SOperatorInfo* createDummyOperator(int32_t startVal, int32_t numOfBlocks, int32_ pOperator->name = "dummyInputOpertor4Test"; if (numOfCols == 1) { - pOperator->nextDataFn = getDummyBlock; + pOperator->getNextFn = getDummyBlock; } else { - pOperator->nextDataFn = get2ColsDummyBlock; + pOperator->getNextFn = get2ColsDummyBlock; } SDummyInputInfo *pInfo = (SDummyInputInfo*) calloc(1, sizeof(SDummyInputInfo)); @@ -971,7 +971,7 @@ TEST(testCase, inMem_sort_Test) { SOperatorInfo* pOperator = createOrderOperatorInfo(createDummyOperator(10000, 5, 1000, data_asc, 1), pExprInfo, pOrderVal, NULL); bool newgroup = false; - SSDataBlock* pRes = pOperator->nextDataFn(pOperator, &newgroup); + SSDataBlock* pRes = pOperator->getNextFn(pOperator, &newgroup); SColumnInfoData* pCol1 = static_cast(taosArrayGet(pRes->pDataBlock, 0)); SColumnInfoData* pCol2 = static_cast(taosArrayGet(pRes->pDataBlock, 1)); @@ -1049,7 +1049,7 @@ TEST(testCase, external_sort_Test) { while(1) { int64_t s = taosGetTimestampUs(); - pRes = pOperator->nextDataFn(pOperator, &newgroup); + pRes = pOperator->getNextFn(pOperator, &newgroup); int64_t e = taosGetTimestampUs(); if (t++ == 1) { @@ -1121,7 +1121,7 @@ TEST(testCase, sorted_merge_Test) { while(1) { int64_t s = taosGetTimestampUs(); - pRes = pOperator->nextDataFn(pOperator, &newgroup); + pRes = pOperator->getNextFn(pOperator, &newgroup); int64_t e = taosGetTimestampUs(); if (t++ == 1) { @@ -1199,7 +1199,7 @@ TEST(testCase, time_interval_Operator_Test) { while(1) { int64_t s = taosGetTimestampUs(); - pRes = pOperator->nextDataFn(pOperator, &newgroup); + pRes = pOperator->getNextFn(pOperator, &newgroup); int64_t e = taosGetTimestampUs(); if (t++ == 1) { diff --git a/source/libs/nodes/src/nodesCloneFuncs.c b/source/libs/nodes/src/nodesCloneFuncs.c index 180853c0fb..c675827253 100644 --- a/source/libs/nodes/src/nodesCloneFuncs.c +++ b/source/libs/nodes/src/nodesCloneFuncs.c @@ -131,7 +131,6 @@ static SNode* valueNodeCopy(const SValueNode* pSrc, SValueNode* pDst) { case TSDB_DATA_TYPE_DOUBLE: COPY_SCALAR_FIELD(datum.d); break; - case TSDB_DATA_TYPE_BINARY: case TSDB_DATA_TYPE_NCHAR: case TSDB_DATA_TYPE_VARCHAR: case TSDB_DATA_TYPE_VARBINARY: diff --git a/source/libs/nodes/src/nodesCodeFuncs.c b/source/libs/nodes/src/nodesCodeFuncs.c index 7b60218180..782cdcb71a 100644 --- a/source/libs/nodes/src/nodesCodeFuncs.c +++ b/source/libs/nodes/src/nodesCodeFuncs.c @@ -978,7 +978,6 @@ static int32_t datumToJson(const void* pObj, SJson* pJson) { case TSDB_DATA_TYPE_DOUBLE: code = tjsonAddDoubleToObject(pJson, jkValueDatum, pNode->datum.d); break; - case TSDB_DATA_TYPE_BINARY: case TSDB_DATA_TYPE_NCHAR: case TSDB_DATA_TYPE_VARCHAR: case TSDB_DATA_TYPE_VARBINARY: @@ -1042,7 +1041,6 @@ static int32_t jsonToDatum(const SJson* pJson, void* pObj) { case TSDB_DATA_TYPE_DOUBLE: code = tjsonGetDoubleValue(pJson, jkValueDatum, &pNode->datum.d); break; - case TSDB_DATA_TYPE_BINARY: case TSDB_DATA_TYPE_NCHAR: case TSDB_DATA_TYPE_VARCHAR: case TSDB_DATA_TYPE_VARBINARY: diff --git a/source/libs/nodes/src/nodesUtilFuncs.c b/source/libs/nodes/src/nodesUtilFuncs.c index 4055faf299..7e1d15c191 100644 --- a/source/libs/nodes/src/nodesUtilFuncs.c +++ b/source/libs/nodes/src/nodesUtilFuncs.c @@ -386,7 +386,6 @@ void* nodesGetValueFromNode(SValueNode *pNode) { case TSDB_DATA_TYPE_FLOAT: case TSDB_DATA_TYPE_DOUBLE: return (void*)&pNode->datum.d; - case TSDB_DATA_TYPE_BINARY: case TSDB_DATA_TYPE_NCHAR: case TSDB_DATA_TYPE_VARCHAR: case TSDB_DATA_TYPE_VARBINARY: diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index 4be4e25eb8..ac1509462b 100644 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -287,7 +287,6 @@ static EDealRes translateValue(STranslateContext* pCxt, SValueNode* pVal) { pVal->datum.d = strtold(pVal->literal, &endPtr); break; } - case TSDB_DATA_TYPE_BINARY: case TSDB_DATA_TYPE_NCHAR: case TSDB_DATA_TYPE_VARCHAR: case TSDB_DATA_TYPE_VARBINARY: { @@ -601,7 +600,6 @@ static int32_t translateStar(STranslateContext* pCxt, SSelectStmt* pSelect, bool static int32_t getPositionValue(const SValueNode* pVal) { switch (pVal->node.resType.type) { case TSDB_DATA_TYPE_NULL: - case TSDB_DATA_TYPE_BINARY: case TSDB_DATA_TYPE_TIMESTAMP: case TSDB_DATA_TYPE_NCHAR: case TSDB_DATA_TYPE_VARCHAR: @@ -1438,7 +1436,6 @@ static void valueNodeToVariant(const SValueNode* pNode, SVariant* pVal) { case TSDB_DATA_TYPE_DOUBLE: pVal->d = pNode->datum.d; break; - case TSDB_DATA_TYPE_BINARY: case TSDB_DATA_TYPE_NCHAR: case TSDB_DATA_TYPE_VARCHAR: case TSDB_DATA_TYPE_VARBINARY: diff --git a/source/libs/planner/test/plannerTest.cpp b/source/libs/planner/test/plannerTest.cpp index 3748d37d74..1af89d71c9 100644 --- a/source/libs/planner/test/plannerTest.cpp +++ b/source/libs/planner/test/plannerTest.cpp @@ -56,7 +56,7 @@ protected: const string syntaxTreeStr = toString(query_->pRoot, false); SLogicNode* pLogicPlan = nullptr; - SPlanContext cxt = { .queryId = 1, .pAstRoot = query_->pRoot }; + SPlanContext cxt = { .queryId = 1, .acctId = 0, .pAstRoot = query_->pRoot }; code = createLogicPlan(&cxt, &pLogicPlan); if (code != TSDB_CODE_SUCCESS) { cout << "sql:[" << cxt_.pSql << "] logic plan code:" << code << ", strerror:" << tstrerror(code) << endl; diff --git a/source/libs/qworker/src/qworker.c b/source/libs/qworker/src/qworker.c index b9ef6f8504..42890ab38a 100644 --- a/source/libs/qworker/src/qworker.c +++ b/source/libs/qworker/src/qworker.c @@ -495,7 +495,7 @@ int32_t qwExecTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *queryEnd) { ASSERT(pRes->info.rows > 0); - SInputData inputData = {.pData = pRes, .pTableRetrieveTsMap = NULL}; + SInputData inputData = {.pData = pRes}; code = dsPutDataBlock(sinkHandle, &inputData, &qcontinue); if (code) { QW_TASK_ELOG("dsPutDataBlock failed, code:%s", tstrerror(code)); diff --git a/source/libs/scalar/src/sclvector.c b/source/libs/scalar/src/sclvector.c index b066dd2e77..0fe7bf6e36 100644 --- a/source/libs/scalar/src/sclvector.c +++ b/source/libs/scalar/src/sclvector.c @@ -411,27 +411,26 @@ int32_t vectorConvertImpl(SScalarParam* pIn, SScalarParam* pOut) { } int8_t gConvertTypes[TSDB_DATA_TYPE_BLOB+1][TSDB_DATA_TYPE_BLOB+1] = { -/* NULL BOOL TINY SMAL INT BIG FLOA DOUB BINA TIME NCHA UTIN USMA UINT UBIG VARC VARB JSON DECI BLOB */ -/*NULL*/ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, -/*BOOL*/ 0, 0, 0, 3, 4, 5, 6, 7, 7, 9, 7, 0, 12, 13, 14, 7, 7, 0, 0, 0, -/*TINY*/ 0, 0, 0, 3, 4, 5, 6, 7, 7, 9, 7, 3, 4, 5, 7, 7, 7, 0, 0, 0, -/*SMAL*/ 0, 0, 0, 0, 4, 5, 6, 7, 7, 9, 7, 3, 4, 5, 7, 7, 7, 0, 0, 0, -/*INT */ 0, 0, 0, 0, 0, 5, 6, 7, 7, 9, 7, 4, 4, 5, 7, 7, 7, 0, 0, 0, -/*BIGI*/ 0, 0, 0, 0, 0, 0, 6, 7, 7, 0, 7, 5, 5, 5, 7, 7, 7, 0, 0, 0, -/*FLOA*/ 0, 0, 0, 0, 0, 0, 0, 7, 7, 6, 7, 6, 6, 6, 6, 7, 7, 0, 0, 0, -/*DOUB*/ 0, 0, 0, 0, 0, 0, 0, 0, 7, 7, 7, 7, 7, 7, 7, 7, 7, 0, 0, 0, -/*BINA*/ 0, 0, 0, 0, 0, 0, 0, 0, 0, 7, 0, 7, 7, 7, 7, 0, 0, 0, 0, 0, -/*TIME*/ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 7, 9, 9, 9, 7, 7, 7, 0, 0, 0, -/*NCHA*/ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 7, 7, 7, 7, 0, 0, 0, 0, 0, -/*UTIN*/ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 12, 13, 14, 7, 7, 0, 0, 0, -/*USMA*/ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 13, 14, 7, 7, 0, 0, 0, -/*UINT*/ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 14, 7, 7, 0, 0, 0, -/*UBIG*/ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 7, 7, 0, 0, 0, -/*VARC*/ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, -/*VARB*/ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, -/*JSON*/ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, -/*DECI*/ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, -/*BLOB*/ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0 +/* NULL BOOL TINY SMAL INT BIG FLOA DOUB VARC TIME NCHA UTIN USMA UINT UBIG VARB JSON DECI BLOB */ +/*NULL*/ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, +/*BOOL*/ 0, 0, 0, 3, 4, 5, 6, 7, 7, 9, 7, 0, 12, 13, 14, 7, 0, 0, 0, +/*TINY*/ 0, 0, 0, 3, 4, 5, 6, 7, 7, 9, 7, 3, 4, 5, 7, 7, 0, 0, 0, +/*SMAL*/ 0, 0, 0, 0, 4, 5, 6, 7, 7, 9, 7, 3, 4, 5, 7, 7, 0, 0, 0, +/*INT */ 0, 0, 0, 0, 0, 5, 6, 7, 7, 9, 7, 4, 4, 5, 7, 7, 0, 0, 0, +/*BIGI*/ 0, 0, 0, 0, 0, 0, 6, 7, 7, 0, 7, 5, 5, 5, 7, 7, 0, 0, 0, +/*FLOA*/ 0, 0, 0, 0, 0, 0, 0, 7, 7, 6, 7, 6, 6, 6, 6, 7, 0, 0, 0, +/*DOUB*/ 0, 0, 0, 0, 0, 0, 0, 0, 7, 7, 7, 7, 7, 7, 7, 7, 0, 0, 0, +/*VARC*/ 0, 0, 0, 0, 0, 0, 0, 0, 0, 7, 0, 7, 7, 7, 7, 0, 0, 0, 0, +/*TIME*/ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 7, 9, 9, 9, 7, 7, 0, 0, 0, +/*NCHA*/ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 7, 7, 7, 7, 0, 0, 0, 0, +/*UTIN*/ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 12, 13, 14, 7, 0, 0, 0, +/*USMA*/ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 13, 14, 7, 0, 0, 0, +/*UINT*/ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 14, 7, 0, 0, 0, +/*UBIG*/ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 7, 0, 0, 0, +/*VARB*/ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, +/*JSON*/ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, +/*DECI*/ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, +/*BLOB*/ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0 }; int32_t vectorGetConvertType(int32_t type1, int32_t type2) {