Merge branch '3.0' of https://github.com/taosdata/TDengine into feature/3.0_mhli

This commit is contained in:
Minghao Li 2022-03-12 17:09:03 +08:00
commit 4676a486a9
19 changed files with 406 additions and 652 deletions

View File

@ -31,27 +31,27 @@ typedef void TAOS_SUB;
typedef void **TAOS_ROW; typedef void **TAOS_ROW;
// Data type definition // Data type definition
#define TSDB_DATA_TYPE_NULL 0 // 1 bytes #define TSDB_DATA_TYPE_NULL 0 // 1 bytes
#define TSDB_DATA_TYPE_BOOL 1 // 1 bytes #define TSDB_DATA_TYPE_BOOL 1 // 1 bytes
#define TSDB_DATA_TYPE_TINYINT 2 // 1 byte #define TSDB_DATA_TYPE_TINYINT 2 // 1 byte
#define TSDB_DATA_TYPE_SMALLINT 3 // 2 bytes #define TSDB_DATA_TYPE_SMALLINT 3 // 2 bytes
#define TSDB_DATA_TYPE_INT 4 // 4 bytes #define TSDB_DATA_TYPE_INT 4 // 4 bytes
#define TSDB_DATA_TYPE_BIGINT 5 // 8 bytes #define TSDB_DATA_TYPE_BIGINT 5 // 8 bytes
#define TSDB_DATA_TYPE_FLOAT 6 // 4 bytes #define TSDB_DATA_TYPE_FLOAT 6 // 4 bytes
#define TSDB_DATA_TYPE_DOUBLE 7 // 8 bytes #define TSDB_DATA_TYPE_DOUBLE 7 // 8 bytes
#define TSDB_DATA_TYPE_BINARY 8 // string, alias for varchar #define TSDB_DATA_TYPE_VARCHAR 8 // string, alias for varchar
#define TSDB_DATA_TYPE_TIMESTAMP 9 // 8 bytes #define TSDB_DATA_TYPE_TIMESTAMP 9 // 8 bytes
#define TSDB_DATA_TYPE_NCHAR 10 // unicode string #define TSDB_DATA_TYPE_NCHAR 10 // unicode string
#define TSDB_DATA_TYPE_UTINYINT 11 // 1 byte #define TSDB_DATA_TYPE_UTINYINT 11 // 1 byte
#define TSDB_DATA_TYPE_USMALLINT 12 // 2 bytes #define TSDB_DATA_TYPE_USMALLINT 12 // 2 bytes
#define TSDB_DATA_TYPE_UINT 13 // 4 bytes #define TSDB_DATA_TYPE_UINT 13 // 4 bytes
#define TSDB_DATA_TYPE_UBIGINT 14 // 8 bytes #define TSDB_DATA_TYPE_UBIGINT 14 // 8 bytes
#define TSDB_DATA_TYPE_VARCHAR 15 // string #define TSDB_DATA_TYPE_JSON 15 // json string
#define TSDB_DATA_TYPE_VARBINARY 16 // binary #define TSDB_DATA_TYPE_VARBINARY 16 // binary
#define TSDB_DATA_TYPE_JSON 17 // json #define TSDB_DATA_TYPE_DECIMAL 17 // decimal
#define TSDB_DATA_TYPE_DECIMAL 18 // decimal #define TSDB_DATA_TYPE_BLOB 18 // binary
#define TSDB_DATA_TYPE_BLOB 19 // binary #define TSDB_DATA_TYPE_MEDIUMBLOB 19
#define TSDB_DATA_TYPE_MEDIUMBLOB 20 #define TSDB_DATA_TYPE_BINARY TSDB_DATA_TYPE_VARCHAR // string
typedef enum { typedef enum {
TSDB_OPTION_LOCALE, TSDB_OPTION_LOCALE,

View File

@ -41,7 +41,6 @@ int32_t dsDataSinkMgtInit(SDataSinkMgtCfg *cfg);
typedef struct SInputData { typedef struct SInputData {
const struct SSDataBlock* pData; const struct SSDataBlock* pData;
SHashObj* pTableRetrieveTsMap;
} SInputData; } SInputData;
typedef struct SOutputData { typedef struct SOutputData {

View File

@ -29,7 +29,6 @@ extern "C" {
extern int tsRpcHeadSize; extern int tsRpcHeadSize;
typedef struct SRpcPush SRpcPush;
typedef struct SRpcConnInfo { typedef struct SRpcConnInfo {
uint32_t clientIp; uint32_t clientIp;
@ -45,16 +44,8 @@ typedef struct SRpcMsg {
int32_t code; int32_t code;
void * handle; // rpc handle returned to app void * handle; // rpc handle returned to app
void * ahandle; // app handle set by client void * ahandle; // app handle set by client
int persist; // keep handle or not, default 0
SRpcPush *push;
} SRpcMsg; } SRpcMsg;
typedef struct SRpcPush {
void *arg;
int (*callback)(void *arg, SRpcMsg *rpcMsg);
} SRpcPush;
typedef struct SRpcInit { typedef struct SRpcInit {
uint16_t localPort; // local port uint16_t localPort; // local port

View File

@ -253,6 +253,11 @@ typedef struct STaskIdInfo {
char* str; char* str;
} STaskIdInfo; } STaskIdInfo;
typedef struct STaskBufInfo {
int32_t bufSize; // total available buffer size in bytes
int32_t remainBuf; // remain buffer size
} STaskBufInfo;
typedef struct SExecTaskInfo { typedef struct SExecTaskInfo {
STaskIdInfo id; STaskIdInfo id;
char* content; char* content;
@ -264,7 +269,8 @@ typedef struct SExecTaskInfo {
uint64_t totalRows; // total number of rows uint64_t totalRows; // total number of rows
STableGroupInfo tableqinfoGroupInfo; // this is a group array list, including SArray<STableQueryInfo*> structure STableGroupInfo tableqinfoGroupInfo; // this is a group array list, including SArray<STableQueryInfo*> structure
char* sql; // query sql string char* sql; // query sql string
jmp_buf env; // jmp_buf env; // when error occurs, abort
STaskBufInfo bufInfo; // available buffer info this task
struct SOperatorInfo* pRoot; struct SOperatorInfo* pRoot;
} SExecTaskInfo; } SExecTaskInfo;
@ -307,9 +313,11 @@ typedef struct STaskRuntimeEnv {
} STaskRuntimeEnv; } STaskRuntimeEnv;
enum { enum {
OP_IN_EXECUTING = 1, OP_NOT_OPENED = 0x0,
OP_RES_TO_RETURN = 2, OP_OPENED = 0x1,
OP_EXEC_DONE = 3, OP_IN_EXECUTING = 0x3,
OP_RES_TO_RETURN = 0x5,
OP_EXEC_DONE = 0x9,
}; };
typedef struct SOperatorInfo { typedef struct SOperatorInfo {
@ -322,12 +330,14 @@ typedef struct SOperatorInfo {
SExprInfo* pExpr; SExprInfo* pExpr;
STaskRuntimeEnv* pRuntimeEnv; // todo remove it STaskRuntimeEnv* pRuntimeEnv; // todo remove it
SExecTaskInfo* pTaskInfo; SExecTaskInfo* pTaskInfo;
SOperatorCostInfo cost;
struct SOperatorInfo** pDownstream; // downstram pointer list struct SOperatorInfo** pDownstream; // downstram pointer list
int32_t numOfDownstream; // number of downstream. The value is always ONE expect for join operator int32_t numOfDownstream; // number of downstream. The value is always ONE expect for join operator
__optr_open_fn_t openFn; __optr_fn_t getNextFn;
__optr_fn_t nextDataFn; __optr_fn_t cleanupFn;
__optr_close_fn_t closeFn; __optr_close_fn_t closeFn;
__optr_open_fn_t _openFn; // DO NOT invoke this function directly
} SOperatorInfo; } SOperatorInfo;
typedef struct { typedef struct {
@ -378,9 +388,9 @@ typedef struct STaskParam {
} STaskParam; } STaskParam;
enum { enum {
DATA_NOT_READY = 0x1, EX_SOURCE_DATA_NOT_READY = 0x1,
DATA_READY = 0x2, EX_SOURCE_DATA_READY = 0x2,
DATA_EXHAUSTED = 0x3, EX_SOURCE_DATA_EXHAUSTED = 0x3,
}; };
typedef struct SSourceDataInfo { typedef struct SSourceDataInfo {
@ -639,12 +649,16 @@ SOperatorInfo* createTableSeqScanOperatorInfo(void* pTsdbReadHandle, STaskRuntim
SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SArray* pExprInfo, SSDataBlock* pResultBlock, SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo); SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SArray* pExprInfo, SSDataBlock* pResultBlock, SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo);
SOperatorInfo* createMultiTableAggOperatorInfo(SOperatorInfo* downstream, SArray* pExprInfo, SSDataBlock* pResultBlock, SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo); SOperatorInfo* createMultiTableAggOperatorInfo(SOperatorInfo* downstream, SArray* pExprInfo, SSDataBlock* pResultBlock, SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo);
SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SArray* pExprInfo, SExecTaskInfo* pTaskInfo); SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SArray* pExprInfo, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createOrderOperatorInfo(SOperatorInfo* downstream, SArray* pExprInfo, SArray* pOrderVal, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createSortedMergeOperatorInfo(SOperatorInfo** downstream, int32_t numOfDownstream, SArray* pExprInfo, SArray* pOrderVal, SArray* pGroupInfo, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createSysTableScanOperatorInfo(void* pSysTableReadHandle, const SArray* pExprInfo, const SSchema* pSchema, SOperatorInfo* createSysTableScanOperatorInfo(void* pSysTableReadHandle, const SArray* pExprInfo, const SSchema* pSchema,
int32_t tableType, SEpSet epset, SExecTaskInfo* pTaskInfo); int32_t tableType, SEpSet epset, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createLimitOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream); SOperatorInfo* createLimitOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream);
SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SArray* pExprInfo, SInterval* pInterval, SExecTaskInfo* pTaskInfo); SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SArray* pExprInfo, SInterval* pInterval, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createLimitOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream);
SOperatorInfo* createAllTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SOperatorInfo* createAllTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream,
SExprInfo* pExpr, int32_t numOfOutput); SExprInfo* pExpr, int32_t numOfOutput);
SOperatorInfo* createSWindowOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SExprInfo* pExpr, SOperatorInfo* createSWindowOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SExprInfo* pExpr,
@ -674,10 +688,6 @@ SOperatorInfo* createSLimitOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorI
SOperatorInfo* createJoinOperatorInfo(SOperatorInfo** pdownstream, int32_t numOfDownstream, SSchema* pSchema, SOperatorInfo* createJoinOperatorInfo(SOperatorInfo** pdownstream, int32_t numOfDownstream, SSchema* pSchema,
int32_t numOfOutput); int32_t numOfOutput);
SOperatorInfo* createOrderOperatorInfo(SOperatorInfo* downstream, SArray* pExprInfo, SArray* pOrderVal, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createSortedMergeOperatorInfo(SOperatorInfo** downstream, int32_t numOfDownstream, SArray* pExprInfo, SArray* pOrderVal, SArray* pGroupInfo, SExecTaskInfo* pTaskInfo);
// SSDataBlock* doSLimit(void* param, bool* newgroup);
// int32_t doCreateFilterInfo(SColumnInfo* pCols, int32_t numOfCols, int32_t numOfFilterCols, SSingleColumnFilterInfo** pFilterInfo, uint64_t qId); // int32_t doCreateFilterInfo(SColumnInfo* pCols, int32_t numOfCols, int32_t numOfFilterCols, SSingleColumnFilterInfo** pFilterInfo, uint64_t qId);
void doSetFilterColumnInfo(SSingleColumnFilterInfo* pFilterInfo, int32_t numOfFilterCols, SSDataBlock* pBlock); void doSetFilterColumnInfo(SSingleColumnFilterInfo* pFilterInfo, int32_t numOfFilterCols, SSDataBlock* pBlock);
@ -691,7 +701,6 @@ void* doDestroyFilterInfo(SSingleColumnFilterInfo* pFilterInfo, int32_t numOfFil
void setInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t order); void setInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t order);
void finalizeQueryResult(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx, SResultRowInfo* pResultRowInfo, void finalizeQueryResult(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx, SResultRowInfo* pResultRowInfo,
int32_t* rowCellInfoOffset); int32_t* rowCellInfoOffset);
void updateOutputBuf(SOptrBasicInfo* pBInfo, int32_t* bufCapacity, int32_t numOfInputRows);
void clearOutputBuf(SOptrBasicInfo* pBInfo, int32_t* bufCapacity); void clearOutputBuf(SOptrBasicInfo* pBInfo, int32_t* bufCapacity);
void copyTsColoum(SSDataBlock* pRes, SqlFunctionCtx* pCtx, int32_t numOfOutput); void copyTsColoum(SSDataBlock* pRes, SqlFunctionCtx* pCtx, int32_t numOfOutput);

View File

@ -21,8 +21,6 @@
#include "tqueue.h" #include "tqueue.h"
#include "executorimpl.h" #include "executorimpl.h"
#define DATA_META_LENGTH(tables) (sizeof(int32_t) + sizeof(STableIdInfo) * taosHashGetSize(tables) + sizeof(SRetrieveTableRsp))
typedef struct SDataDispatchBuf { typedef struct SDataDispatchBuf {
int32_t useSize; int32_t useSize;
int32_t allocSize; int32_t allocSize;
@ -90,19 +88,6 @@ static void copyData(const SInputData* pInput, const SDataBlockDescNode* pSchema
data += pColRes->info.bytes * pInput->pData->info.rows; data += pColRes->info.bytes * pInput->pData->info.rows;
} }
} }
int32_t numOfTables = (int32_t) taosHashGetSize(pInput->pTableRetrieveTsMap);
*(int32_t*)data = htonl(numOfTables);
data += sizeof(int32_t);
STableIdInfo* item = taosHashIterate(pInput->pTableRetrieveTsMap, NULL);
while (item) {
STableIdInfo* pDst = (STableIdInfo*)data;
pDst->uid = htobe64(item->uid);
pDst->key = htobe64(item->key);
data += sizeof(STableIdInfo);
item = taosHashIterate(pInput->pTableRetrieveTsMap, item);
}
} }
// data format with compress: SDataCacheEntry | cols_data_offset | col1_data col2_data ... | numOfTables | STableIdInfo STableIdInfo ... // data format with compress: SDataCacheEntry | cols_data_offset | col1_data col2_data ... | numOfTables | STableIdInfo STableIdInfo ...
@ -113,7 +98,7 @@ static void toDataCacheEntry(const SDataDispatchHandle* pHandle, const SInputDat
pEntry->numOfRows = pInput->pData->info.rows; pEntry->numOfRows = pInput->pData->info.rows;
pEntry->dataLen = 0; pEntry->dataLen = 0;
pBuf->useSize = DATA_META_LENGTH(pInput->pTableRetrieveTsMap); pBuf->useSize = sizeof(SRetrieveTableRsp);
copyData(pInput, pHandle->pSchema, pEntry->data, pEntry->compressed, &pEntry->dataLen); copyData(pInput, pHandle->pSchema, pEntry->data, pEntry->compressed, &pEntry->dataLen);
if (0 == pEntry->compressed) { if (0 == pEntry->compressed) {
pEntry->dataLen = pHandle->pSchema->resultRowSize * pInput->pData->info.rows; pEntry->dataLen = pHandle->pSchema->resultRowSize * pInput->pData->info.rows;
@ -130,7 +115,7 @@ static bool allocBuf(SDataDispatchHandle* pDispatcher, const SInputData* pInput,
return false; return false;
} }
pBuf->allocSize = DATA_META_LENGTH(pInput->pTableRetrieveTsMap) + pDispatcher->pSchema->resultRowSize * pInput->pData->info.rows; pBuf->allocSize = sizeof(SRetrieveTableRsp) + pDispatcher->pSchema->resultRowSize * pInput->pData->info.rows;
pBuf->pData = malloc(pBuf->allocSize); pBuf->pData = malloc(pBuf->allocSize);
if (pBuf->pData == NULL) { if (pBuf->pData == NULL) {
qError("SinkNode failed to malloc memory, size:%d, code:%d", pBuf->allocSize, TAOS_SYSTEM_ERROR(errno)); qError("SinkNode failed to malloc memory, size:%d, code:%d", pBuf->allocSize, TAOS_SYSTEM_ERROR(errno));

View File

@ -158,7 +158,7 @@ int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pRes, uint64_t *useconds) {
int64_t st = 0; int64_t st = 0;
st = taosGetTimestampUs(); st = taosGetTimestampUs();
*pRes = pTaskInfo->pRoot->nextDataFn(pTaskInfo->pRoot, &newgroup); *pRes = pTaskInfo->pRoot->getNextFn(pTaskInfo->pRoot, &newgroup);
uint64_t el = (taosGetTimestampUs() - st); uint64_t el = (taosGetTimestampUs() - st);
pTaskInfo->cost.elapsedTime += el; pTaskInfo->cost.elapsedTime += el;

View File

@ -211,6 +211,9 @@ static void destroyOrderOperatorInfo(void* param, int32_t numOfOutput);
static void destroySWindowOperatorInfo(void* param, int32_t numOfOutput); static void destroySWindowOperatorInfo(void* param, int32_t numOfOutput);
static void destroyStateWindowOperatorInfo(void* param, int32_t numOfOutput); static void destroyStateWindowOperatorInfo(void* param, int32_t numOfOutput);
static void destroyAggOperatorInfo(void* param, int32_t numOfOutput); static void destroyAggOperatorInfo(void* param, int32_t numOfOutput);
static void destroyExchangeOperatorInfo(void* param, int32_t numOfOutput);
static void destroyConditionOperatorInfo(void* param, int32_t numOfOutput);
static void destroyOperatorInfo(SOperatorInfo* pOperator); static void destroyOperatorInfo(SOperatorInfo* pOperator);
static void destroySysTableScannerOperatorInfo(void* param, int32_t numOfOutput); static void destroySysTableScannerOperatorInfo(void* param, int32_t numOfOutput);
@ -221,6 +224,17 @@ static void doSetOperatorCompleted(SOperatorInfo* pOperator) {
} }
} }
#define OPTR_IS_OPENED(_optr) (((_optr)->status & OP_OPENED) == OP_OPENED)
#define OPTR_SET_OPENED(_optr) ((_optr)->status |= OP_OPENED)
static int32_t operatorDummyOpenFn(void* param) {
SOperatorInfo* pOperator = (SOperatorInfo*) param;
OPTR_SET_OPENED(pOperator);
return TSDB_CODE_SUCCESS;
}
static void operatorDummyCloseFn(void* param, int32_t numOfCols) {}
static int32_t doCopyToSDataBlock(SDiskbasedBuf *pBuf, SGroupResInfo* pGroupResInfo, int32_t orderType, SSDataBlock* pBlock, int32_t rowCapacity); static int32_t doCopyToSDataBlock(SDiskbasedBuf *pBuf, SGroupResInfo* pGroupResInfo, int32_t orderType, SSDataBlock* pBlock, int32_t rowCapacity);
static int32_t getGroupbyColumnIndex(SGroupbyExpr *pGroupbyExpr, SSDataBlock* pDataBlock); static int32_t getGroupbyColumnIndex(SGroupbyExpr *pGroupbyExpr, SSDataBlock* pDataBlock);
@ -4723,6 +4737,11 @@ static SSDataBlock* doTableScan(void* param, bool *newgroup) {
STableScanInfo *pTableScanInfo = pOperator->info; STableScanInfo *pTableScanInfo = pOperator->info;
SExecTaskInfo *pTaskInfo = pOperator->pTaskInfo; SExecTaskInfo *pTaskInfo = pOperator->pTaskInfo;
pTaskInfo->code = pOperator->_openFn(pOperator);
if (pTaskInfo->code != TSDB_CODE_SUCCESS) {
return NULL;
}
// The read handle is not initialized yet, since no qualified tables exists // The read handle is not initialized yet, since no qualified tables exists
if (pTableScanInfo->pTsdbReadHandle == NULL) { if (pTableScanInfo->pTsdbReadHandle == NULL) {
return NULL; return NULL;
@ -4840,6 +4859,11 @@ static SSDataBlock* doStreamBlockScan(void* param, bool* newgroup) {
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SStreamBlockScanInfo* pInfo = pOperator->info; SStreamBlockScanInfo* pInfo = pOperator->info;
pTaskInfo->code = pOperator->_openFn(pOperator);
if (pTaskInfo->code != TSDB_CODE_SUCCESS) {
return NULL;
}
SDataBlockInfo* pBlockInfo = &pInfo->pRes->info; SDataBlockInfo* pBlockInfo = &pInfo->pRes->info;
pBlockInfo->rows = 0; pBlockInfo->rows = 0;
@ -4880,7 +4904,7 @@ int32_t loadRemoteDataCallback(void* param, const SDataBuf* pMsg, int32_t code)
pRsp->useconds = htobe64(pRsp->useconds); pRsp->useconds = htobe64(pRsp->useconds);
pRsp->compLen = htonl(pRsp->compLen); pRsp->compLen = htonl(pRsp->compLen);
pSourceDataInfo->status = DATA_READY; pSourceDataInfo->status = EX_SOURCE_DATA_READY;
tsem_post(&pSourceDataInfo->pEx->ready); tsem_post(&pSourceDataInfo->pEx->ready);
} }
@ -5008,12 +5032,12 @@ static SSDataBlock* concurrentlyLoadRemoteDataImpl(SOperatorInfo *pOperator, SEx
for (int32_t i = 0; i < totalSources; ++i) { for (int32_t i = 0; i < totalSources; ++i) {
SSourceDataInfo* pDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, i); SSourceDataInfo* pDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, i);
if (pDataInfo->status == DATA_EXHAUSTED) { if (pDataInfo->status == EX_SOURCE_DATA_EXHAUSTED) {
completed += 1; completed += 1;
continue; continue;
} }
if (pDataInfo->status != DATA_READY) { if (pDataInfo->status != EX_SOURCE_DATA_READY) {
continue; continue;
} }
@ -5026,7 +5050,7 @@ static SSDataBlock* concurrentlyLoadRemoteDataImpl(SOperatorInfo *pOperator, SEx
qDebug("%s vgId:%d, taskID:0x%" PRIx64 " index:%d completed, rowsOfSource:%" PRIu64 ", totalRows:%" PRIu64 " try next", qDebug("%s vgId:%d, taskID:0x%" PRIx64 " index:%d completed, rowsOfSource:%" PRIu64 ", totalRows:%" PRIu64 " try next",
GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, i + 1, pDataInfo->totalRows, GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, i + 1, pDataInfo->totalRows,
pExchangeInfo->totalRows); pExchangeInfo->totalRows);
pDataInfo->status = DATA_EXHAUSTED; pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED;
completed += 1; completed += 1;
continue; continue;
} }
@ -5042,15 +5066,15 @@ static SSDataBlock* concurrentlyLoadRemoteDataImpl(SOperatorInfo *pOperator, SEx
GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pRes->info.rows, GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pRes->info.rows,
pDataInfo->totalRows, pExchangeInfo->totalRows, pExchangeInfo->totalSize, i + 1, pDataInfo->totalRows, pExchangeInfo->totalRows, pExchangeInfo->totalSize, i + 1,
totalSources); totalSources);
pDataInfo->status = DATA_EXHAUSTED; pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED;
} else { } else {
qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64 " numOfRows:%d, totalRows:%" PRIu64 ", totalBytes:%" PRIu64, qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64 " numOfRows:%d, totalRows:%" PRIu64 ", totalBytes:%" PRIu64,
GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pRes->info.rows, pExchangeInfo->totalRows, GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pRes->info.rows, pExchangeInfo->totalRows,
pExchangeInfo->totalSize); pExchangeInfo->totalSize);
} }
if (pDataInfo->status != DATA_EXHAUSTED) { if (pDataInfo->status != EX_SOURCE_DATA_EXHAUSTED) {
pDataInfo->status = DATA_NOT_READY; pDataInfo->status = EX_SOURCE_DATA_NOT_READY;
code = doSendFetchDataRequest(pExchangeInfo, pTaskInfo, i); code = doSendFetchDataRequest(pExchangeInfo, pTaskInfo, i);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
goto _error; goto _error;
@ -5070,14 +5094,10 @@ _error:
return NULL; return NULL;
} }
static SSDataBlock* concurrentlyLoadRemoteData(SOperatorInfo *pOperator) { static int32_t prepareConcurrentlyLoad(SOperatorInfo *pOperator) {
SExchangeInfo *pExchangeInfo = pOperator->info; SExchangeInfo *pExchangeInfo = pOperator->info;
SExecTaskInfo *pTaskInfo = pOperator->pTaskInfo; SExecTaskInfo *pTaskInfo = pOperator->pTaskInfo;
if (pOperator->status == OP_RES_TO_RETURN) {
return concurrentlyLoadRemoteDataImpl(pOperator, pExchangeInfo, pTaskInfo);
}
size_t totalSources = taosArrayGetSize(pExchangeInfo->pSources); size_t totalSources = taosArrayGetSize(pExchangeInfo->pSources);
int64_t startTs = taosGetTimestampUs(); int64_t startTs = taosGetTimestampUs();
@ -5085,7 +5105,8 @@ static SSDataBlock* concurrentlyLoadRemoteData(SOperatorInfo *pOperator) {
for(int32_t i = 0; i < totalSources; ++i) { for(int32_t i = 0; i < totalSources; ++i) {
int32_t code = doSendFetchDataRequest(pExchangeInfo, pTaskInfo, i); int32_t code = doSendFetchDataRequest(pExchangeInfo, pTaskInfo, i);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return NULL; pTaskInfo->code = code;
return code;
} }
} }
@ -5093,9 +5114,9 @@ static SSDataBlock* concurrentlyLoadRemoteData(SOperatorInfo *pOperator) {
qDebug("%s send all fetch request to %"PRIzu" sources completed, elapsed:%"PRId64, GET_TASKID(pTaskInfo), totalSources, endTs - startTs); qDebug("%s send all fetch request to %"PRIzu" sources completed, elapsed:%"PRId64, GET_TASKID(pTaskInfo), totalSources, endTs - startTs);
tsem_wait(&pExchangeInfo->ready); tsem_wait(&pExchangeInfo->ready);
pOperator->cost.openCost = taosGetTimestampUs() - startTs;
pOperator->status = OP_RES_TO_RETURN; return TSDB_CODE_SUCCESS;
return concurrentlyLoadRemoteDataImpl(pOperator, pExchangeInfo, pTaskInfo);
} }
static SSDataBlock* seqLoadRemoteData(SOperatorInfo *pOperator) { static SSDataBlock* seqLoadRemoteData(SOperatorInfo *pOperator) {
@ -5123,7 +5144,7 @@ static SSDataBlock* seqLoadRemoteData(SOperatorInfo *pOperator) {
GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pExchangeInfo->current + 1, GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pExchangeInfo->current + 1,
pDataInfo->totalRows, pExchangeInfo->totalRows); pDataInfo->totalRows, pExchangeInfo->totalRows);
pDataInfo->status = DATA_EXHAUSTED; pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED;
pExchangeInfo->current += 1; pExchangeInfo->current += 1;
continue; continue;
} }
@ -5138,7 +5159,7 @@ static SSDataBlock* seqLoadRemoteData(SOperatorInfo *pOperator) {
pDataInfo->totalRows, pExchangeInfo->totalRows, pExchangeInfo->totalSize, pExchangeInfo->current + 1, pDataInfo->totalRows, pExchangeInfo->totalRows, pExchangeInfo->totalSize, pExchangeInfo->current + 1,
totalSources); totalSources);
pDataInfo->status = DATA_EXHAUSTED; pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED;
pExchangeInfo->current += 1; pExchangeInfo->current += 1;
} else { } else {
qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64 " numOfRows:%d, totalRows:%" PRIu64 ", totalBytes:%" PRIu64, qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64 " numOfRows:%d, totalRows:%" PRIu64 ", totalBytes:%" PRIu64,
@ -5149,14 +5170,39 @@ static SSDataBlock* seqLoadRemoteData(SOperatorInfo *pOperator) {
} }
} }
static int32_t prepareLoadRemoteData(void* param) {
SOperatorInfo *pOperator = (SOperatorInfo*) param;
if (OPTR_IS_OPENED(pOperator)) {
return TSDB_CODE_SUCCESS;
}
SExchangeInfo *pExchangeInfo = pOperator->info;
if (pExchangeInfo->seqLoadData) {
// do nothing for sequentially load data
} else {
int32_t code = prepareConcurrentlyLoad(pOperator);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
}
OPTR_SET_OPENED(pOperator);
return TSDB_CODE_SUCCESS;
}
static SSDataBlock* doLoadRemoteData(void* param, bool* newgroup) { static SSDataBlock* doLoadRemoteData(void* param, bool* newgroup) {
SOperatorInfo *pOperator = (SOperatorInfo*) param; SOperatorInfo *pOperator = (SOperatorInfo*) param;
SExchangeInfo *pExchangeInfo = pOperator->info; SExchangeInfo *pExchangeInfo = pOperator->info;
SExecTaskInfo *pTaskInfo = pOperator->pTaskInfo; SExecTaskInfo *pTaskInfo = pOperator->pTaskInfo;
size_t totalSources = taosArrayGetSize(pExchangeInfo->pSources); int32_t code = pOperator->_openFn(pOperator);
if (code != TSDB_CODE_SUCCESS) {
pTaskInfo->code = code;
return NULL;
}
size_t totalSources = taosArrayGetSize(pExchangeInfo->pSources);
if (pOperator->status == OP_EXEC_DONE) { if (pOperator->status == OP_EXEC_DONE) {
qDebug("%s all %"PRIzu" source(s) are exhausted, total rows:%"PRIu64" bytes:%"PRIu64", elapsed:%.2f ms", GET_TASKID(pTaskInfo), totalSources, qDebug("%s all %"PRIzu" source(s) are exhausted, total rows:%"PRIu64" bytes:%"PRIu64", elapsed:%.2f ms", GET_TASKID(pTaskInfo), totalSources,
pExchangeInfo->totalRows, pExchangeInfo->totalSize, pExchangeInfo->totalElapsed/1000.0); pExchangeInfo->totalRows, pExchangeInfo->totalSize, pExchangeInfo->totalElapsed/1000.0);
@ -5164,11 +5210,10 @@ static SSDataBlock* doLoadRemoteData(void* param, bool* newgroup) {
} }
*newgroup = false; *newgroup = false;
if (pExchangeInfo->seqLoadData) { if (pExchangeInfo->seqLoadData) {
return seqLoadRemoteData(pOperator); return seqLoadRemoteData(pOperator);
} else { } else {
return concurrentlyLoadRemoteData(pOperator); return concurrentlyLoadRemoteDataImpl(pOperator, pExchangeInfo, pTaskInfo);
} }
#if 0 #if 0
@ -5181,16 +5226,35 @@ static SSDataBlock* doLoadRemoteData(void* param, bool* newgroup) {
#endif #endif
} }
static int32_t initDataSource(int32_t numOfSources, SExchangeInfo* pInfo) {
pInfo->pSourceDataInfo = taosArrayInit(numOfSources, sizeof(SSourceDataInfo));
if (pInfo->pSourceDataInfo == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
for(int32_t i = 0; i < numOfSources; ++i) {
SSourceDataInfo dataInfo = {0};
dataInfo.status = EX_SOURCE_DATA_NOT_READY;
dataInfo.pEx = pInfo;
dataInfo.index = i;
void* ret = taosArrayPush(pInfo->pSourceDataInfo, &dataInfo);
if (ret == NULL) {
taosArrayDestroy(pInfo->pSourceDataInfo);
return TSDB_CODE_OUT_OF_MEMORY;
}
}
return TSDB_CODE_SUCCESS;
}
// TODO handle the error // TODO handle the error
SOperatorInfo* createExchangeOperatorInfo(const SNodeList* pSources, SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo) { SOperatorInfo* createExchangeOperatorInfo(const SNodeList* pSources, SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo) {
SExchangeInfo* pInfo = calloc(1, sizeof(SExchangeInfo)); SExchangeInfo* pInfo = calloc(1, sizeof(SExchangeInfo));
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
if (pInfo == NULL || pOperator == NULL) { if (pInfo == NULL || pOperator == NULL) {
tfree(pInfo); goto _error;
tfree(pOperator);
terrno = TSDB_CODE_QRY_OUT_OF_MEMORY;
return NULL;
} }
size_t numOfSources = LIST_LENGTH(pSources); size_t numOfSources = LIST_LENGTH(pSources);
@ -5217,29 +5281,28 @@ SOperatorInfo* createExchangeOperatorInfo(const SNodeList* pSources, SSDataBlock
return NULL; return NULL;
} }
for(int32_t i = 0; i < numOfSources; ++i) { int32_t code = initDataSource(numOfSources, pInfo);
SSourceDataInfo dataInfo = {0}; if (code != TSDB_CODE_SUCCESS) {
dataInfo.status = DATA_NOT_READY; goto _error;
dataInfo.pEx = pInfo;
dataInfo.index = i;
taosArrayPush(pInfo->pSourceDataInfo, &dataInfo);
} }
size_t size = pBlock->info.numOfCols; size_t size = pBlock->info.numOfCols;
pInfo->pResult = pBlock; pInfo->pResult = pBlock;
pInfo->seqLoadData = true; pInfo->seqLoadData = true;
pInfo->seqLoadData = true; // sequentially load data from the source node
tsem_init(&pInfo->ready, 0, 0); tsem_init(&pInfo->ready, 0, 0);
pOperator->name = "ExchangeOperator"; pOperator->name = "ExchangeOperator";
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_EXCHANGE; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_EXCHANGE;
pOperator->blockingOptr = false; pOperator->blockingOptr = false;
pOperator->status = OP_IN_EXECUTING; pOperator->status = OP_NOT_OPENED;
pOperator->info = pInfo; pOperator->info = pInfo;
pOperator->numOfOutput = size; pOperator->numOfOutput = size;
pOperator->nextDataFn = doLoadRemoteData;
pOperator->pTaskInfo = pTaskInfo; pOperator->pTaskInfo = pTaskInfo;
pOperator->_openFn = prepareLoadRemoteData; // assign a dummy function.
pOperator->getNextFn = doLoadRemoteData;
pOperator->closeFn = destroyExchangeOperatorInfo;
#if 1 #if 1
{ // todo refactor { // todo refactor
@ -5265,6 +5328,16 @@ SOperatorInfo* createExchangeOperatorInfo(const SNodeList* pSources, SSDataBlock
#endif #endif
return pOperator; return pOperator;
_error:
if (pInfo != NULL) {
destroyExchangeOperatorInfo(pInfo, 0);
}
tfree(pInfo);
tfree(pOperator);
terrno = TSDB_CODE_QRY_OUT_OF_MEMORY;
return NULL;
} }
SSDataBlock* createResultDataBlock(const SArray* pExprInfo) { SSDataBlock* createResultDataBlock(const SArray* pExprInfo) {
@ -5315,10 +5388,12 @@ SOperatorInfo* createTableScanOperatorInfo(void* pTsdbReadHandle, int32_t order,
pOperator->name = "TableScanOperator"; pOperator->name = "TableScanOperator";
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN;
pOperator->blockingOptr = false; pOperator->blockingOptr = false;
pOperator->status = OP_IN_EXECUTING; pOperator->status = OP_NOT_OPENED;
pOperator->info = pInfo; pOperator->info = pInfo;
pOperator->numOfOutput = numOfOutput; pOperator->numOfOutput = numOfOutput;
pOperator->nextDataFn = doTableScan; pOperator->_openFn = operatorDummyOpenFn;
pOperator->getNextFn = doTableScan;
pOperator->closeFn = operatorDummyCloseFn;
pOperator->pTaskInfo = pTaskInfo; pOperator->pTaskInfo = pTaskInfo;
return pOperator; return pOperator;
@ -5339,11 +5414,11 @@ SOperatorInfo* createTableSeqScanOperatorInfo(void* pTsdbReadHandle, STaskRuntim
pOperator->name = "TableSeqScanOperator"; pOperator->name = "TableSeqScanOperator";
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_TABLE_SEQ_SCAN; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_TABLE_SEQ_SCAN;
pOperator->blockingOptr = false; pOperator->blockingOptr = false;
pOperator->status = OP_IN_EXECUTING; pOperator->status = OP_NOT_OPENED;
pOperator->info = pInfo; pOperator->info = pInfo;
pOperator->numOfOutput = pRuntimeEnv->pQueryAttr->numOfCols; pOperator->numOfOutput = pRuntimeEnv->pQueryAttr->numOfCols;
pOperator->pRuntimeEnv = pRuntimeEnv; pOperator->pRuntimeEnv = pRuntimeEnv;
pOperator->nextDataFn = doTableScanImpl; pOperator->getNextFn = doTableScanImpl;
return pOperator; return pOperator;
} }
@ -5364,10 +5439,10 @@ SOperatorInfo* createTableBlockInfoScanOperator(void* pTsdbReadHandle, STaskRunt
pOperator->name = "TableBlockInfoScanOperator"; pOperator->name = "TableBlockInfoScanOperator";
// pOperator->operatorType = OP_TableBlockInfoScan; // pOperator->operatorType = OP_TableBlockInfoScan;
pOperator->blockingOptr = false; pOperator->blockingOptr = false;
pOperator->status = OP_IN_EXECUTING; pOperator->status = OP_NOT_OPENED;
pOperator->info = pInfo; pOperator->info = pInfo;
// pOperator->numOfOutput = pRuntimeEnv->pQueryAttr->numOfCols; // pOperator->numOfOutput = pRuntimeEnv->pQueryAttr->numOfCols;
pOperator->nextDataFn = doBlockInfoScan; pOperator->getNextFn = doBlockInfoScan;
return pOperator; return pOperator;
} }
@ -5397,10 +5472,13 @@ SOperatorInfo* createStreamScanOperatorInfo(void *streamReadHandle, SSDataBlock*
pOperator->name = "StreamBlockScanOperator"; pOperator->name = "StreamBlockScanOperator";
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN;
pOperator->blockingOptr = false; pOperator->blockingOptr = false;
pOperator->status = OP_IN_EXECUTING; pOperator->status = OP_NOT_OPENED;
pOperator->info = pInfo; pOperator->info = pInfo;
pOperator->numOfOutput = pResBlock->info.numOfCols; pOperator->numOfOutput = pResBlock->info.numOfCols;
pOperator->nextDataFn = doStreamBlockScan; pOperator->_openFn = operatorDummyOpenFn;
pOperator->getNextFn = doStreamBlockScan;
pOperator->closeFn = operatorDummyCloseFn;
pOperator->pTaskInfo = pTaskInfo; pOperator->pTaskInfo = pTaskInfo;
return pOperator; return pOperator;
} }
@ -5415,7 +5493,7 @@ static int32_t loadSysTableContentCb(void* param, const SDataBuf* pMsg, int32_t
pRsp->useconds = htobe64(pRsp->useconds); pRsp->useconds = htobe64(pRsp->useconds);
pRsp->compLen = htonl(pRsp->compLen); pRsp->compLen = htonl(pRsp->compLen);
pSourceDataInfo->status = DATA_READY; pSourceDataInfo->status = EX_SOURCE_DATA_READY;
tsem_post(&pSourceDataInfo->pEx->ready); tsem_post(&pSourceDataInfo->pEx->ready);
} }
@ -5513,7 +5591,7 @@ SOperatorInfo* createSysTableScanOperatorInfo(void* pSysTableReadHandle, const S
pOperator->status = OP_IN_EXECUTING; pOperator->status = OP_IN_EXECUTING;
pOperator->info = pInfo; pOperator->info = pInfo;
pOperator->numOfOutput = taosArrayGetSize(pExprInfo); pOperator->numOfOutput = taosArrayGetSize(pExprInfo);
pOperator->nextDataFn = doSysTableScan; pOperator->getNextFn = doSysTableScan;
pOperator->closeFn = destroySysTableScannerOperatorInfo; pOperator->closeFn = destroySysTableScannerOperatorInfo;
pOperator->pTaskInfo = pTaskInfo; pOperator->pTaskInfo = pTaskInfo;
@ -5741,7 +5819,7 @@ SSDataBlock* loadNextDataBlock(void* param) {
SOperatorInfo* pOperator = (SOperatorInfo*) param; SOperatorInfo* pOperator = (SOperatorInfo*) param;
bool newgroup = false; bool newgroup = false;
return pOperator->nextDataFn(pOperator, &newgroup); return pOperator->getNextFn(pOperator, &newgroup);
} }
static bool needToMerge(SSDataBlock* pBlock, SArray* groupInfo, char **buf, int32_t rowIndex) { static bool needToMerge(SSDataBlock* pBlock, SArray* groupInfo, char **buf, int32_t rowIndex) {
@ -6055,13 +6133,13 @@ SOperatorInfo* createSortedMergeOperatorInfo(SOperatorInfo** downstream, int32_t
pOperator->name = "SortedMerge"; pOperator->name = "SortedMerge";
// pOperator->operatorType = OP_SortedMerge; // pOperator->operatorType = OP_SortedMerge;
pOperator->blockingOptr = true; pOperator->blockingOptr = true;
pOperator->status = OP_IN_EXECUTING; pOperator->status = OP_NOT_OPENED;
pOperator->info = pInfo; pOperator->info = pInfo;
pOperator->numOfOutput = numOfOutput; pOperator->numOfOutput = numOfOutput;
pOperator->pExpr = exprArrayDup(pExprInfo); pOperator->pExpr = exprArrayDup(pExprInfo);
pOperator->pTaskInfo = pTaskInfo; pOperator->pTaskInfo = pTaskInfo;
pOperator->nextDataFn = doSortedMerge; pOperator->getNextFn = doSortedMerge;
pOperator->closeFn = destroySortedMergeOperatorInfo; pOperator->closeFn = destroySortedMergeOperatorInfo;
code = appendDownstream(pOperator, downstream, numOfDownstream); code = appendDownstream(pOperator, downstream, numOfDownstream);
@ -6153,11 +6231,11 @@ SOperatorInfo *createOrderOperatorInfo(SOperatorInfo* downstream, SArray* pExprI
pOperator->name = "Order"; pOperator->name = "Order";
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_SORT; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_SORT;
pOperator->blockingOptr = true; pOperator->blockingOptr = true;
pOperator->status = OP_IN_EXECUTING; pOperator->status = OP_NOT_OPENED;
pOperator->info = pInfo; pOperator->info = pInfo;
pOperator->pTaskInfo = pTaskInfo; pOperator->pTaskInfo = pTaskInfo;
pOperator->nextDataFn = doSort; pOperator->getNextFn = doSort;
pOperator->closeFn = destroyOrderOperatorInfo; pOperator->closeFn = destroyOrderOperatorInfo;
int32_t code = appendDownstream(pOperator, &downstream, 1); int32_t code = appendDownstream(pOperator, &downstream, 1);
@ -6183,7 +6261,7 @@ static SSDataBlock* doAggregate(void* param, bool* newgroup) {
while(1) { while(1) {
publishOperatorProfEvent(downstream, QUERY_PROF_BEFORE_OPERATOR_EXEC); publishOperatorProfEvent(downstream, QUERY_PROF_BEFORE_OPERATOR_EXEC);
SSDataBlock* pBlock = downstream->nextDataFn(downstream, newgroup); SSDataBlock* pBlock = downstream->getNextFn(downstream, newgroup);
publishOperatorProfEvent(downstream, QUERY_PROF_AFTER_OPERATOR_EXEC); publishOperatorProfEvent(downstream, QUERY_PROF_AFTER_OPERATOR_EXEC);
if (pBlock == NULL) { if (pBlock == NULL) {
@ -6233,7 +6311,7 @@ static SSDataBlock* doMultiTableAggregate(void* param, bool* newgroup) {
while(1) { while(1) {
publishOperatorProfEvent(downstream, QUERY_PROF_BEFORE_OPERATOR_EXEC); publishOperatorProfEvent(downstream, QUERY_PROF_BEFORE_OPERATOR_EXEC);
SSDataBlock* pBlock = downstream->nextDataFn(downstream, newgroup); SSDataBlock* pBlock = downstream->getNextFn(downstream, newgroup);
publishOperatorProfEvent(downstream, QUERY_PROF_AFTER_OPERATOR_EXEC); publishOperatorProfEvent(downstream, QUERY_PROF_AFTER_OPERATOR_EXEC);
if (pBlock == NULL) { if (pBlock == NULL) {
@ -6319,7 +6397,7 @@ static SSDataBlock* doProjectOperation(void* param, bool* newgroup) {
// The downstream exec may change the value of the newgroup, so use a local variable instead. // The downstream exec may change the value of the newgroup, so use a local variable instead.
publishOperatorProfEvent(pOperator->pDownstream[0], QUERY_PROF_BEFORE_OPERATOR_EXEC); publishOperatorProfEvent(pOperator->pDownstream[0], QUERY_PROF_BEFORE_OPERATOR_EXEC);
SSDataBlock* pBlock = pOperator->pDownstream[0]->nextDataFn(pOperator->pDownstream[0], newgroup); SSDataBlock* pBlock = pOperator->pDownstream[0]->getNextFn(pOperator->pDownstream[0], newgroup);
publishOperatorProfEvent(pOperator->pDownstream[0], QUERY_PROF_AFTER_OPERATOR_EXEC); publishOperatorProfEvent(pOperator->pDownstream[0], QUERY_PROF_AFTER_OPERATOR_EXEC);
if (pBlock == NULL) { if (pBlock == NULL) {
@ -6377,7 +6455,7 @@ static SSDataBlock* doLimit(void* param, bool* newgroup) {
SSDataBlock* pBlock = NULL; SSDataBlock* pBlock = NULL;
while (1) { while (1) {
publishOperatorProfEvent(pOperator->pDownstream[0], QUERY_PROF_BEFORE_OPERATOR_EXEC); publishOperatorProfEvent(pOperator->pDownstream[0], QUERY_PROF_BEFORE_OPERATOR_EXEC);
pBlock = pOperator->pDownstream[0]->nextDataFn(pOperator->pDownstream[0], newgroup); pBlock = pOperator->pDownstream[0]->getNextFn(pOperator->pDownstream[0], newgroup);
publishOperatorProfEvent(pOperator->pDownstream[0], QUERY_PROF_AFTER_OPERATOR_EXEC); publishOperatorProfEvent(pOperator->pDownstream[0], QUERY_PROF_AFTER_OPERATOR_EXEC);
if (pBlock == NULL) { if (pBlock == NULL) {
@ -6428,7 +6506,7 @@ static SSDataBlock* doFilter(void* param, bool* newgroup) {
while (1) { while (1) {
publishOperatorProfEvent(pOperator->pDownstream[0], QUERY_PROF_BEFORE_OPERATOR_EXEC); publishOperatorProfEvent(pOperator->pDownstream[0], QUERY_PROF_BEFORE_OPERATOR_EXEC);
SSDataBlock *pBlock = pOperator->pDownstream[0]->nextDataFn(pOperator->pDownstream[0], newgroup); SSDataBlock *pBlock = pOperator->pDownstream[0]->getNextFn(pOperator->pDownstream[0], newgroup);
publishOperatorProfEvent(pOperator->pDownstream[0], QUERY_PROF_AFTER_OPERATOR_EXEC); publishOperatorProfEvent(pOperator->pDownstream[0], QUERY_PROF_AFTER_OPERATOR_EXEC);
if (pBlock == NULL) { if (pBlock == NULL) {
@ -6471,7 +6549,7 @@ static SSDataBlock* doIntervalAgg(void* param, bool* newgroup) {
while(1) { while(1) {
publishOperatorProfEvent(downstream, QUERY_PROF_BEFORE_OPERATOR_EXEC); publishOperatorProfEvent(downstream, QUERY_PROF_BEFORE_OPERATOR_EXEC);
SSDataBlock* pBlock = downstream->nextDataFn(downstream, newgroup); SSDataBlock* pBlock = downstream->getNextFn(downstream, newgroup);
publishOperatorProfEvent(downstream, QUERY_PROF_AFTER_OPERATOR_EXEC); publishOperatorProfEvent(downstream, QUERY_PROF_AFTER_OPERATOR_EXEC);
if (pBlock == NULL) { if (pBlock == NULL) {
@ -6531,7 +6609,7 @@ static SSDataBlock* doAllIntervalAgg(void* param, bool* newgroup) {
while(1) { while(1) {
publishOperatorProfEvent(downstream, QUERY_PROF_BEFORE_OPERATOR_EXEC); publishOperatorProfEvent(downstream, QUERY_PROF_BEFORE_OPERATOR_EXEC);
SSDataBlock* pBlock = downstream->nextDataFn(downstream, newgroup); SSDataBlock* pBlock = downstream->getNextFn(downstream, newgroup);
publishOperatorProfEvent(downstream, QUERY_PROF_AFTER_OPERATOR_EXEC); publishOperatorProfEvent(downstream, QUERY_PROF_AFTER_OPERATOR_EXEC);
if (pBlock == NULL) { if (pBlock == NULL) {
@ -6594,7 +6672,7 @@ static SSDataBlock* doSTableIntervalAgg(void* param, bool* newgroup) {
while(1) { while(1) {
publishOperatorProfEvent(downstream, QUERY_PROF_BEFORE_OPERATOR_EXEC); publishOperatorProfEvent(downstream, QUERY_PROF_BEFORE_OPERATOR_EXEC);
SSDataBlock* pBlock = downstream->nextDataFn(downstream, newgroup); SSDataBlock* pBlock = downstream->getNextFn(downstream, newgroup);
publishOperatorProfEvent(downstream, QUERY_PROF_AFTER_OPERATOR_EXEC); publishOperatorProfEvent(downstream, QUERY_PROF_AFTER_OPERATOR_EXEC);
if (pBlock == NULL) { if (pBlock == NULL) {
@ -6649,7 +6727,7 @@ static SSDataBlock* doAllSTableIntervalAgg(void* param, bool* newgroup) {
while(1) { while(1) {
publishOperatorProfEvent(downstream, QUERY_PROF_BEFORE_OPERATOR_EXEC); publishOperatorProfEvent(downstream, QUERY_PROF_BEFORE_OPERATOR_EXEC);
SSDataBlock* pBlock = downstream->nextDataFn(downstream, newgroup); SSDataBlock* pBlock = downstream->getNextFn(downstream, newgroup);
publishOperatorProfEvent(downstream, QUERY_PROF_AFTER_OPERATOR_EXEC); publishOperatorProfEvent(downstream, QUERY_PROF_AFTER_OPERATOR_EXEC);
if (pBlock == NULL) { if (pBlock == NULL) {
@ -6784,7 +6862,7 @@ static SSDataBlock* doStateWindowAgg(void *param, bool* newgroup) {
SOperatorInfo* downstream = pOperator->pDownstream[0]; SOperatorInfo* downstream = pOperator->pDownstream[0];
while (1) { while (1) {
publishOperatorProfEvent(downstream, QUERY_PROF_BEFORE_OPERATOR_EXEC); publishOperatorProfEvent(downstream, QUERY_PROF_BEFORE_OPERATOR_EXEC);
SSDataBlock* pBlock = downstream->nextDataFn(downstream, newgroup); SSDataBlock* pBlock = downstream->getNextFn(downstream, newgroup);
publishOperatorProfEvent(downstream, QUERY_PROF_AFTER_OPERATOR_EXEC); publishOperatorProfEvent(downstream, QUERY_PROF_AFTER_OPERATOR_EXEC);
if (pBlock == NULL) { if (pBlock == NULL) {
@ -6846,7 +6924,7 @@ static SSDataBlock* doSessionWindowAgg(void* param, bool* newgroup) {
while(1) { while(1) {
publishOperatorProfEvent(downstream, QUERY_PROF_BEFORE_OPERATOR_EXEC); publishOperatorProfEvent(downstream, QUERY_PROF_BEFORE_OPERATOR_EXEC);
SSDataBlock* pBlock = downstream->nextDataFn(downstream, newgroup); SSDataBlock* pBlock = downstream->getNextFn(downstream, newgroup);
publishOperatorProfEvent(downstream, QUERY_PROF_AFTER_OPERATOR_EXEC); publishOperatorProfEvent(downstream, QUERY_PROF_AFTER_OPERATOR_EXEC);
if (pBlock == NULL) { if (pBlock == NULL) {
break; break;
@ -6899,7 +6977,7 @@ static SSDataBlock* hashGroupbyAggregate(void* param, bool* newgroup) {
while(1) { while(1) {
publishOperatorProfEvent(downstream, QUERY_PROF_BEFORE_OPERATOR_EXEC); publishOperatorProfEvent(downstream, QUERY_PROF_BEFORE_OPERATOR_EXEC);
SSDataBlock* pBlock = downstream->nextDataFn(downstream, newgroup); SSDataBlock* pBlock = downstream->getNextFn(downstream, newgroup);
publishOperatorProfEvent(downstream, QUERY_PROF_AFTER_OPERATOR_EXEC); publishOperatorProfEvent(downstream, QUERY_PROF_AFTER_OPERATOR_EXEC);
if (pBlock == NULL) { if (pBlock == NULL) {
break; break;
@ -6984,7 +7062,7 @@ static SSDataBlock* doFill(void* param, bool* newgroup) {
while(1) { while(1) {
publishOperatorProfEvent(pOperator->pDownstream[0], QUERY_PROF_BEFORE_OPERATOR_EXEC); publishOperatorProfEvent(pOperator->pDownstream[0], QUERY_PROF_BEFORE_OPERATOR_EXEC);
SSDataBlock* pBlock = pOperator->pDownstream[0]->nextDataFn(pOperator->pDownstream[0], newgroup); SSDataBlock* pBlock = pOperator->pDownstream[0]->getNextFn(pOperator->pDownstream[0], newgroup);
publishOperatorProfEvent(pOperator->pDownstream[0], QUERY_PROF_AFTER_OPERATOR_EXEC); publishOperatorProfEvent(pOperator->pDownstream[0], QUERY_PROF_AFTER_OPERATOR_EXEC);
if (*newgroup) { if (*newgroup) {
@ -7141,13 +7219,13 @@ SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SArray* pE
pOperator->name = "TableAggregate"; pOperator->name = "TableAggregate";
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_AGG; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_AGG;
pOperator->blockingOptr = true; pOperator->blockingOptr = true;
pOperator->status = OP_IN_EXECUTING; pOperator->status = OP_NOT_OPENED;
pOperator->info = pInfo; pOperator->info = pInfo;
pOperator->pExpr = exprArrayDup(pExprInfo); pOperator->pExpr = exprArrayDup(pExprInfo);
pOperator->numOfOutput = taosArrayGetSize(pExprInfo); pOperator->numOfOutput = taosArrayGetSize(pExprInfo);
pOperator->pTaskInfo = pTaskInfo; pOperator->pTaskInfo = pTaskInfo;
pOperator->nextDataFn = doAggregate; pOperator->getNextFn = doAggregate;
pOperator->closeFn = destroyAggOperatorInfo; pOperator->closeFn = destroyAggOperatorInfo;
int32_t code = appendDownstream(pOperator, &downstream, 1); int32_t code = appendDownstream(pOperator, &downstream, 1);
@ -7196,17 +7274,17 @@ static void destroyGroupbyOperatorInfo(void* param, int32_t numOfOutput) {
tfree(pInfo->prevData); tfree(pInfo->prevData);
} }
static void destroyProjectOperatorInfo(void* param, int32_t numOfOutput) { void destroyProjectOperatorInfo(void* param, int32_t numOfOutput) {
SProjectOperatorInfo* pInfo = (SProjectOperatorInfo*) param; SProjectOperatorInfo* pInfo = (SProjectOperatorInfo*) param;
doDestroyBasicInfo(&pInfo->binfo, numOfOutput); doDestroyBasicInfo(&pInfo->binfo, numOfOutput);
} }
static void destroyTagScanOperatorInfo(void* param, int32_t numOfOutput) { void destroyTagScanOperatorInfo(void* param, int32_t numOfOutput) {
STagScanInfo* pInfo = (STagScanInfo*) param; STagScanInfo* pInfo = (STagScanInfo*) param;
pInfo->pRes = blockDataDestroy(pInfo->pRes); pInfo->pRes = blockDataDestroy(pInfo->pRes);
} }
static void destroyOrderOperatorInfo(void* param, int32_t numOfOutput) { void destroyOrderOperatorInfo(void* param, int32_t numOfOutput) {
SOrderOperatorInfo* pInfo = (SOrderOperatorInfo*) param; SOrderOperatorInfo* pInfo = (SOrderOperatorInfo*) param;
pInfo->pDataBlock = blockDataDestroy(pInfo->pDataBlock); pInfo->pDataBlock = blockDataDestroy(pInfo->pDataBlock);
@ -7236,6 +7314,17 @@ static void destroySysTableScannerOperatorInfo(void* param, int32_t numOfOutput)
} }
} }
void destroyExchangeOperatorInfo(void* param, int32_t numOfOutput) {
SExchangeInfo* pExInfo = (SExchangeInfo*) param;
taosArrayDestroy(pExInfo->pSources);
taosArrayDestroy(pExInfo->pSourceDataInfo);
if (pExInfo->pResult != NULL) {
blockDataDestroy(pExInfo->pResult);
}
tsem_destroy(&pExInfo->ready);
}
SOperatorInfo* createMultiTableAggOperatorInfo(SOperatorInfo* downstream, SArray* pExprInfo, SSDataBlock* pResBlock, SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo) { SOperatorInfo* createMultiTableAggOperatorInfo(SOperatorInfo* downstream, SArray* pExprInfo, SSDataBlock* pResBlock, SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo) {
SAggOperatorInfo* pInfo = calloc(1, sizeof(SAggOperatorInfo)); SAggOperatorInfo* pInfo = calloc(1, sizeof(SAggOperatorInfo));
@ -7250,13 +7339,13 @@ SOperatorInfo* createMultiTableAggOperatorInfo(SOperatorInfo* downstream, SArray
pOperator->name = "MultiTableAggregate"; pOperator->name = "MultiTableAggregate";
// pOperator->operatorType = OP_MultiTableAggregate; // pOperator->operatorType = OP_MultiTableAggregate;
pOperator->blockingOptr = true; pOperator->blockingOptr = true;
pOperator->status = OP_IN_EXECUTING; pOperator->status = OP_NOT_OPENED;
pOperator->info = pInfo; pOperator->info = pInfo;
pOperator->pExpr = exprArrayDup(pExprInfo); pOperator->pExpr = exprArrayDup(pExprInfo);
pOperator->numOfOutput = numOfOutput; pOperator->numOfOutput = numOfOutput;
pOperator->pTaskInfo = pTaskInfo; pOperator->pTaskInfo = pTaskInfo;
pOperator->nextDataFn = doMultiTableAggregate; pOperator->getNextFn = doMultiTableAggregate;
pOperator->closeFn = destroyAggOperatorInfo; pOperator->closeFn = destroyAggOperatorInfo;
int32_t code = appendDownstream(pOperator, &downstream, 1); int32_t code = appendDownstream(pOperator, &downstream, 1);
@ -7277,12 +7366,12 @@ SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SArray* pExp
pOperator->name = "ProjectOperator"; pOperator->name = "ProjectOperator";
// pOperator->operatorType = OP_Project; // pOperator->operatorType = OP_Project;
pOperator->blockingOptr = false; pOperator->blockingOptr = false;
pOperator->status = OP_IN_EXECUTING; pOperator->status = OP_NOT_OPENED;
pOperator->info = pInfo; pOperator->info = pInfo;
pOperator->pExpr = exprArrayDup(pExprInfo); pOperator->pExpr = exprArrayDup(pExprInfo);
pOperator->numOfOutput = taosArrayGetSize(pExprInfo); pOperator->numOfOutput = taosArrayGetSize(pExprInfo);
pOperator->nextDataFn = doProjectOperation; pOperator->getNextFn = doProjectOperation;
pOperator->closeFn = destroyProjectOperatorInfo; pOperator->closeFn = destroyProjectOperatorInfo;
int32_t code = appendDownstream(pOperator, &downstream, 1); int32_t code = appendDownstream(pOperator, &downstream, 1);
@ -7331,11 +7420,11 @@ SOperatorInfo* createLimitOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorIn
pOperator->name = "LimitOperator"; pOperator->name = "LimitOperator";
// pOperator->operatorType = OP_Limit; // pOperator->operatorType = OP_Limit;
pOperator->blockingOptr = false; pOperator->blockingOptr = false;
pOperator->status = OP_IN_EXECUTING; pOperator->status = OP_NOT_OPENED;
pOperator->nextDataFn = doLimit; pOperator->getNextFn = doLimit;
pOperator->info = pInfo; pOperator->info = pInfo;
pOperator->pRuntimeEnv = pRuntimeEnv; pOperator->pRuntimeEnv = pRuntimeEnv;
int32_t code = appendDownstream(pOperator, &downstream, 1); int32_t code = appendDownstream(pOperator, &downstream, 1);
return pOperator; return pOperator;
} }
@ -7363,13 +7452,13 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SArray* pEx
pOperator->name = "TimeIntervalAggOperator"; pOperator->name = "TimeIntervalAggOperator";
// pOperator->operatorType = OP_TimeWindow; // pOperator->operatorType = OP_TimeWindow;
pOperator->blockingOptr = true; pOperator->blockingOptr = true;
pOperator->status = OP_IN_EXECUTING; pOperator->status = OP_NOT_OPENED;
pOperator->pExpr = exprArrayDup(pExprInfo); pOperator->pExpr = exprArrayDup(pExprInfo);
pOperator->pTaskInfo = pTaskInfo; pOperator->pTaskInfo = pTaskInfo;
pOperator->numOfOutput = taosArrayGetSize(pExprInfo); pOperator->numOfOutput = taosArrayGetSize(pExprInfo);
pOperator->info = pInfo; pOperator->info = pInfo;
pOperator->nextDataFn = doIntervalAgg; pOperator->getNextFn = doIntervalAgg;
pOperator->closeFn = destroyBasicOperatorInfo; pOperator->closeFn = destroyBasicOperatorInfo;
code = appendDownstream(pOperator, &downstream, 1); code = appendDownstream(pOperator, &downstream, 1);
@ -7388,12 +7477,12 @@ SOperatorInfo* createAllTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, S
pOperator->name = "AllTimeIntervalAggOperator"; pOperator->name = "AllTimeIntervalAggOperator";
// pOperator->operatorType = OP_AllTimeWindow; // pOperator->operatorType = OP_AllTimeWindow;
pOperator->blockingOptr = true; pOperator->blockingOptr = true;
pOperator->status = OP_IN_EXECUTING; pOperator->status = OP_NOT_OPENED;
pOperator->pExpr = pExpr; pOperator->pExpr = pExpr;
pOperator->numOfOutput = numOfOutput; pOperator->numOfOutput = numOfOutput;
pOperator->info = pInfo; pOperator->info = pInfo;
pOperator->pRuntimeEnv = pRuntimeEnv; pOperator->pRuntimeEnv = pRuntimeEnv;
pOperator->nextDataFn = doAllIntervalAgg; pOperator->getNextFn = doAllIntervalAgg;
pOperator->closeFn = destroyBasicOperatorInfo; pOperator->closeFn = destroyBasicOperatorInfo;
int32_t code = appendDownstream(pOperator, &downstream, 1); int32_t code = appendDownstream(pOperator, &downstream, 1);
@ -7412,12 +7501,12 @@ SOperatorInfo* createStatewindowOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOper
pOperator->name = "StateWindowOperator"; pOperator->name = "StateWindowOperator";
// pOperator->operatorType = OP_StateWindow; // pOperator->operatorType = OP_StateWindow;
pOperator->blockingOptr = true; pOperator->blockingOptr = true;
pOperator->status = OP_IN_EXECUTING; pOperator->status = OP_NOT_OPENED;
pOperator->pExpr = pExpr; pOperator->pExpr = pExpr;
pOperator->numOfOutput = numOfOutput; pOperator->numOfOutput = numOfOutput;
pOperator->info = pInfo; pOperator->info = pInfo;
pOperator->pRuntimeEnv = pRuntimeEnv; pOperator->pRuntimeEnv = pRuntimeEnv;
pOperator->nextDataFn = doStateWindowAgg; pOperator->getNextFn = doStateWindowAgg;
pOperator->closeFn = destroyStateWindowOperatorInfo; pOperator->closeFn = destroyStateWindowOperatorInfo;
int32_t code = appendDownstream(pOperator, &downstream, 1); int32_t code = appendDownstream(pOperator, &downstream, 1);
@ -7437,12 +7526,12 @@ SOperatorInfo* createSWindowOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperator
pOperator->name = "SessionWindowAggOperator"; pOperator->name = "SessionWindowAggOperator";
// pOperator->operatorType = OP_SessionWindow; // pOperator->operatorType = OP_SessionWindow;
pOperator->blockingOptr = true; pOperator->blockingOptr = true;
pOperator->status = OP_IN_EXECUTING; pOperator->status = OP_NOT_OPENED;
pOperator->pExpr = pExpr; pOperator->pExpr = pExpr;
pOperator->numOfOutput = numOfOutput; pOperator->numOfOutput = numOfOutput;
pOperator->info = pInfo; pOperator->info = pInfo;
pOperator->pRuntimeEnv = pRuntimeEnv; pOperator->pRuntimeEnv = pRuntimeEnv;
pOperator->nextDataFn = doSessionWindowAgg; pOperator->getNextFn = doSessionWindowAgg;
pOperator->closeFn = destroySWindowOperatorInfo; pOperator->closeFn = destroySWindowOperatorInfo;
int32_t code = appendDownstream(pOperator, &downstream, 1); int32_t code = appendDownstream(pOperator, &downstream, 1);
@ -7460,13 +7549,13 @@ SOperatorInfo* createMultiTableTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntim
pOperator->name = "MultiTableTimeIntervalOperator"; pOperator->name = "MultiTableTimeIntervalOperator";
// pOperator->operatorType = OP_MultiTableTimeInterval; // pOperator->operatorType = OP_MultiTableTimeInterval;
pOperator->blockingOptr = true; pOperator->blockingOptr = true;
pOperator->status = OP_IN_EXECUTING; pOperator->status = OP_NOT_OPENED;
pOperator->pExpr = pExpr; pOperator->pExpr = pExpr;
pOperator->numOfOutput = numOfOutput; pOperator->numOfOutput = numOfOutput;
pOperator->info = pInfo; pOperator->info = pInfo;
pOperator->pRuntimeEnv = pRuntimeEnv; pOperator->pRuntimeEnv = pRuntimeEnv;
pOperator->nextDataFn = doSTableIntervalAgg; pOperator->getNextFn = doSTableIntervalAgg;
pOperator->closeFn = destroyBasicOperatorInfo; pOperator->closeFn = destroyBasicOperatorInfo;
int32_t code = appendDownstream(pOperator, &downstream, 1); int32_t code = appendDownstream(pOperator, &downstream, 1);
@ -7484,13 +7573,13 @@ SOperatorInfo* createAllMultiTableTimeIntervalOperatorInfo(STaskRuntimeEnv* pRun
pOperator->name = "AllMultiTableTimeIntervalOperator"; pOperator->name = "AllMultiTableTimeIntervalOperator";
// pOperator->operatorType = OP_AllMultiTableTimeInterval; // pOperator->operatorType = OP_AllMultiTableTimeInterval;
pOperator->blockingOptr = true; pOperator->blockingOptr = true;
pOperator->status = OP_IN_EXECUTING; pOperator->status = OP_NOT_OPENED;
pOperator->pExpr = pExpr; pOperator->pExpr = pExpr;
pOperator->numOfOutput = numOfOutput; pOperator->numOfOutput = numOfOutput;
pOperator->info = pInfo; pOperator->info = pInfo;
pOperator->pRuntimeEnv = pRuntimeEnv; pOperator->pRuntimeEnv = pRuntimeEnv;
pOperator->nextDataFn = doAllSTableIntervalAgg; pOperator->getNextFn = doAllSTableIntervalAgg;
pOperator->closeFn = destroyBasicOperatorInfo; pOperator->closeFn = destroyBasicOperatorInfo;
int32_t code = appendDownstream(pOperator, &downstream, 1); int32_t code = appendDownstream(pOperator, &downstream, 1);
@ -7516,13 +7605,13 @@ SOperatorInfo* createGroupbyOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperator
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
pOperator->name = "GroupbyAggOperator"; pOperator->name = "GroupbyAggOperator";
pOperator->blockingOptr = true; pOperator->blockingOptr = true;
pOperator->status = OP_IN_EXECUTING; pOperator->status = OP_NOT_OPENED;
// pOperator->operatorType = OP_Groupby; // pOperator->operatorType = OP_Groupby;
pOperator->pExpr = pExpr; pOperator->pExpr = pExpr;
pOperator->numOfOutput = numOfOutput; pOperator->numOfOutput = numOfOutput;
pOperator->info = pInfo; pOperator->info = pInfo;
pOperator->pRuntimeEnv = pRuntimeEnv; pOperator->pRuntimeEnv = pRuntimeEnv;
pOperator->nextDataFn = hashGroupbyAggregate; pOperator->getNextFn = hashGroupbyAggregate;
pOperator->closeFn = destroyGroupbyOperatorInfo; pOperator->closeFn = destroyGroupbyOperatorInfo;
int32_t code = appendDownstream(pOperator, &downstream, 1); int32_t code = appendDownstream(pOperator, &downstream, 1);
@ -7555,13 +7644,13 @@ SOperatorInfo* createFillOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInf
pOperator->name = "FillOperator"; pOperator->name = "FillOperator";
pOperator->blockingOptr = false; pOperator->blockingOptr = false;
pOperator->status = OP_IN_EXECUTING; pOperator->status = OP_NOT_OPENED;
// pOperator->operatorType = OP_Fill; // pOperator->operatorType = OP_Fill;
pOperator->pExpr = pExpr; pOperator->pExpr = pExpr;
pOperator->numOfOutput = numOfOutput; pOperator->numOfOutput = numOfOutput;
pOperator->info = pInfo; pOperator->info = pInfo;
pOperator->pRuntimeEnv = pRuntimeEnv; pOperator->pRuntimeEnv = pRuntimeEnv;
pOperator->nextDataFn = doFill; pOperator->getNextFn = doFill;
pOperator->closeFn = destroySFillOperatorInfo; pOperator->closeFn = destroySFillOperatorInfo;
int32_t code = appendDownstream(pOperator, &downstream, 1); int32_t code = appendDownstream(pOperator, &downstream, 1);
@ -7606,13 +7695,12 @@ SOperatorInfo* createSLimitOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorI
pOperator->name = "SLimitOperator"; pOperator->name = "SLimitOperator";
// pOperator->operatorType = OP_SLimit; // pOperator->operatorType = OP_SLimit;
pOperator->blockingOptr = false; pOperator->blockingOptr = false;
pOperator->status = OP_IN_EXECUTING; pOperator->status = OP_NOT_OPENED;
// pOperator->exec = doSLimit; // pOperator->exec = doSLimit;
pOperator->info = pInfo; pOperator->info = pInfo;
pOperator->pRuntimeEnv = pRuntimeEnv;
pOperator->closeFn = destroySlimitOperatorInfo; pOperator->closeFn = destroySlimitOperatorInfo;
int32_t code = appendDownstream(pOperator, &downstream, 1); int32_t code = appendDownstream(pOperator, &downstream, 1);
return pOperator; return pOperator;
} }
@ -7746,6 +7834,7 @@ static SSDataBlock* doTagScan(void* param, bool* newgroup) {
return (pRes->info.rows == 0)? NULL:pInfo->pRes; return (pRes->info.rows == 0)? NULL:pInfo->pRes;
#endif #endif
return 0;
} }
SOperatorInfo* createTagScanOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SExprInfo* pExpr, int32_t numOfOutput) { SOperatorInfo* createTagScanOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SExprInfo* pExpr, int32_t numOfOutput) {
@ -7762,9 +7851,9 @@ SOperatorInfo* createTagScanOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SExprInfo
pOperator->name = "SeqTableTagScan"; pOperator->name = "SeqTableTagScan";
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN;
pOperator->blockingOptr = false; pOperator->blockingOptr = false;
pOperator->status = OP_IN_EXECUTING; pOperator->status = OP_NOT_OPENED;
pOperator->info = pInfo; pOperator->info = pInfo;
pOperator->nextDataFn = doTagScan; pOperator->getNextFn = doTagScan;
pOperator->pExpr = pExpr; pOperator->pExpr = pExpr;
pOperator->numOfOutput = numOfOutput; pOperator->numOfOutput = numOfOutput;
pOperator->pRuntimeEnv = pRuntimeEnv; pOperator->pRuntimeEnv = pRuntimeEnv;
@ -7834,7 +7923,7 @@ static SSDataBlock* hashDistinct(void* param, bool* newgroup) {
while(1) { while(1) {
publishOperatorProfEvent(pOperator->pDownstream[0], QUERY_PROF_BEFORE_OPERATOR_EXEC); publishOperatorProfEvent(pOperator->pDownstream[0], QUERY_PROF_BEFORE_OPERATOR_EXEC);
pBlock = pOperator->pDownstream[0]->nextDataFn(pOperator->pDownstream[0], newgroup); pBlock = pOperator->pDownstream[0]->getNextFn(pOperator->pDownstream[0], newgroup);
publishOperatorProfEvent(pOperator->pDownstream[0], QUERY_PROF_AFTER_OPERATOR_EXEC); publishOperatorProfEvent(pOperator->pDownstream[0], QUERY_PROF_AFTER_OPERATOR_EXEC);
if (pBlock == NULL) { if (pBlock == NULL) {
@ -7900,13 +7989,13 @@ SOperatorInfo* createDistinctOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperato
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
pOperator->name = "DistinctOperator"; pOperator->name = "DistinctOperator";
pOperator->blockingOptr = false; pOperator->blockingOptr = false;
pOperator->status = OP_IN_EXECUTING; pOperator->status = OP_NOT_OPENED;
// pOperator->operatorType = OP_Distinct; // pOperator->operatorType = OP_Distinct;
pOperator->pExpr = pExpr; pOperator->pExpr = pExpr;
pOperator->numOfOutput = numOfOutput; pOperator->numOfOutput = numOfOutput;
pOperator->info = pInfo; pOperator->info = pInfo;
pOperator->pRuntimeEnv = pRuntimeEnv; pOperator->pRuntimeEnv = pRuntimeEnv;
pOperator->nextDataFn = hashDistinct; pOperator->getNextFn = hashDistinct;
pOperator->pExpr = pExpr; pOperator->pExpr = pExpr;
pOperator->closeFn = destroyDistinctOperatorInfo; pOperator->closeFn = destroyDistinctOperatorInfo;
@ -8500,75 +8589,6 @@ void setResultBufSize(STaskAttr* pQueryAttr, SRspResultInfo* pResultInfo) {
pResultInfo->total = 0; pResultInfo->total = 0;
} }
FORCE_INLINE bool checkQIdEqual(void *qHandle, uint64_t qId) {
return ((SQInfo *)qHandle)->qId == qId;
}
int32_t initQInfo(STsBufInfo* pTsBufInfo, void* tsdb, void* sourceOptr, SQInfo* pQInfo, STaskParam* param, char* start,
int32_t prevResultLen, void* merger) {
int32_t code = TSDB_CODE_SUCCESS;
STaskRuntimeEnv* pRuntimeEnv = &pQInfo->runtimeEnv;
pRuntimeEnv->qinfo = pQInfo;
STaskAttr *pQueryAttr = pRuntimeEnv->pQueryAttr;
STSBuf *pTsBuf = NULL;
if (pTsBufInfo->tsLen > 0) { // open new file to save the result
char* tsBlock = start + pTsBufInfo->tsOffset;
pTsBuf = tsBufCreateFromCompBlocks(tsBlock, pTsBufInfo->tsNumOfBlocks, pTsBufInfo->tsLen, pTsBufInfo->tsOrder,
pQueryAttr->vgId);
if (pTsBuf == NULL) {
code = TSDB_CODE_QRY_NO_DISKSPACE;
goto _error;
}
tsBufResetPos(pTsBuf);
bool ret = tsBufNextPos(pTsBuf);
UNUSED(ret);
}
SArray* prevResult = NULL;
if (prevResultLen > 0) {
prevResult = interResFromBinary(param->prevResult, prevResultLen);
pRuntimeEnv->prevResult = prevResult;
}
pRuntimeEnv->currentOffset = pQueryAttr->limit.offset;
if (tsdb != NULL) {
// pQueryAttr->precision = tsdbGetCfg(tsdb)->precision;
}
if ((QUERY_IS_ASC_QUERY(pQueryAttr) && (pQueryAttr->window.skey > pQueryAttr->window.ekey)) ||
(!QUERY_IS_ASC_QUERY(pQueryAttr) && (pQueryAttr->window.ekey > pQueryAttr->window.skey))) {
//qDebug("QInfo:0x%"PRIx64" no result in time range %" PRId64 "-%" PRId64 ", order %d", pQInfo->qId, pQueryAttr->window.skey,
// pQueryAttr->window.ekey, pQueryAttr->order.order);
// setTaskStatus(pOperator->pTaskInfo, QUERY_COMPLETED);
pRuntimeEnv->tableqinfoGroupInfo.numOfTables = 0;
// todo free memory
return TSDB_CODE_SUCCESS;
}
if (pRuntimeEnv->tableqinfoGroupInfo.numOfTables == 0) {
//qDebug("QInfo:0x%"PRIx64" no table qualified for tag filter, abort query", pQInfo->qId);
// setTaskStatus(pOperator->pTaskInfo, QUERY_COMPLETED);
return TSDB_CODE_SUCCESS;
}
// filter the qualified
if ((code = doInitQInfo(pQInfo, pTsBuf, tsdb, sourceOptr, param->tableScanOperator, param->pOperator, merger)) != TSDB_CODE_SUCCESS) {
goto _error;
}
return code;
_error:
// table query ref will be decrease during error handling
// doDestroyTask(pQInfo);
return code;
}
//TODO refactor //TODO refactor
void freeColumnFilterInfo(SColumnFilterInfo* pFilter, int32_t numOfFilters) { void freeColumnFilterInfo(SColumnFilterInfo* pFilter, int32_t numOfFilters) {
if (pFilter == NULL || numOfFilters == 0) { if (pFilter == NULL || numOfFilters == 0) {

View File

@ -201,9 +201,9 @@ SOperatorInfo* createDummyOperator(int32_t startVal, int32_t numOfBlocks, int32_
pOperator->name = "dummyInputOpertor4Test"; pOperator->name = "dummyInputOpertor4Test";
if (numOfCols == 1) { if (numOfCols == 1) {
pOperator->nextDataFn = getDummyBlock; pOperator->getNextFn = getDummyBlock;
} else { } else {
pOperator->nextDataFn = get2ColsDummyBlock; pOperator->getNextFn = get2ColsDummyBlock;
} }
SDummyInputInfo *pInfo = (SDummyInputInfo*) calloc(1, sizeof(SDummyInputInfo)); SDummyInputInfo *pInfo = (SDummyInputInfo*) calloc(1, sizeof(SDummyInputInfo));
@ -971,7 +971,7 @@ TEST(testCase, inMem_sort_Test) {
SOperatorInfo* pOperator = createOrderOperatorInfo(createDummyOperator(10000, 5, 1000, data_asc, 1), pExprInfo, pOrderVal, NULL); SOperatorInfo* pOperator = createOrderOperatorInfo(createDummyOperator(10000, 5, 1000, data_asc, 1), pExprInfo, pOrderVal, NULL);
bool newgroup = false; bool newgroup = false;
SSDataBlock* pRes = pOperator->nextDataFn(pOperator, &newgroup); SSDataBlock* pRes = pOperator->getNextFn(pOperator, &newgroup);
SColumnInfoData* pCol1 = static_cast<SColumnInfoData*>(taosArrayGet(pRes->pDataBlock, 0)); SColumnInfoData* pCol1 = static_cast<SColumnInfoData*>(taosArrayGet(pRes->pDataBlock, 0));
SColumnInfoData* pCol2 = static_cast<SColumnInfoData*>(taosArrayGet(pRes->pDataBlock, 1)); SColumnInfoData* pCol2 = static_cast<SColumnInfoData*>(taosArrayGet(pRes->pDataBlock, 1));
@ -1049,7 +1049,7 @@ TEST(testCase, external_sort_Test) {
while(1) { while(1) {
int64_t s = taosGetTimestampUs(); int64_t s = taosGetTimestampUs();
pRes = pOperator->nextDataFn(pOperator, &newgroup); pRes = pOperator->getNextFn(pOperator, &newgroup);
int64_t e = taosGetTimestampUs(); int64_t e = taosGetTimestampUs();
if (t++ == 1) { if (t++ == 1) {
@ -1121,7 +1121,7 @@ TEST(testCase, sorted_merge_Test) {
while(1) { while(1) {
int64_t s = taosGetTimestampUs(); int64_t s = taosGetTimestampUs();
pRes = pOperator->nextDataFn(pOperator, &newgroup); pRes = pOperator->getNextFn(pOperator, &newgroup);
int64_t e = taosGetTimestampUs(); int64_t e = taosGetTimestampUs();
if (t++ == 1) { if (t++ == 1) {
@ -1199,7 +1199,7 @@ TEST(testCase, time_interval_Operator_Test) {
while(1) { while(1) {
int64_t s = taosGetTimestampUs(); int64_t s = taosGetTimestampUs();
pRes = pOperator->nextDataFn(pOperator, &newgroup); pRes = pOperator->getNextFn(pOperator, &newgroup);
int64_t e = taosGetTimestampUs(); int64_t e = taosGetTimestampUs();
if (t++ == 1) { if (t++ == 1) {

View File

@ -131,7 +131,6 @@ static SNode* valueNodeCopy(const SValueNode* pSrc, SValueNode* pDst) {
case TSDB_DATA_TYPE_DOUBLE: case TSDB_DATA_TYPE_DOUBLE:
COPY_SCALAR_FIELD(datum.d); COPY_SCALAR_FIELD(datum.d);
break; break;
case TSDB_DATA_TYPE_BINARY:
case TSDB_DATA_TYPE_NCHAR: case TSDB_DATA_TYPE_NCHAR:
case TSDB_DATA_TYPE_VARCHAR: case TSDB_DATA_TYPE_VARCHAR:
case TSDB_DATA_TYPE_VARBINARY: case TSDB_DATA_TYPE_VARBINARY:

View File

@ -978,7 +978,6 @@ static int32_t datumToJson(const void* pObj, SJson* pJson) {
case TSDB_DATA_TYPE_DOUBLE: case TSDB_DATA_TYPE_DOUBLE:
code = tjsonAddDoubleToObject(pJson, jkValueDatum, pNode->datum.d); code = tjsonAddDoubleToObject(pJson, jkValueDatum, pNode->datum.d);
break; break;
case TSDB_DATA_TYPE_BINARY:
case TSDB_DATA_TYPE_NCHAR: case TSDB_DATA_TYPE_NCHAR:
case TSDB_DATA_TYPE_VARCHAR: case TSDB_DATA_TYPE_VARCHAR:
case TSDB_DATA_TYPE_VARBINARY: case TSDB_DATA_TYPE_VARBINARY:
@ -1042,7 +1041,6 @@ static int32_t jsonToDatum(const SJson* pJson, void* pObj) {
case TSDB_DATA_TYPE_DOUBLE: case TSDB_DATA_TYPE_DOUBLE:
code = tjsonGetDoubleValue(pJson, jkValueDatum, &pNode->datum.d); code = tjsonGetDoubleValue(pJson, jkValueDatum, &pNode->datum.d);
break; break;
case TSDB_DATA_TYPE_BINARY:
case TSDB_DATA_TYPE_NCHAR: case TSDB_DATA_TYPE_NCHAR:
case TSDB_DATA_TYPE_VARCHAR: case TSDB_DATA_TYPE_VARCHAR:
case TSDB_DATA_TYPE_VARBINARY: case TSDB_DATA_TYPE_VARBINARY:

View File

@ -386,7 +386,6 @@ void* nodesGetValueFromNode(SValueNode *pNode) {
case TSDB_DATA_TYPE_FLOAT: case TSDB_DATA_TYPE_FLOAT:
case TSDB_DATA_TYPE_DOUBLE: case TSDB_DATA_TYPE_DOUBLE:
return (void*)&pNode->datum.d; return (void*)&pNode->datum.d;
case TSDB_DATA_TYPE_BINARY:
case TSDB_DATA_TYPE_NCHAR: case TSDB_DATA_TYPE_NCHAR:
case TSDB_DATA_TYPE_VARCHAR: case TSDB_DATA_TYPE_VARCHAR:
case TSDB_DATA_TYPE_VARBINARY: case TSDB_DATA_TYPE_VARBINARY:

View File

@ -287,7 +287,6 @@ static EDealRes translateValue(STranslateContext* pCxt, SValueNode* pVal) {
pVal->datum.d = strtold(pVal->literal, &endPtr); pVal->datum.d = strtold(pVal->literal, &endPtr);
break; break;
} }
case TSDB_DATA_TYPE_BINARY:
case TSDB_DATA_TYPE_NCHAR: case TSDB_DATA_TYPE_NCHAR:
case TSDB_DATA_TYPE_VARCHAR: case TSDB_DATA_TYPE_VARCHAR:
case TSDB_DATA_TYPE_VARBINARY: { case TSDB_DATA_TYPE_VARBINARY: {
@ -601,7 +600,6 @@ static int32_t translateStar(STranslateContext* pCxt, SSelectStmt* pSelect, bool
static int32_t getPositionValue(const SValueNode* pVal) { static int32_t getPositionValue(const SValueNode* pVal) {
switch (pVal->node.resType.type) { switch (pVal->node.resType.type) {
case TSDB_DATA_TYPE_NULL: case TSDB_DATA_TYPE_NULL:
case TSDB_DATA_TYPE_BINARY:
case TSDB_DATA_TYPE_TIMESTAMP: case TSDB_DATA_TYPE_TIMESTAMP:
case TSDB_DATA_TYPE_NCHAR: case TSDB_DATA_TYPE_NCHAR:
case TSDB_DATA_TYPE_VARCHAR: case TSDB_DATA_TYPE_VARCHAR:
@ -1438,7 +1436,6 @@ static void valueNodeToVariant(const SValueNode* pNode, SVariant* pVal) {
case TSDB_DATA_TYPE_DOUBLE: case TSDB_DATA_TYPE_DOUBLE:
pVal->d = pNode->datum.d; pVal->d = pNode->datum.d;
break; break;
case TSDB_DATA_TYPE_BINARY:
case TSDB_DATA_TYPE_NCHAR: case TSDB_DATA_TYPE_NCHAR:
case TSDB_DATA_TYPE_VARCHAR: case TSDB_DATA_TYPE_VARCHAR:
case TSDB_DATA_TYPE_VARBINARY: case TSDB_DATA_TYPE_VARBINARY:

View File

@ -56,7 +56,7 @@ protected:
const string syntaxTreeStr = toString(query_->pRoot, false); const string syntaxTreeStr = toString(query_->pRoot, false);
SLogicNode* pLogicPlan = nullptr; SLogicNode* pLogicPlan = nullptr;
SPlanContext cxt = { .queryId = 1, .pAstRoot = query_->pRoot }; SPlanContext cxt = { .queryId = 1, .acctId = 0, .pAstRoot = query_->pRoot };
code = createLogicPlan(&cxt, &pLogicPlan); code = createLogicPlan(&cxt, &pLogicPlan);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
cout << "sql:[" << cxt_.pSql << "] logic plan code:" << code << ", strerror:" << tstrerror(code) << endl; cout << "sql:[" << cxt_.pSql << "] logic plan code:" << code << ", strerror:" << tstrerror(code) << endl;

View File

@ -495,7 +495,7 @@ int32_t qwExecTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *queryEnd) {
ASSERT(pRes->info.rows > 0); ASSERT(pRes->info.rows > 0);
SInputData inputData = {.pData = pRes, .pTableRetrieveTsMap = NULL}; SInputData inputData = {.pData = pRes};
code = dsPutDataBlock(sinkHandle, &inputData, &qcontinue); code = dsPutDataBlock(sinkHandle, &inputData, &qcontinue);
if (code) { if (code) {
QW_TASK_ELOG("dsPutDataBlock failed, code:%s", tstrerror(code)); QW_TASK_ELOG("dsPutDataBlock failed, code:%s", tstrerror(code));

View File

@ -411,27 +411,26 @@ int32_t vectorConvertImpl(SScalarParam* pIn, SScalarParam* pOut) {
} }
int8_t gConvertTypes[TSDB_DATA_TYPE_BLOB+1][TSDB_DATA_TYPE_BLOB+1] = { int8_t gConvertTypes[TSDB_DATA_TYPE_BLOB+1][TSDB_DATA_TYPE_BLOB+1] = {
/* NULL BOOL TINY SMAL INT BIG FLOA DOUB BINA TIME NCHA UTIN USMA UINT UBIG VARC VARB JSON DECI BLOB */ /* NULL BOOL TINY SMAL INT BIG FLOA DOUB VARC TIME NCHA UTIN USMA UINT UBIG VARB JSON DECI BLOB */
/*NULL*/ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, /*NULL*/ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
/*BOOL*/ 0, 0, 0, 3, 4, 5, 6, 7, 7, 9, 7, 0, 12, 13, 14, 7, 7, 0, 0, 0, /*BOOL*/ 0, 0, 0, 3, 4, 5, 6, 7, 7, 9, 7, 0, 12, 13, 14, 7, 0, 0, 0,
/*TINY*/ 0, 0, 0, 3, 4, 5, 6, 7, 7, 9, 7, 3, 4, 5, 7, 7, 7, 0, 0, 0, /*TINY*/ 0, 0, 0, 3, 4, 5, 6, 7, 7, 9, 7, 3, 4, 5, 7, 7, 0, 0, 0,
/*SMAL*/ 0, 0, 0, 0, 4, 5, 6, 7, 7, 9, 7, 3, 4, 5, 7, 7, 7, 0, 0, 0, /*SMAL*/ 0, 0, 0, 0, 4, 5, 6, 7, 7, 9, 7, 3, 4, 5, 7, 7, 0, 0, 0,
/*INT */ 0, 0, 0, 0, 0, 5, 6, 7, 7, 9, 7, 4, 4, 5, 7, 7, 7, 0, 0, 0, /*INT */ 0, 0, 0, 0, 0, 5, 6, 7, 7, 9, 7, 4, 4, 5, 7, 7, 0, 0, 0,
/*BIGI*/ 0, 0, 0, 0, 0, 0, 6, 7, 7, 0, 7, 5, 5, 5, 7, 7, 7, 0, 0, 0, /*BIGI*/ 0, 0, 0, 0, 0, 0, 6, 7, 7, 0, 7, 5, 5, 5, 7, 7, 0, 0, 0,
/*FLOA*/ 0, 0, 0, 0, 0, 0, 0, 7, 7, 6, 7, 6, 6, 6, 6, 7, 7, 0, 0, 0, /*FLOA*/ 0, 0, 0, 0, 0, 0, 0, 7, 7, 6, 7, 6, 6, 6, 6, 7, 0, 0, 0,
/*DOUB*/ 0, 0, 0, 0, 0, 0, 0, 0, 7, 7, 7, 7, 7, 7, 7, 7, 7, 0, 0, 0, /*DOUB*/ 0, 0, 0, 0, 0, 0, 0, 0, 7, 7, 7, 7, 7, 7, 7, 7, 0, 0, 0,
/*BINA*/ 0, 0, 0, 0, 0, 0, 0, 0, 0, 7, 0, 7, 7, 7, 7, 0, 0, 0, 0, 0, /*VARC*/ 0, 0, 0, 0, 0, 0, 0, 0, 0, 7, 0, 7, 7, 7, 7, 0, 0, 0, 0,
/*TIME*/ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 7, 9, 9, 9, 7, 7, 7, 0, 0, 0, /*TIME*/ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 7, 9, 9, 9, 7, 7, 0, 0, 0,
/*NCHA*/ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 7, 7, 7, 7, 0, 0, 0, 0, 0, /*NCHA*/ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 7, 7, 7, 7, 0, 0, 0, 0,
/*UTIN*/ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 12, 13, 14, 7, 7, 0, 0, 0, /*UTIN*/ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 12, 13, 14, 7, 0, 0, 0,
/*USMA*/ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 13, 14, 7, 7, 0, 0, 0, /*USMA*/ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 13, 14, 7, 0, 0, 0,
/*UINT*/ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 14, 7, 7, 0, 0, 0, /*UINT*/ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 14, 7, 0, 0, 0,
/*UBIG*/ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 7, 7, 0, 0, 0, /*UBIG*/ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 7, 0, 0, 0,
/*VARC*/ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, /*VARB*/ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
/*VARB*/ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, /*JSON*/ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
/*JSON*/ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, /*DECI*/ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
/*DECI*/ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, /*BLOB*/ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0
/*BLOB*/ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0
}; };
int32_t vectorGetConvertType(int32_t type1, int32_t type2) { int32_t vectorGetConvertType(int32_t type1, int32_t type2) {

View File

@ -17,11 +17,6 @@
#include "transComm.h" #include "transComm.h"
#define CONN_HOST_THREAD_INDEX(conn) (conn ? ((SCliConn*)conn)->hThrdIdx : -1)
#define CONN_PERSIST_TIME(para) (para * 1000 * 10)
#define CONN_GET_INST_LABEL(conn) (((SRpcInfo*)(((SCliThrdObj*)conn->hostThrd)->pTransInst))->label)
typedef struct SCliConn { typedef struct SCliConn {
T_REF_DECLARE() T_REF_DECLARE()
uv_connect_t connReq; uv_connect_t connReq;
@ -32,7 +27,6 @@ typedef struct SCliConn {
void* data; void* data;
queue conn; queue conn;
uint64_t expireTime; uint64_t expireTime;
int8_t ctnRdCnt; // continue read count
int hThrdIdx; int hThrdIdx;
bool broken; // link broken or not bool broken; // link broken or not
@ -69,12 +63,12 @@ typedef struct SCliThrdObj {
bool quit; bool quit;
} SCliThrdObj; } SCliThrdObj;
typedef struct SClientObj { typedef struct SCliObj {
char label[TSDB_LABEL_LEN]; char label[TSDB_LABEL_LEN];
int32_t index; int32_t index;
int numOfThreads; int numOfThreads;
SCliThrdObj** pThreadObj; SCliThrdObj** pThreadObj;
} SClientObj; } SCliObj;
typedef struct SConnList { typedef struct SConnList {
queue conn; queue conn;
@ -88,43 +82,63 @@ static SCliConn* getConnFromPool(void* pool, char* ip, uint32_t port);
static void addConnToPool(void* pool, char* ip, uint32_t port, SCliConn* conn); static void addConnToPool(void* pool, char* ip, uint32_t port, SCliConn* conn);
// register timer in each thread to clear expire conn // register timer in each thread to clear expire conn
static void clientTimeoutCb(uv_timer_t* handle); static void cliTimeoutCb(uv_timer_t* handle);
// alloc buf for read // alloc buf for recv
static void clientAllocBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf); static void cliAllocBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf);
// callback after read nbytes from socket // callback after read nbytes from socket
static void clientReadCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf); static void cliRecvCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf);
// callback after write data to socket // callback after write data to socket
static void clientWriteCb(uv_write_t* req, int status); static void cliSendCb(uv_write_t* req, int status);
// callback after conn to server // callback after conn to server
static void clientConnCb(uv_connect_t* req, int status); static void cliConnCb(uv_connect_t* req, int status);
static void clientAsyncCb(uv_async_t* handle); static void cliAsyncCb(uv_async_t* handle);
static SCliConn* clientConnCreate(SCliThrdObj* thrd); static SCliConn* cliCreateConn(SCliThrdObj* thrd);
static void clientConnDestroy(SCliConn* pConn, bool clear /*clear tcp handle or not*/); static void cliDestroyConn(SCliConn* pConn, bool clear /*clear tcp handle or not*/);
static void clientDestroy(uv_handle_t* handle); static void cliDestroy(uv_handle_t* handle);
// process data read from server, add decompress etc later // process data read from server, add decompress etc later
static void clientHandleResp(SCliConn* conn); static void cliHandleResp(SCliConn* conn);
// handle except about conn // handle except about conn
static void clientHandleExcept(SCliConn* conn); static void cliHandleExcept(SCliConn* conn);
// handle req from app // handle req from app
static void clientHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd); static void cliHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd);
static void clientHandleQuit(SCliMsg* pMsg, SCliThrdObj* pThrd); static void cliHandleQuit(SCliMsg* pMsg, SCliThrdObj* pThrd);
static void clientSendQuit(SCliThrdObj* thrd); static void cliSendQuit(SCliThrdObj* thrd);
static void destroyUserdata(SRpcMsg* userdata); static void destroyUserdata(SRpcMsg* userdata);
static int clientRBChoseIdx(SRpcInfo* pTransInst); static int cliRBChoseIdx(SRpcInfo* pTransInst);
static void destroyCmsg(SCliMsg* cmsg); static void destroyCmsg(SCliMsg* cmsg);
static void transDestroyConnCtx(STransConnCtx* ctx); static void transDestroyConnCtx(STransConnCtx* ctx);
// thread obj // thread obj
static SCliThrdObj* createThrdObj(); static SCliThrdObj* createThrdObj();
static void destroyThrdObj(SCliThrdObj* pThrd); static void destroyThrdObj(SCliThrdObj* pThrd);
// thread
static void* clientThread(void* arg);
static void* clientNotifyApp() {} #define CONN_HOST_THREAD_INDEX(conn) (conn ? ((SCliConn*)conn)->hThrdIdx : -1)
static void clientHandleResp(SCliConn* conn) { #define CONN_PERSIST_TIME(para) (para * 1000 * 10)
#define CONN_GET_INST_LABEL(conn) (((SRpcInfo*)(((SCliThrdObj*)conn->hostThrd)->pTransInst))->label)
#define CONN_HANDLE_THREAD_QUIT(conn, thrd) \
do { \
if (thrd->quit) { \
cliHandleExcept(conn); \
goto _RETURE; \
} \
} while (0)
#define CONN_HANDLE_BROKEN(conn) \
do { \
if (conn->broken) { \
cliHandleExcept(conn); \
goto _RETURE; \
} \
} while (0);
static void* cliWorkThread(void* arg);
static void* cliNotifyApp() {}
static void cliHandleResp(SCliConn* conn) {
SCliMsg* pMsg = conn->data; SCliMsg* pMsg = conn->data;
STransConnCtx* pCtx = pMsg->ctx; STransConnCtx* pCtx = pMsg->ctx;
@ -147,27 +161,28 @@ static void clientHandleResp(SCliConn* conn) {
if (pTransInst->pfp != NULL && (pTransInst->pfp)(pTransInst->parent, rpcMsg.msgType)) { if (pTransInst->pfp != NULL && (pTransInst->pfp)(pTransInst->parent, rpcMsg.msgType)) {
rpcMsg.handle = conn; rpcMsg.handle = conn;
transRefCliHandle(conn);
conn->persist = 1; conn->persist = 1;
tDebug("client conn %p persist by app", conn); tDebug("cli conn %p persist by app", conn);
} }
tDebug("%s client conn %p %s received from %s:%d, local info: %s:%d, msg size: %d", pTransInst->label, conn, tDebug("%s cli conn %p %s received from %s:%d, local info: %s:%d, msg size: %d", pTransInst->label, conn,
TMSG_INFO(pHead->msgType), inet_ntoa(conn->addr.sin_addr), ntohs(conn->addr.sin_port), TMSG_INFO(pHead->msgType), inet_ntoa(conn->addr.sin_addr), ntohs(conn->addr.sin_port),
inet_ntoa(conn->locaddr.sin_addr), ntohs(conn->locaddr.sin_port), rpcMsg.contLen); inet_ntoa(conn->locaddr.sin_addr), ntohs(conn->locaddr.sin_port), rpcMsg.contLen);
conn->secured = pHead->secured; conn->secured = pHead->secured;
if (pCtx->pSem == NULL) { if (pCtx->pSem == NULL) {
tTrace("%s client conn %p handle resp", pTransInst->label, conn); tTrace("%s cli conn %p handle resp", pTransInst->label, conn);
(pTransInst->cfp)(pTransInst->parent, &rpcMsg, NULL); (pTransInst->cfp)(pTransInst->parent, &rpcMsg, NULL);
} else { } else {
tTrace("%s client conn(sync) %p handle resp", pTransInst->label, conn); tTrace("%s cli conn(sync) %p handle resp", pTransInst->label, conn);
memcpy((char*)pCtx->pRsp, (char*)&rpcMsg, sizeof(rpcMsg)); memcpy((char*)pCtx->pRsp, (char*)&rpcMsg, sizeof(rpcMsg));
tsem_post(pCtx->pSem); tsem_post(pCtx->pSem);
} }
conn->ctnRdCnt += 1;
uv_read_start((uv_stream_t*)conn->stream, clientAllocBufferCb, clientReadCb); uv_read_start((uv_stream_t*)conn->stream, cliAllocBufferCb, cliRecvCb);
// user owns conn->persist = 1 // user owns conn->persist = 1
if (conn->persist == 0) { if (conn->persist == 0) {
@ -178,10 +193,10 @@ static void clientHandleResp(SCliConn* conn) {
// start thread's timer of conn pool if not active // start thread's timer of conn pool if not active
if (!uv_is_active((uv_handle_t*)&pThrd->timer) && pTransInst->idleTime > 0) { if (!uv_is_active((uv_handle_t*)&pThrd->timer) && pTransInst->idleTime > 0) {
// uv_timer_start((uv_timer_t*)&pThrd->timer, clientTimeoutCb, CONN_PERSIST_TIME(pRpc->idleTime) / 2, 0); // uv_timer_start((uv_timer_t*)&pThrd->timer, cliTimeoutCb, CONN_PERSIST_TIME(pRpc->idleTime) / 2, 0);
} }
} }
static void clientHandleExcept(SCliConn* pConn) { static void cliHandleExcept(SCliConn* pConn) {
if (pConn->data == NULL) { if (pConn->data == NULL) {
// handle conn except in conn pool // handle conn except in conn pool
transUnrefCliHandle(pConn); transUnrefCliHandle(pConn);
@ -199,25 +214,25 @@ static void clientHandleExcept(SCliConn* pConn) {
rpcMsg.msgType = pMsg->msg.msgType + 1; rpcMsg.msgType = pMsg->msg.msgType + 1;
if (pCtx->pSem == NULL) { if (pCtx->pSem == NULL) {
tTrace("%s client conn %p handle resp", pTransInst->label, pConn); tTrace("%s cli conn %p handle resp", pTransInst->label, pConn);
(pTransInst->cfp)(pTransInst->parent, &rpcMsg, NULL); (pTransInst->cfp)(pTransInst->parent, &rpcMsg, NULL);
} else { } else {
tTrace("%s client conn(sync) %p handle resp", pTransInst->label, pConn); tTrace("%s cli conn(sync) %p handle resp", pTransInst->label, pConn);
memcpy((char*)(pCtx->pRsp), (char*)(&rpcMsg), sizeof(rpcMsg)); memcpy((char*)(pCtx->pRsp), (char*)(&rpcMsg), sizeof(rpcMsg));
tsem_post(pCtx->pSem); tsem_post(pCtx->pSem);
} }
destroyCmsg(pConn->data); destroyCmsg(pConn->data);
pConn->data = NULL; pConn->data = NULL;
tTrace("%s client conn %p start to destroy", CONN_GET_INST_LABEL(pConn), pConn); tTrace("%s cli conn %p start to destroy", CONN_GET_INST_LABEL(pConn), pConn);
transUnrefCliHandle(pConn); transUnrefCliHandle(pConn);
} }
static void clientTimeoutCb(uv_timer_t* handle) { static void cliTimeoutCb(uv_timer_t* handle) {
SCliThrdObj* pThrd = handle->data; SCliThrdObj* pThrd = handle->data;
SRpcInfo* pRpc = pThrd->pTransInst; SRpcInfo* pRpc = pThrd->pTransInst;
int64_t currentTime = pThrd->nextTimeout; int64_t currentTime = pThrd->nextTimeout;
tTrace("%s, client conn timeout, try to remove expire conn from conn pool", pRpc->label); tTrace("%s, cli conn timeout, try to remove expire conn from conn pool", pRpc->label);
SConnList* p = taosHashIterate((SHashObj*)pThrd->pool, NULL); SConnList* p = taosHashIterate((SHashObj*)pThrd->pool, NULL);
while (p != NULL) { while (p != NULL) {
@ -235,7 +250,7 @@ static void clientTimeoutCb(uv_timer_t* handle) {
} }
pThrd->nextTimeout = taosGetTimestampMs() + CONN_PERSIST_TIME(pRpc->idleTime); pThrd->nextTimeout = taosGetTimestampMs() + CONN_PERSIST_TIME(pRpc->idleTime);
uv_timer_start(handle, clientTimeoutCb, CONN_PERSIST_TIME(pRpc->idleTime) / 2, 0); uv_timer_start(handle, cliTimeoutCb, CONN_PERSIST_TIME(pRpc->idleTime) / 2, 0);
} }
static void* createConnPool(int size) { static void* createConnPool(int size) {
// thread local, no lock // thread local, no lock
@ -248,7 +263,7 @@ static void* destroyConnPool(void* pool) {
queue* h = QUEUE_HEAD(&connList->conn); queue* h = QUEUE_HEAD(&connList->conn);
QUEUE_REMOVE(h); QUEUE_REMOVE(h);
SCliConn* c = QUEUE_DATA(h, SCliConn, conn); SCliConn* c = QUEUE_DATA(h, SCliConn, conn);
clientConnDestroy(c, true); cliDestroyConn(c, true);
} }
connList = taosHashIterate((SHashObj*)pool, connList); connList = taosHashIterate((SHashObj*)pool, connList);
} }
@ -284,23 +299,22 @@ static void addConnToPool(void* pool, char* ip, uint32_t port, SCliConn* conn) {
tstrncpy(key, ip, strlen(ip)); tstrncpy(key, ip, strlen(ip));
tstrncpy(key + strlen(key), (char*)(&port), sizeof(port)); tstrncpy(key + strlen(key), (char*)(&port), sizeof(port));
tTrace("client conn %p added to conn pool, read buf cap: %d", conn, conn->readBuf.cap); tTrace("cli conn %p added to conn pool, read buf cap: %d", conn, conn->readBuf.cap);
SRpcInfo* pRpc = ((SCliThrdObj*)conn->hostThrd)->pTransInst; SRpcInfo* pRpc = ((SCliThrdObj*)conn->hostThrd)->pTransInst;
conn->expireTime = taosGetTimestampMs() + CONN_PERSIST_TIME(pRpc->idleTime); conn->expireTime = taosGetTimestampMs() + CONN_PERSIST_TIME(pRpc->idleTime);
SConnList* plist = taosHashGet((SHashObj*)pool, key, strlen(key)); SConnList* plist = taosHashGet((SHashObj*)pool, key, strlen(key));
conn->ctnRdCnt = 0;
// list already create before // list already create before
assert(plist != NULL); assert(plist != NULL);
QUEUE_PUSH(&plist->conn, &conn->conn); QUEUE_PUSH(&plist->conn, &conn->conn);
} }
static void clientAllocBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) { static void cliAllocBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) {
SCliConn* conn = handle->data; SCliConn* conn = handle->data;
SConnBuffer* pBuf = &conn->readBuf; SConnBuffer* pBuf = &conn->readBuf;
transAllocBuffer(pBuf, buf); transAllocBuffer(pBuf, buf);
} }
static void clientReadCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf) { static void cliRecvCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf) {
// impl later // impl later
if (handle->data == NULL) { if (handle->data == NULL) {
return; return;
@ -310,10 +324,10 @@ static void clientReadCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf
if (nread > 0) { if (nread > 0) {
pBuf->len += nread; pBuf->len += nread;
if (transReadComplete(pBuf)) { if (transReadComplete(pBuf)) {
tTrace("%s client conn %p read complete", CONN_GET_INST_LABEL(conn), conn); tTrace("%s cli conn %p read complete", CONN_GET_INST_LABEL(conn), conn);
clientHandleResp(conn); cliHandleResp(conn);
} else { } else {
tTrace("%s client conn %p read partial packet, continue to read", CONN_GET_INST_LABEL(conn), conn); tTrace("%s cli conn %p read partial packet, continue to read", CONN_GET_INST_LABEL(conn), conn);
} }
return; return;
} }
@ -326,13 +340,13 @@ static void clientReadCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf
return; return;
} }
if (nread < 0) { if (nread < 0) {
tError("%s client conn %p read error: %s", CONN_GET_INST_LABEL(conn), conn, uv_err_name(nread)); tError("%s cli conn %p read error: %s", CONN_GET_INST_LABEL(conn), conn, uv_err_name(nread));
conn->broken = true; conn->broken = true;
clientHandleExcept(conn); cliHandleExcept(conn);
} }
} }
static SCliConn* clientConnCreate(SCliThrdObj* pThrd) { static SCliConn* cliCreateConn(SCliThrdObj* pThrd) {
SCliConn* conn = calloc(1, sizeof(SCliConn)); SCliConn* conn = calloc(1, sizeof(SCliConn));
// read/write stream handle // read/write stream handle
conn->stream = (uv_stream_t*)malloc(sizeof(uv_tcp_t)); conn->stream = (uv_stream_t*)malloc(sizeof(uv_tcp_t));
@ -348,40 +362,42 @@ static SCliConn* clientConnCreate(SCliThrdObj* pThrd) {
transRefCliHandle(conn); transRefCliHandle(conn);
return conn; return conn;
} }
static void clientConnDestroy(SCliConn* conn, bool clear) { static void cliDestroyConn(SCliConn* conn, bool clear) {
tTrace("%s client conn %p remove from conn pool", CONN_GET_INST_LABEL(conn), conn); tTrace("%s cli conn %p remove from conn pool", CONN_GET_INST_LABEL(conn), conn);
QUEUE_REMOVE(&conn->conn); QUEUE_REMOVE(&conn->conn);
if (clear) { if (clear) {
uv_close((uv_handle_t*)conn->stream, clientDestroy); uv_close((uv_handle_t*)conn->stream, cliDestroy);
} }
} }
static void clientDestroy(uv_handle_t* handle) { static void cliDestroy(uv_handle_t* handle) {
SCliConn* conn = handle->data; SCliConn* conn = handle->data;
free(conn->stream); free(conn->stream);
tTrace("%s client conn %p destroy successfully", CONN_GET_INST_LABEL(conn), conn); tTrace("%s cli conn %p destroy successfully", CONN_GET_INST_LABEL(conn), conn);
free(conn); free(conn);
} }
static void clientWriteCb(uv_write_t* req, int status) { static void cliSendCb(uv_write_t* req, int status) {
SCliConn* pConn = req->data; SCliConn* pConn = req->data;
if (status == 0) { if (status == 0) {
tTrace("%s client conn %p data already was written out", CONN_GET_INST_LABEL(pConn), pConn); tTrace("%s cli conn %p data already was written out", CONN_GET_INST_LABEL(pConn), pConn);
SCliMsg* pMsg = pConn->data; SCliMsg* pMsg = pConn->data;
if (pMsg == NULL) { if (pMsg == NULL) {
return; return;
} }
destroyUserdata(&pMsg->msg); destroyUserdata(&pMsg->msg);
} else { } else {
tError("%s client conn %p failed to write: %s", CONN_GET_INST_LABEL(pConn), pConn, uv_err_name(status)); tError("%s cli conn %p failed to write: %s", CONN_GET_INST_LABEL(pConn), pConn, uv_err_name(status));
clientHandleExcept(pConn); cliHandleExcept(pConn);
return; return;
} }
uv_read_start((uv_stream_t*)pConn->stream, clientAllocBufferCb, clientReadCb); uv_read_start((uv_stream_t*)pConn->stream, cliAllocBufferCb, cliRecvCb);
} }
static void clientWrite(SCliConn* pConn) { static void cliSend(SCliConn* pConn) {
CONN_HANDLE_BROKEN(pConn);
SCliMsg* pCliMsg = pConn->data; SCliMsg* pCliMsg = pConn->data;
STransConnCtx* pCtx = pCliMsg->ctx; STransConnCtx* pCtx = pCliMsg->ctx;
@ -416,18 +432,22 @@ static void clientWrite(SCliConn* pConn) {
pHead->msgLen = (int32_t)htonl((uint32_t)msgLen); pHead->msgLen = (int32_t)htonl((uint32_t)msgLen);
uv_buf_t wb = uv_buf_init((char*)pHead, msgLen); uv_buf_t wb = uv_buf_init((char*)pHead, msgLen);
tDebug("%s client conn %p %s is send to %s:%d, local info %s:%d", CONN_GET_INST_LABEL(pConn), pConn, tDebug("%s cli conn %p %s is send to %s:%d, local info %s:%d", CONN_GET_INST_LABEL(pConn), pConn,
TMSG_INFO(pHead->msgType), inet_ntoa(pConn->addr.sin_addr), ntohs(pConn->addr.sin_port), TMSG_INFO(pHead->msgType), inet_ntoa(pConn->addr.sin_addr), ntohs(pConn->addr.sin_port),
inet_ntoa(pConn->locaddr.sin_addr), ntohs(pConn->locaddr.sin_port)); inet_ntoa(pConn->locaddr.sin_addr), ntohs(pConn->locaddr.sin_port));
uv_write(&pConn->writeReq, (uv_stream_t*)pConn->stream, &wb, 1, clientWriteCb); uv_write(&pConn->writeReq, (uv_stream_t*)pConn->stream, &wb, 1, cliSendCb);
return;
_RETURE:
return;
} }
static void clientConnCb(uv_connect_t* req, int status) { static void cliConnCb(uv_connect_t* req, int status) {
// impl later // impl later
SCliConn* pConn = req->data; SCliConn* pConn = req->data;
if (status != 0) { if (status != 0) {
tError("%s client conn %p failed to connect server: %s", CONN_GET_INST_LABEL(pConn), pConn, uv_strerror(status)); tError("%s cli conn %p failed to connect server: %s", CONN_GET_INST_LABEL(pConn), pConn, uv_strerror(status));
clientHandleExcept(pConn); cliHandleExcept(pConn);
return; return;
} }
int addrlen = sizeof(pConn->addr); int addrlen = sizeof(pConn->addr);
@ -436,14 +456,14 @@ static void clientConnCb(uv_connect_t* req, int status) {
addrlen = sizeof(pConn->locaddr); addrlen = sizeof(pConn->locaddr);
uv_tcp_getsockname((uv_tcp_t*)pConn->stream, (struct sockaddr*)&pConn->locaddr, &addrlen); uv_tcp_getsockname((uv_tcp_t*)pConn->stream, (struct sockaddr*)&pConn->locaddr, &addrlen);
tTrace("%s client conn %p connect to server successfully", CONN_GET_INST_LABEL(pConn), pConn); tTrace("%s cli conn %p connect to server successfully", CONN_GET_INST_LABEL(pConn), pConn);
assert(pConn->stream == req->handle); assert(pConn->stream == req->handle);
clientWrite(pConn); cliSend(pConn);
} }
static void clientHandleQuit(SCliMsg* pMsg, SCliThrdObj* pThrd) { static void cliHandleQuit(SCliMsg* pMsg, SCliThrdObj* pThrd) {
tDebug("client work thread %p start to quit", pThrd); tDebug("cli work thread %p start to quit", pThrd);
destroyCmsg(pMsg); destroyCmsg(pMsg);
destroyConnPool(pThrd->pool); destroyConnPool(pThrd->pool);
@ -452,60 +472,57 @@ static void clientHandleQuit(SCliMsg* pMsg, SCliThrdObj* pThrd) {
pThrd->quit = true; pThrd->quit = true;
uv_stop(pThrd->loop); uv_stop(pThrd->loop);
} }
static void clientHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) { static SCliConn* cliGetConn(SCliMsg* pMsg, SCliThrdObj* pThrd) {
SCliConn* conn = NULL;
if (pMsg->msg.handle != NULL) {
conn = (SCliConn*)(pMsg->msg.handle);
transUnrefCliHandle(conn);
if (conn != NULL) {
tTrace("%s cli conn %p reused", CONN_GET_INST_LABEL(conn), conn);
}
} else {
STransConnCtx* pCtx = pMsg->ctx;
conn = getConnFromPool(pThrd->pool, pCtx->ip, pCtx->port);
if (conn != NULL) tTrace("%s cli conn %p get from conn pool", CONN_GET_INST_LABEL(conn), conn);
}
return conn;
}
static void cliHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) {
uint64_t et = taosGetTimestampUs(); uint64_t et = taosGetTimestampUs();
uint64_t el = et - pMsg->st; uint64_t el = et - pMsg->st;
tTrace("%s client msg tran time cost: %" PRIu64 "us", ((SRpcInfo*)pThrd->pTransInst)->label, el); tTrace("%s cli msg tran time cost: %" PRIu64 "us", ((SRpcInfo*)pThrd->pTransInst)->label, el);
STransConnCtx* pCtx = pMsg->ctx; STransConnCtx* pCtx = pMsg->ctx;
SRpcInfo* pTransInst = pThrd->pTransInst; SRpcInfo* pTransInst = pThrd->pTransInst;
SCliConn* conn = NULL;
if (pMsg->msg.handle != NULL) {
conn = (SCliConn*)(pMsg->msg.handle);
if (conn != NULL) {
tTrace("%s client conn %p reused", CONN_GET_INST_LABEL(conn), conn);
}
} else {
conn = getConnFromPool(pThrd->pool, pCtx->ip, pCtx->port);
if (conn != NULL) tTrace("%s client conn %p get from conn pool", CONN_GET_INST_LABEL(conn), conn);
}
SCliConn* conn = cliGetConn(pMsg, pThrd);
if (conn != NULL) { if (conn != NULL) {
conn->data = pMsg; conn->data = pMsg;
conn->writeReq.data = conn;
transDestroyBuffer(&conn->readBuf); transDestroyBuffer(&conn->readBuf);
cliSend(conn);
if (pThrd->quit) {
clientHandleExcept(conn);
return;
}
clientWrite(conn);
} else { } else {
conn = clientConnCreate(pThrd); conn = cliCreateConn(pThrd);
conn->data = pMsg; conn->data = pMsg;
int ret = transSetConnOption((uv_tcp_t*)conn->stream); int ret = transSetConnOption((uv_tcp_t*)conn->stream);
if (ret) { if (ret) {
tError("%s client conn %p failed to set conn option, errmsg %s", pTransInst->label, conn, uv_err_name(ret)); tError("%s cli conn %p failed to set conn option, errmsg %s", pTransInst->label, conn, uv_err_name(ret));
} }
struct sockaddr_in addr; struct sockaddr_in addr;
uv_ip4_addr(pMsg->ctx->ip, pMsg->ctx->port, &addr); uv_ip4_addr(pMsg->ctx->ip, pMsg->ctx->port, &addr);
// handle error in callback if fail to connect // handle error in callback if fail to connect
tTrace("%s client conn %p try to connect to %s:%d", pTransInst->label, conn, pMsg->ctx->ip, pMsg->ctx->port); tTrace("%s cli conn %p try to connect to %s:%d", pTransInst->label, conn, pMsg->ctx->ip, pMsg->ctx->port);
uv_tcp_connect(&conn->connReq, (uv_tcp_t*)(conn->stream), (const struct sockaddr*)&addr, clientConnCb); uv_tcp_connect(&conn->connReq, (uv_tcp_t*)(conn->stream), (const struct sockaddr*)&addr, cliConnCb);
} }
conn->ctnRdCnt = 0;
conn->hThrdIdx = pCtx->hThrdIdx; conn->hThrdIdx = pCtx->hThrdIdx;
} }
static void clientAsyncCb(uv_async_t* handle) { static void cliAsyncCb(uv_async_t* handle) {
SAsyncItem* item = handle->data; SAsyncItem* item = handle->data;
SCliThrdObj* pThrd = item->pThrd; SCliThrdObj* pThrd = item->pThrd;
SCliMsg* pMsg = NULL; SCliMsg* pMsg = NULL;
queue wq;
// batch process to avoid to lock/unlock frequently // batch process to avoid to lock/unlock frequently
queue wq;
pthread_mutex_lock(&item->mtx); pthread_mutex_lock(&item->mtx);
QUEUE_MOVE(&item->qmsg, &wq); QUEUE_MOVE(&item->qmsg, &wq);
pthread_mutex_unlock(&item->mtx); pthread_mutex_unlock(&item->mtx);
@ -517,25 +534,25 @@ static void clientAsyncCb(uv_async_t* handle) {
SCliMsg* pMsg = QUEUE_DATA(h, SCliMsg, q); SCliMsg* pMsg = QUEUE_DATA(h, SCliMsg, q);
if (pMsg->ctx == NULL) { if (pMsg->ctx == NULL) {
clientHandleQuit(pMsg, pThrd); cliHandleQuit(pMsg, pThrd);
} else { } else {
clientHandleReq(pMsg, pThrd); cliHandleReq(pMsg, pThrd);
} }
count++; count++;
} }
if (count >= 2) { if (count >= 2) {
tTrace("client process batch size: %d", count); tTrace("cli process batch size: %d", count);
} }
} }
static void* clientThread(void* arg) { static void* cliWorkThread(void* arg) {
SCliThrdObj* pThrd = (SCliThrdObj*)arg; SCliThrdObj* pThrd = (SCliThrdObj*)arg;
setThreadName("trans-client-work"); setThreadName("trans-cli-work");
uv_run(pThrd->loop, UV_RUN_DEFAULT); uv_run(pThrd->loop, UV_RUN_DEFAULT);
} }
void* taosInitClient(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* shandle) { void* taosInitClient(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* shandle) {
SClientObj* cli = calloc(1, sizeof(SClientObj)); SCliObj* cli = calloc(1, sizeof(SCliObj));
SRpcInfo* pRpc = shandle; SRpcInfo* pRpc = shandle;
memcpy(cli->label, label, strlen(label)); memcpy(cli->label, label, strlen(label));
@ -547,9 +564,9 @@ void* taosInitClient(uint32_t ip, uint32_t port, char* label, int numOfThreads,
pThrd->nextTimeout = taosGetTimestampMs() + CONN_PERSIST_TIME(pRpc->idleTime); pThrd->nextTimeout = taosGetTimestampMs() + CONN_PERSIST_TIME(pRpc->idleTime);
pThrd->pTransInst = shandle; pThrd->pTransInst = shandle;
int err = pthread_create(&pThrd->thread, NULL, clientThread, (void*)(pThrd)); int err = pthread_create(&pThrd->thread, NULL, cliWorkThread, (void*)(pThrd));
if (err == 0) { if (err == 0) {
tDebug("success to create tranport-client thread %d", i); tDebug("success to create tranport-cli thread %d", i);
} }
cli->pThreadObj[i] = pThrd; cli->pThreadObj[i] = pThrd;
} }
@ -574,13 +591,14 @@ static void destroyCmsg(SCliMsg* pMsg) {
static SCliThrdObj* createThrdObj() { static SCliThrdObj* createThrdObj() {
SCliThrdObj* pThrd = (SCliThrdObj*)calloc(1, sizeof(SCliThrdObj)); SCliThrdObj* pThrd = (SCliThrdObj*)calloc(1, sizeof(SCliThrdObj));
QUEUE_INIT(&pThrd->msg); QUEUE_INIT(&pThrd->msg);
pthread_mutex_init(&pThrd->msgMtx, NULL); pthread_mutex_init(&pThrd->msgMtx, NULL);
pThrd->loop = (uv_loop_t*)malloc(sizeof(uv_loop_t)); pThrd->loop = (uv_loop_t*)malloc(sizeof(uv_loop_t));
uv_loop_init(pThrd->loop); uv_loop_init(pThrd->loop);
pThrd->asyncPool = transCreateAsyncPool(pThrd->loop, 5, pThrd, clientAsyncCb); pThrd->asyncPool = transCreateAsyncPool(pThrd->loop, 5, pThrd, cliAsyncCb);
uv_timer_init(pThrd->loop, &pThrd->timer); uv_timer_init(pThrd->loop, &pThrd->timer);
pThrd->timer.data = pThrd; pThrd->timer.data = pThrd;
@ -611,21 +629,21 @@ static void transDestroyConnCtx(STransConnCtx* ctx) {
free(ctx); free(ctx);
} }
// //
static void clientSendQuit(SCliThrdObj* thrd) { static void cliSendQuit(SCliThrdObj* thrd) {
// cli can stop gracefully // cli can stop gracefully
SCliMsg* msg = calloc(1, sizeof(SCliMsg)); SCliMsg* msg = calloc(1, sizeof(SCliMsg));
transSendAsync(thrd->asyncPool, &msg->q); transSendAsync(thrd->asyncPool, &msg->q);
} }
void taosCloseClient(void* arg) { void taosCloseClient(void* arg) {
SClientObj* cli = arg; SCliObj* cli = arg;
for (int i = 0; i < cli->numOfThreads; i++) { for (int i = 0; i < cli->numOfThreads; i++) {
clientSendQuit(cli->pThreadObj[i]); cliSendQuit(cli->pThreadObj[i]);
destroyThrdObj(cli->pThreadObj[i]); destroyThrdObj(cli->pThreadObj[i]);
} }
free(cli->pThreadObj); free(cli->pThreadObj);
free(cli); free(cli);
} }
static int clientRBChoseIdx(SRpcInfo* pTransInst) { static int cliRBChoseIdx(SRpcInfo* pTransInst) {
int64_t index = pTransInst->index; int64_t index = pTransInst->index;
if (pTransInst->index++ >= pTransInst->numOfThreads) { if (pTransInst->index++ >= pTransInst->numOfThreads) {
pTransInst->index = 0; pTransInst->index = 0;
@ -645,7 +663,7 @@ void transUnrefCliHandle(void* handle) {
} }
int ref = T_REF_DEC((SCliConn*)handle); int ref = T_REF_DEC((SCliConn*)handle);
if (ref == 0) { if (ref == 0) {
clientConnDestroy((SCliConn*)handle, true); cliDestroyConn((SCliConn*)handle, true);
} }
// unref cli handle // unref cli handle
@ -659,7 +677,7 @@ void rpcSendRequest(void* shandle, const SEpSet* pEpSet, SRpcMsg* pMsg, int64_t*
int index = CONN_HOST_THREAD_INDEX(pMsg->handle); int index = CONN_HOST_THREAD_INDEX(pMsg->handle);
if (index == -1) { if (index == -1) {
index = clientRBChoseIdx(pTransInst); index = cliRBChoseIdx(pTransInst);
} }
int32_t flen = 0; int32_t flen = 0;
if (transCompressMsg(pMsg->pCont, pMsg->contLen, &flen)) { if (transCompressMsg(pMsg->pCont, pMsg->contLen, &flen)) {
@ -680,7 +698,7 @@ void rpcSendRequest(void* shandle, const SEpSet* pEpSet, SRpcMsg* pMsg, int64_t*
cliMsg->msg = *pMsg; cliMsg->msg = *pMsg;
cliMsg->st = taosGetTimestampUs(); cliMsg->st = taosGetTimestampUs();
SCliThrdObj* thrd = ((SClientObj*)pTransInst->tcphandle)->pThreadObj[index]; SCliThrdObj* thrd = ((SCliObj*)pTransInst->tcphandle)->pThreadObj[index];
transSendAsync(thrd->asyncPool, &(cliMsg->q)); transSendAsync(thrd->asyncPool, &(cliMsg->q));
} }
@ -692,7 +710,7 @@ void rpcSendRecv(void* shandle, SEpSet* pEpSet, SRpcMsg* pReq, SRpcMsg* pRsp) {
int index = CONN_HOST_THREAD_INDEX(pReq->handle); int index = CONN_HOST_THREAD_INDEX(pReq->handle);
if (index == -1) { if (index == -1) {
index = clientRBChoseIdx(pTransInst); index = cliRBChoseIdx(pTransInst);
} }
STransConnCtx* pCtx = calloc(1, sizeof(STransConnCtx)); STransConnCtx* pCtx = calloc(1, sizeof(STransConnCtx));
@ -710,7 +728,7 @@ void rpcSendRecv(void* shandle, SEpSet* pEpSet, SRpcMsg* pReq, SRpcMsg* pRsp) {
cliMsg->msg = *pReq; cliMsg->msg = *pReq;
cliMsg->st = taosGetTimestampUs(); cliMsg->st = taosGetTimestampUs();
SCliThrdObj* thrd = ((SClientObj*)pTransInst->tcphandle)->pThreadObj[index]; SCliThrdObj* thrd = ((SCliObj*)pTransInst->tcphandle)->pThreadObj[index];
transSendAsync(thrd->asyncPool, &(cliMsg->q)); transSendAsync(thrd->asyncPool, &(cliMsg->q));
tsem_t* pSem = pCtx->pSem; tsem_t* pSem = pCtx->pSem;
tsem_wait(pSem); tsem_wait(pSem);

View File

@ -92,9 +92,9 @@ static int uvAuthMsg(SSrvConn* pConn, char* msg, int msgLen);
static void uvAllocConnBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf); static void uvAllocConnBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf);
static void uvAllocReadBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf); static void uvAllocReadBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf);
static void uvOnReadCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf); static void uvOnRecvCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf);
static void uvOnTimeoutCb(uv_timer_t* handle); static void uvOnTimeoutCb(uv_timer_t* handle);
static void uvOnWriteCb(uv_write_t* req, int status); static void uvOnSendCb(uv_write_t* req, int status);
static void uvOnPipeWriteCb(uv_write_t* req, int status); static void uvOnPipeWriteCb(uv_write_t* req, int status);
static void uvOnAcceptCb(uv_stream_t* stream, int status); static void uvOnAcceptCb(uv_stream_t* stream, int status);
static void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf); static void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf);
@ -240,7 +240,7 @@ static void uvHandleReq(SSrvConn* pConn) {
// validate msg type // validate msg type
} }
void uvOnReadCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf) { void uvOnRecvCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf) {
// opt // opt
SSrvConn* conn = cli->data; SSrvConn* conn = cli->data;
SConnBuffer* pBuf = &conn->readBuf; SConnBuffer* pBuf = &conn->readBuf;
@ -282,7 +282,7 @@ void uvOnTimeoutCb(uv_timer_t* handle) {
tError("server conn %p time out", pConn); tError("server conn %p time out", pConn);
} }
void uvOnWriteCb(uv_write_t* req, int status) { void uvOnSendCb(uv_write_t* req, int status) {
SSrvConn* conn = req->data; SSrvConn* conn = req->data;
transClearBuffer(&conn->readBuf); transClearBuffer(&conn->readBuf);
if (status == 0) { if (status == 0) {
@ -350,7 +350,7 @@ static void uvStartSendRespInternal(SSrvMsg* smsg) {
SSrvConn* pConn = smsg->pConn; SSrvConn* pConn = smsg->pConn;
uv_timer_stop(&pConn->pTimer); uv_timer_stop(&pConn->pTimer);
uv_write(&pConn->pWriter, (uv_stream_t*)pConn->pTcp, &wb, 1, uvOnWriteCb); uv_write(&pConn->pWriter, (uv_stream_t*)pConn->pTcp, &wb, 1, uvOnSendCb);
} }
static void uvStartSendResp(SSrvMsg* smsg) { static void uvStartSendResp(SSrvMsg* smsg) {
// impl // impl
@ -526,7 +526,7 @@ void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) {
return; return;
} }
uv_read_start((uv_stream_t*)(pConn->pTcp), uvAllocReadBufferCb, uvOnReadCb); uv_read_start((uv_stream_t*)(pConn->pTcp), uvAllocReadBufferCb, uvOnRecvCb);
} else { } else {
tDebug("failed to create new connection"); tDebug("failed to create new connection");
@ -641,7 +641,7 @@ static void uvDestroyConn(uv_handle_t* handle) {
uv_timer_stop(&conn->pTimer); uv_timer_stop(&conn->pTimer);
QUEUE_REMOVE(&conn->queue); QUEUE_REMOVE(&conn->queue);
free(conn->pTcp); free(conn->pTcp);
// free(conn); free(conn);
if (thrd->quit && QUEUE_IS_EMPTY(&thrd->conn)) { if (thrd->quit && QUEUE_IS_EMPTY(&thrd->conn)) {
uv_loop_close(thrd->loop); uv_loop_close(thrd->loop);

View File

@ -3,7 +3,6 @@ add_executable(client "")
add_executable(server "") add_executable(server "")
add_executable(transUT "") add_executable(transUT "")
add_executable(syncClient "") add_executable(syncClient "")
add_executable(pushClient "")
add_executable(pushServer "") add_executable(pushServer "")
target_sources(transUT target_sources(transUT
@ -27,10 +26,6 @@ target_sources (syncClient
"syncClient.c" "syncClient.c"
) )
target_sources(pushClient
PRIVATE
"pushClient.c"
)
target_sources(pushServer target_sources(pushServer
PRIVATE PRIVATE
"pushServer.c" "pushServer.c"
@ -102,19 +97,6 @@ target_link_libraries (syncClient
transport transport
) )
target_include_directories(pushClient
PUBLIC
"${CMAKE_SOURCE_DIR}/include/libs/transport"
"${CMAKE_CURRENT_SOURCE_DIR}/../inc"
)
target_link_libraries (pushClient
os
util
common
gtest_main
transport
)
target_include_directories(pushServer target_include_directories(pushServer
PUBLIC PUBLIC
"${CMAKE_SOURCE_DIR}/include/libs/transport" "${CMAKE_SOURCE_DIR}/include/libs/transport"

View File

@ -1,242 +0,0 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <sys/time.h>
#include <tdatablock.h>
#include "os.h"
#include "rpcLog.h"
#include "taoserror.h"
#include "tglobal.h"
#include "trpc.h"
#include "tutil.h"
typedef struct {
int index;
SEpSet epSet;
int num;
int numOfReqs;
int msgSize;
tsem_t rspSem;
tsem_t * pOverSem;
pthread_t thread;
void * pRpc;
} SInfo;
static void processResponse(void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet) {
SInfo *pInfo = (SInfo *)pMsg->ahandle;
tDebug("thread:%d, response is received, type:%d contLen:%d code:0x%x", pInfo->index, pMsg->msgType, pMsg->contLen,
pMsg->code);
if (pEpSet) pInfo->epSet = *pEpSet;
rpcFreeCont(pMsg->pCont);
// tsem_post(&pInfo->rspSem);
tsem_post(&pInfo->rspSem);
}
static int tcount = 0;
typedef struct SPushArg {
tsem_t sem;
} SPushArg;
// ping
int pushCallback(void *arg, SRpcMsg *msg) {
SPushArg *push = arg;
tsem_post(&push->sem);
}
SRpcPush *createPushArg() {
SRpcPush *push = calloc(1, sizeof(SRpcPush));
push->arg = calloc(1, sizeof(SPushArg));
tsem_init(&(((SPushArg *)push->arg)->sem), 0, 0);
push->callback = pushCallback;
return push;
}
static void *sendRequest(void *param) {
SInfo * pInfo = (SInfo *)param;
SRpcMsg rpcMsg = {0};
tDebug("thread:%d, start to send request", pInfo->index);
tDebug("thread:%d, reqs: %d", pInfo->index, pInfo->numOfReqs);
int u100 = 0;
int u500 = 0;
int u1000 = 0;
int u10000 = 0;
while (pInfo->numOfReqs == 0 || pInfo->num < pInfo->numOfReqs) {
SRpcPush *push = createPushArg();
pInfo->num++;
rpcMsg.pCont = rpcMallocCont(pInfo->msgSize);
rpcMsg.contLen = pInfo->msgSize;
rpcMsg.ahandle = pInfo;
rpcMsg.msgType = 1;
rpcMsg.push = push;
// tDebug("thread:%d, send request, contLen:%d num:%d", pInfo->index, pInfo->msgSize, pInfo->num);
int64_t start = taosGetTimestampUs();
rpcSendRequest(pInfo->pRpc, &pInfo->epSet, &rpcMsg, NULL);
if (pInfo->num % 20000 == 0) tInfo("thread:%d, %d requests have been sent", pInfo->index, pInfo->num);
tsem_wait(&pInfo->rspSem); // ping->pong
// tsem_wait(&pInfo->rspSem);
SPushArg *arg = push->arg;
/// e
tsem_wait(&arg->sem); // push callback
// query_fetch(client->h)
int64_t end = taosGetTimestampUs() - start;
if (end <= 100) {
u100++;
} else if (end > 100 && end <= 500) {
u500++;
} else if (end > 500 && end < 1000) {
u1000++;
} else {
u10000++;
}
tDebug("recv response succefully");
// taosSsleep(100);
}
tError("send and recv sum: %d, %d, %d, %d", u100, u500, u1000, u10000);
tDebug("thread:%d, it is over", pInfo->index);
tcount++;
return NULL;
}
int main(int argc, char *argv[]) {
SRpcInit rpcInit;
SEpSet epSet;
int msgSize = 128;
int numOfReqs = 0;
int appThreads = 1;
char serverIp[40] = "127.0.0.1";
char secret[20] = "mypassword";
struct timeval systemTime;
int64_t startTime, endTime;
pthread_attr_t thattr;
// server info
epSet.inUse = 0;
addEpIntoEpSet(&epSet, serverIp, 7000);
addEpIntoEpSet(&epSet, "192.168.0.1", 7000);
// client info
memset(&rpcInit, 0, sizeof(rpcInit));
rpcInit.localPort = 0;
rpcInit.label = "APP";
rpcInit.numOfThreads = 1;
rpcInit.cfp = processResponse;
rpcInit.sessions = 100;
rpcInit.idleTime = 100;
rpcInit.user = "michael";
rpcInit.secret = secret;
rpcInit.ckey = "key";
rpcInit.spi = 1;
rpcInit.connType = TAOS_CONN_CLIENT;
for (int i = 1; i < argc; ++i) {
if (strcmp(argv[i], "-p") == 0 && i < argc - 1) {
epSet.eps[0].port = atoi(argv[++i]);
} else if (strcmp(argv[i], "-i") == 0 && i < argc - 1) {
tstrncpy(epSet.eps[0].fqdn, argv[++i], sizeof(epSet.eps[0].fqdn));
} else if (strcmp(argv[i], "-t") == 0 && i < argc - 1) {
rpcInit.numOfThreads = atoi(argv[++i]);
} else if (strcmp(argv[i], "-m") == 0 && i < argc - 1) {
msgSize = atoi(argv[++i]);
} else if (strcmp(argv[i], "-s") == 0 && i < argc - 1) {
rpcInit.sessions = atoi(argv[++i]);
} else if (strcmp(argv[i], "-n") == 0 && i < argc - 1) {
numOfReqs = atoi(argv[++i]);
} else if (strcmp(argv[i], "-a") == 0 && i < argc - 1) {
appThreads = atoi(argv[++i]);
} else if (strcmp(argv[i], "-o") == 0 && i < argc - 1) {
tsCompressMsgSize = atoi(argv[++i]);
} else if (strcmp(argv[i], "-u") == 0 && i < argc - 1) {
rpcInit.user = argv[++i];
} else if (strcmp(argv[i], "-k") == 0 && i < argc - 1) {
rpcInit.secret = argv[++i];
} else if (strcmp(argv[i], "-spi") == 0 && i < argc - 1) {
rpcInit.spi = atoi(argv[++i]);
} else if (strcmp(argv[i], "-d") == 0 && i < argc - 1) {
rpcDebugFlag = atoi(argv[++i]);
} else {
printf("\nusage: %s [options] \n", argv[0]);
printf(" [-i ip]: first server IP address, default is:%s\n", serverIp);
printf(" [-p port]: server port number, default is:%d\n", epSet.eps[0].port);
printf(" [-t threads]: number of rpc threads, default is:%d\n", rpcInit.numOfThreads);
printf(" [-s sessions]: number of rpc sessions, default is:%d\n", rpcInit.sessions);
printf(" [-m msgSize]: message body size, default is:%d\n", msgSize);
printf(" [-a threads]: number of app threads, default is:%d\n", appThreads);
printf(" [-n requests]: number of requests per thread, default is:%d\n", numOfReqs);
printf(" [-o compSize]: compression message size, default is:%d\n", tsCompressMsgSize);
printf(" [-u user]: user name for the connection, default is:%s\n", rpcInit.user);
printf(" [-k secret]: password for the connection, default is:%s\n", rpcInit.secret);
printf(" [-spi SPI]: security parameter index, default is:%d\n", rpcInit.spi);
printf(" [-d debugFlag]: debug flag, default:%d\n", rpcDebugFlag);
printf(" [-h help]: print out this help\n\n");
exit(0);
}
}
taosInitLog("client.log", 10);
void *pRpc = rpcOpen(&rpcInit);
if (pRpc == NULL) {
tError("failed to initialize RPC");
return -1;
}
tInfo("client is initialized");
tInfo("threads:%d msgSize:%d requests:%d", appThreads, msgSize, numOfReqs);
taosGetTimeOfDay(&systemTime);
startTime = systemTime.tv_sec * 1000000 + systemTime.tv_usec;
SInfo *pInfo = (SInfo *)calloc(1, sizeof(SInfo) * appThreads);
pthread_attr_init(&thattr);
pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE);
for (int i = 0; i < appThreads; ++i) {
pInfo->index = i;
pInfo->epSet = epSet;
pInfo->numOfReqs = numOfReqs;
pInfo->msgSize = msgSize;
tsem_init(&pInfo->rspSem, 0, 0);
pInfo->pRpc = pRpc;
pthread_create(&pInfo->thread, &thattr, sendRequest, pInfo);
pInfo++;
}
do {
taosUsleep(1);
} while (tcount < appThreads);
taosGetTimeOfDay(&systemTime);
endTime = systemTime.tv_sec * 1000000 + systemTime.tv_usec;
float usedTime = (endTime - startTime) / 1000.0f; // mseconds
tInfo("it takes %.3f mseconds to send %d requests to server", usedTime, numOfReqs * appThreads);
tInfo("Performance: %.3f requests per second, msgSize:%d bytes", 1000.0 * numOfReqs * appThreads / usedTime, msgSize);
int ch = getchar();
UNUSED(ch);
taosCloseLog();
return 0;
}