Merge pull request #25846 from taosdata/enh/opt_prj

enh(query): optimize the project query performance
This commit is contained in:
Haojun Liao 2024-06-11 14:35:27 +08:00 committed by GitHub
commit 539543b011
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
32 changed files with 394 additions and 175 deletions

View File

@ -2117,6 +2117,7 @@ typedef struct {
int8_t precision; int8_t precision;
int8_t compressed; int8_t compressed;
int8_t streamBlockType; int8_t streamBlockType;
int32_t payloadLen;
int32_t compLen; int32_t compLen;
int32_t numOfBlocks; int32_t numOfBlocks;
int64_t numOfRows; // from int32_t change to int64_t int64_t numOfRows; // from int32_t change to int64_t
@ -2129,6 +2130,14 @@ typedef struct {
char data[]; char data[];
} SRetrieveTableRsp; } SRetrieveTableRsp;
#define PAYLOAD_PREFIX_LEN ((sizeof(int32_t)) << 1)
#define SET_PAYLOAD_LEN(_p, _compLen, _fullLen) \
do { \
((int32_t*)(_p))[0] = (_compLen); \
((int32_t*)(_p))[1] = (_fullLen); \
} while (0);
typedef struct { typedef struct {
int64_t version; int64_t version;
int64_t numOfRows; int64_t numOfRows;
@ -2145,6 +2154,7 @@ typedef struct {
int8_t compressed; int8_t compressed;
int32_t compLen; int32_t compLen;
int32_t numOfRows; int32_t numOfRows;
int32_t fullLen;
char data[]; char data[];
} SRetrieveMetaTableRsp; } SRetrieveMetaTableRsp;
@ -2500,6 +2510,7 @@ typedef struct SSubQueryMsg {
int8_t taskType; int8_t taskType;
int8_t explain; int8_t explain;
int8_t needFetch; int8_t needFetch;
int8_t compress;
uint32_t sqlLen; uint32_t sqlLen;
char* sql; char* sql;
uint32_t msgLen; uint32_t msgLen;

View File

@ -55,6 +55,7 @@ typedef struct SDataSinkStat {
} SDataSinkStat; } SDataSinkStat;
typedef struct SDataSinkMgtCfg { typedef struct SDataSinkMgtCfg {
int8_t compress;
uint32_t maxDataBlockNum; // todo: this should be numOfRows? uint32_t maxDataBlockNum; // todo: this should be numOfRows?
uint32_t maxDataBlockNumPerQuery; uint32_t maxDataBlockNumPerQuery;
} SDataSinkMgtCfg; } SDataSinkMgtCfg;
@ -104,7 +105,7 @@ void dsReset(DataSinkHandle handle);
* @param handle * @param handle
* @param pLen data length * @param pLen data length
*/ */
void dsGetDataLength(DataSinkHandle handle, int64_t* pLen, bool* pQueryEnd); void dsGetDataLength(DataSinkHandle handle, int64_t* pLen, int64_t* pRawLen, bool* pQueryEnd);
/** /**
* Get data, the caller needs to allocate data memory. * Get data, the caller needs to allocate data memory.

View File

@ -139,8 +139,9 @@ void qUpdateOperatorParam(qTaskInfo_t tinfo, void* pParam);
* @param qId * @param qId
* @return * @return
*/ */
int32_t qCreateExecTask(SReadHandle* readHandle, int32_t vgId, uint64_t taskId, struct SSubplan* pPlan, int32_t qCreateExecTask(SReadHandle* readHandle, int32_t vgId, uint64_t taskId, struct SSubplan* pSubplan,
qTaskInfo_t* pTaskInfo, DataSinkHandle* handle, char* sql, EOPTR_EXEC_MODEL model); qTaskInfo_t* pTaskInfo, DataSinkHandle* handle, int8_t compressResult, char* sql,
EOPTR_EXEC_MODEL model);
/** /**
* *

View File

@ -61,6 +61,7 @@ typedef struct SQWMsgInfo {
int8_t taskType; int8_t taskType;
int8_t explain; int8_t explain;
int8_t needFetch; int8_t needFetch;
int8_t compressMsg;
} SQWMsgInfo; } SQWMsgInfo;
typedef struct SQWMsg { typedef struct SQWMsg {

View File

@ -48,16 +48,6 @@ typedef struct SQueryProfileSummary {
uint64_t resultSize; // generated result size in Kb. uint64_t resultSize; // generated result size in Kb.
} SQueryProfileSummary; } SQueryProfileSummary;
typedef struct STaskInfo {
SQueryNodeAddr addr;
SSubQueryMsg* msg;
} STaskInfo;
typedef struct SSchdFetchParam {
void** pData;
int32_t* code;
} SSchdFetchParam;
typedef void (*schedulerExecFp)(SExecResult* pResult, void* param, int32_t code); typedef void (*schedulerExecFp)(SExecResult* pResult, void* param, int32_t code);
typedef void (*schedulerFetchFp)(void* pResult, void* param, int32_t code); typedef void (*schedulerFetchFp)(void* pResult, void* param, int32_t code);
typedef bool (*schedulerChkKillFp)(void* param); typedef bool (*schedulerChkKillFp)(void* param);

View File

@ -62,6 +62,7 @@ typedef struct SRpcHandleInfo {
SRpcConnInfo conn; SRpcConnInfo conn;
int8_t forbiddenIp; int8_t forbiddenIp;
int8_t notFreeAhandle; int8_t notFreeAhandle;
int8_t compressed;
} SRpcHandleInfo; } SRpcHandleInfo;
typedef struct SRpcMsg { typedef struct SRpcMsg {

View File

@ -124,7 +124,6 @@ struct SAppInstInfo {
typedef struct SAppInfo { typedef struct SAppInfo {
int64_t startTime; int64_t startTime;
char appName[TSDB_APP_NAME_LEN]; char appName[TSDB_APP_NAME_LEN];
char* ep;
int32_t pid; int32_t pid;
int32_t numOfThreads; int32_t numOfThreads;
SHashObj* pInstMap; SHashObj* pInstMap;
@ -197,8 +196,10 @@ typedef struct SReqResultInfo {
uint64_t current; uint64_t current;
bool localResultFetched; bool localResultFetched;
bool completed; bool completed;
int32_t precision;
bool convertUcs4; bool convertUcs4;
char* decompBuf;
int32_t decompBufSize;
int32_t precision;
int32_t payloadLen; int32_t payloadLen;
char* convertJson; char* convertJson;
} SReqResultInfo; } SReqResultInfo;

View File

@ -374,6 +374,7 @@ void doFreeReqResultInfo(SReqResultInfo *pResInfo) {
taosMemoryFreeClear(pResInfo->fields); taosMemoryFreeClear(pResInfo->fields);
taosMemoryFreeClear(pResInfo->userFields); taosMemoryFreeClear(pResInfo->userFields);
taosMemoryFreeClear(pResInfo->convertJson); taosMemoryFreeClear(pResInfo->convertJson);
taosMemoryFreeClear(pResInfo->decompBuf);
if (pResInfo->convertBuf != NULL) { if (pResInfo->convertBuf != NULL) {
for (int32_t i = 0; i < pResInfo->numOfCols; ++i) { for (int32_t i = 0; i < pResInfo->numOfCols; ++i) {
@ -763,7 +764,6 @@ void taos_init_imp(void) {
clientConnRefPool = taosOpenRef(200, destroyTscObj); clientConnRefPool = taosOpenRef(200, destroyTscObj);
clientReqRefPool = taosOpenRef(40960, doDestroyRequest); clientReqRefPool = taosOpenRef(40960, doDestroyRequest);
// transDestroyBuffer(&conn->readBuf);
taosGetAppName(appInfo.appName, NULL); taosGetAppName(appInfo.appName, NULL);
taosThreadMutexInit(&appInfo.mutex, NULL); taosThreadMutexInit(&appInfo.mutex, NULL);

View File

@ -1711,10 +1711,8 @@ void* doFetchRows(SRequestObj* pRequest, bool setupOneRowPtr, bool convertUcs4)
} }
SReqResultInfo* pResInfo = &pRequest->body.resInfo; SReqResultInfo* pResInfo = &pRequest->body.resInfo;
SSchedulerReq req = { SSchedulerReq req = { .syncReq = true, .pFetchRes = (void**)&pResInfo->pData };
.syncReq = true,
.pFetchRes = (void**)&pResInfo->pData,
};
pRequest->code = schedulerFetchRows(pRequest->body.queryJob, &req); pRequest->code = schedulerFetchRows(pRequest->body.queryJob, &req);
if (pRequest->code != TSDB_CODE_SUCCESS) { if (pRequest->code != TSDB_CODE_SUCCESS) {
pResultInfo->numOfRows = 0; pResultInfo->numOfRows = 0;
@ -2065,6 +2063,7 @@ int32_t setResultDataPtr(SReqResultInfo* pResultInfo, TAOS_FIELD* pFields, int32
tscError("setResultDataPtr paras error"); tscError("setResultDataPtr paras error");
return TSDB_CODE_TSC_INTERNAL_ERROR; return TSDB_CODE_TSC_INTERNAL_ERROR;
} }
if (numOfRows == 0) { if (numOfRows == 0) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
@ -2195,17 +2194,58 @@ int32_t setQueryResultFromRsp(SReqResultInfo* pResultInfo, const SRetrieveTableR
taosMemoryFreeClear(pResultInfo->pRspMsg); taosMemoryFreeClear(pResultInfo->pRspMsg);
pResultInfo->pRspMsg = (const char*)pRsp; pResultInfo->pRspMsg = (const char*)pRsp;
pResultInfo->pData = (void*)pRsp->data;
pResultInfo->numOfRows = htobe64(pRsp->numOfRows); pResultInfo->numOfRows = htobe64(pRsp->numOfRows);
pResultInfo->current = 0; pResultInfo->current = 0;
pResultInfo->completed = (pRsp->completed == 1); pResultInfo->completed = (pRsp->completed == 1);
pResultInfo->payloadLen = htonl(pRsp->compLen);
pResultInfo->precision = pRsp->precision; pResultInfo->precision = pRsp->precision;
// decompress data if needed
int32_t payloadLen = htonl(pRsp->payloadLen);
if (pRsp->compressed) {
if (pResultInfo->decompBuf == NULL) {
pResultInfo->decompBuf = taosMemoryMalloc(payloadLen);
pResultInfo->decompBufSize = payloadLen;
} else {
if (pResultInfo->decompBufSize < payloadLen) {
char* p = taosMemoryRealloc(pResultInfo->decompBuf, payloadLen);
if (p == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
tscError("failed to prepare the decompress buffer, size:%d", payloadLen);
return terrno;
}
pResultInfo->decompBuf = p;
pResultInfo->decompBufSize = payloadLen;
}
}
}
if (payloadLen > 0) {
int32_t compLen = *(int32_t*)pRsp->data;
int32_t rawLen = *(int32_t*)(pRsp->data + sizeof(int32_t));
char* pStart = (char*)pRsp->data + sizeof(int32_t) * 2;
if (pRsp->compressed && compLen < rawLen) {
int32_t len = tsDecompressString(pStart, compLen, 1, pResultInfo->decompBuf, rawLen, ONE_STAGE_COMP, NULL, 0);
ASSERT(len == rawLen);
pResultInfo->pData = pResultInfo->decompBuf;
pResultInfo->payloadLen = rawLen;
} else {
pResultInfo->pData = pStart;
pResultInfo->payloadLen = htonl(pRsp->compLen);
ASSERT(pRsp->compLen == pRsp->payloadLen);
}
}
// TODO handle the compressed case // TODO handle the compressed case
pResultInfo->totalRows += pResultInfo->numOfRows; pResultInfo->totalRows += pResultInfo->numOfRows;
return setResultDataPtr(pResultInfo, pResultInfo->fields, pResultInfo->numOfCols, pResultInfo->numOfRows,
convertUcs4); int32_t code =
setResultDataPtr(pResultInfo, pResultInfo->fields, pResultInfo->numOfCols, pResultInfo->numOfRows, convertUcs4);
return code;
} }
TSDB_SERVER_STATUS taos_check_server_status(const char* fqdn, int port, char* details, int maxlen) { TSDB_SERVER_STATUS taos_check_server_status(const char* fqdn, int port, char* details, int maxlen) {

View File

@ -25,6 +25,7 @@
#include "tglobal.h" #include "tglobal.h"
#include "tname.h" #include "tname.h"
#include "tversion.h" #include "tversion.h"
#include "command.h"
extern SClientHbMgr clientHbMgr; extern SClientHbMgr clientHbMgr;
@ -499,7 +500,7 @@ static int32_t buildShowVariablesRsp(SArray* pVars, SRetrieveTableRsp** pRsp) {
return code; return code;
} }
size_t rspSize = sizeof(SRetrieveTableRsp) + blockGetEncodeSize(pBlock); size_t rspSize = sizeof(SRetrieveTableRsp) + blockGetEncodeSize(pBlock) + PAYLOAD_PREFIX_LEN;
*pRsp = taosMemoryCalloc(1, rspSize); *pRsp = taosMemoryCalloc(1, rspSize);
if (NULL == *pRsp) { if (NULL == *pRsp) {
blockDataDestroy(pBlock); blockDataDestroy(pBlock);
@ -510,14 +511,20 @@ static int32_t buildShowVariablesRsp(SArray* pVars, SRetrieveTableRsp** pRsp) {
(*pRsp)->completed = 1; (*pRsp)->completed = 1;
(*pRsp)->precision = 0; (*pRsp)->precision = 0;
(*pRsp)->compressed = 0; (*pRsp)->compressed = 0;
(*pRsp)->compLen = 0;
(*pRsp)->numOfRows = htobe64((int64_t)pBlock->info.rows); (*pRsp)->numOfRows = htobe64((int64_t)pBlock->info.rows);
(*pRsp)->numOfCols = htonl(SHOW_VARIABLES_RESULT_COLS); (*pRsp)->numOfCols = htonl(SHOW_VARIABLES_RESULT_COLS);
int32_t len = blockEncode(pBlock, (*pRsp)->data, SHOW_VARIABLES_RESULT_COLS); int32_t len = blockEncode(pBlock, (*pRsp)->data + PAYLOAD_PREFIX_LEN, SHOW_VARIABLES_RESULT_COLS);
blockDataDestroy(pBlock); blockDataDestroy(pBlock);
if (len != rspSize - sizeof(SRetrieveTableRsp)) { SET_PAYLOAD_LEN((*pRsp)->data, len, len);
int32_t payloadLen = len + PAYLOAD_PREFIX_LEN;
(*pRsp)->payloadLen = htonl(payloadLen);
(*pRsp)->compLen = htonl(payloadLen);
if (payloadLen != rspSize - sizeof(SRetrieveTableRsp)) {
uError("buildShowVariablesRsp error, len:%d != rspSize - sizeof(SRetrieveTableRsp):%" PRIu64, len, uError("buildShowVariablesRsp error, len:%d != rspSize - sizeof(SRetrieveTableRsp):%" PRIu64, len,
(uint64_t)(rspSize - sizeof(SRetrieveTableRsp))); (uint64_t)(rspSize - sizeof(SRetrieveTableRsp)));
return TSDB_CODE_TSC_INVALID_INPUT; return TSDB_CODE_TSC_INVALID_INPUT;
@ -611,7 +618,7 @@ static int32_t buildRetriveTableRspForCompactDb(SCompactDbRsp* pCompactDb, SRetr
return code; return code;
} }
size_t rspSize = sizeof(SRetrieveTableRsp) + blockGetEncodeSize(pBlock); size_t rspSize = sizeof(SRetrieveTableRsp) + blockGetEncodeSize(pBlock) + PAYLOAD_PREFIX_LEN;
*pRsp = taosMemoryCalloc(1, rspSize); *pRsp = taosMemoryCalloc(1, rspSize);
if (NULL == *pRsp) { if (NULL == *pRsp) {
blockDataDestroy(pBlock); blockDataDestroy(pBlock);
@ -623,13 +630,20 @@ static int32_t buildRetriveTableRspForCompactDb(SCompactDbRsp* pCompactDb, SRetr
(*pRsp)->precision = 0; (*pRsp)->precision = 0;
(*pRsp)->compressed = 0; (*pRsp)->compressed = 0;
(*pRsp)->compLen = 0; (*pRsp)->compLen = 0;
(*pRsp)->payloadLen = 0;
(*pRsp)->numOfRows = htobe64((int64_t)pBlock->info.rows); (*pRsp)->numOfRows = htobe64((int64_t)pBlock->info.rows);
(*pRsp)->numOfCols = htonl(COMPACT_DB_RESULT_COLS); (*pRsp)->numOfCols = htonl(COMPACT_DB_RESULT_COLS);
int32_t len = blockEncode(pBlock, (*pRsp)->data, COMPACT_DB_RESULT_COLS); int32_t len = blockEncode(pBlock, (*pRsp)->data + PAYLOAD_PREFIX_LEN, COMPACT_DB_RESULT_COLS);
blockDataDestroy(pBlock); blockDataDestroy(pBlock);
if (len != rspSize - sizeof(SRetrieveTableRsp)) { SET_PAYLOAD_LEN((*pRsp)->data, len, len);
int32_t payloadLen = len + PAYLOAD_PREFIX_LEN;
(*pRsp)->payloadLen = htonl(payloadLen);
(*pRsp)->compLen = htonl(payloadLen);
if (payloadLen != rspSize - sizeof(SRetrieveTableRsp)) {
uError("buildRetriveTableRspForCompactDb error, len:%d != rspSize - sizeof(SRetrieveTableRsp):%" PRIu64, len, uError("buildRetriveTableRspForCompactDb error, len:%d != rspSize - sizeof(SRetrieveTableRsp):%" PRIu64, len,
(uint64_t)(rspSize - sizeof(SRetrieveTableRsp))); (uint64_t)(rspSize - sizeof(SRetrieveTableRsp)));
return TSDB_CODE_TSC_INVALID_INPUT; return TSDB_CODE_TSC_INVALID_INPUT;

View File

@ -821,32 +821,21 @@ TEST(clientCase, projection_query_tables) {
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
ASSERT_NE(pConn, nullptr); ASSERT_NE(pConn, nullptr);
TAOS_RES* pRes = NULL;
// TAOS_RES* pRes = taos_query(pConn, "create database if not exists abc1 vgroups 1"); // TAOS_RES* pRes = taos_query(pConn, "create database if not exists abc1 vgroups 1");
// if (taos_errno(pRes) != 0) { // if (taos_errno(pRes) != 0) {
// printf("error in create db, reason:%s\n", taos_errstr(pRes)); // printf("error in create db, reason:%s\n", taos_errstr(pRes));
// } // }
// taos_free_result(pRes); // taos_free_result(pRes);
/*
TAOS_RES* pRes = taos_query(pConn, "alter local 'fqdn 127.0.0.1'"); TAOS_RES* pRes = taos_query(pConn, "select last(ts), ts from cache_1.t1");
if (taos_errno(pRes) != 0) {
printf("failed to exec query, %s\n", taos_errstr(pRes));
}
taos_free_result(pRes);
pRes = taos_query(pConn, "select last(ts), ts from cache_1.t1");
// pRes = taos_query(pConn, "select last(ts), ts from cache_1.no_pk_t1"); // pRes = taos_query(pConn, "select last(ts), ts from cache_1.no_pk_t1");
if (taos_errno(pRes) != 0) { if (taos_errno(pRes) != 0) {
printf("failed to exec query, %s\n", taos_errstr(pRes)); printf("failed to create table tu, reason:%s\n", taos_errstr(pRes));
} }
taos_free_result(pRes); taos_free_result(pRes);
// pRes = taos_query(pConn, "create stream stream_1 trigger at_once fill_history 1 ignore expired 0 into str_res1 as select _wstart as ts, count(*) from stable_1 interval(10s);");
// if (taos_errno(pRes) != 0) {
// printf("failed to create table tu, reason:%s\n", taos_errstr(pRes));
// }
// taos_free_result(pRes);
pRes = taos_query(pConn, "create table tu using st2 tags(2)"); pRes = taos_query(pConn, "create table tu using st2 tags(2)");
if (taos_errno(pRes) != 0) { if (taos_errno(pRes) != 0) {
printf("failed to create table tu, reason:%s\n", taos_errstr(pRes)); printf("failed to create table tu, reason:%s\n", taos_errstr(pRes));
@ -876,27 +865,27 @@ TEST(clientCase, projection_query_tables) {
for(int32_t j = 0; j < 1; ++j) { for(int32_t j = 0; j < 1; ++j) {
start += 20; start += 20;
for (int32_t i = 0; i < 1; ++i) { for (int32_t i = 0; i < 1; ++i) {
createNewTable(pConn, i, 100, start, pstr); createNewTable(pConn, i, 100000, 0, pstr);
} }
} }
*/
// pRes = taos_query(pConn, "select * from abc1.st2");
// pRes = taos_query(pConn, "select * from tu"); if (taos_errno(pRes) != 0) {
// if (taos_errno(pRes) != 0) { printf("failed to select from table, reason:%s\n", taos_errstr(pRes));
// printf("failed to select from table, reason:%s\n", taos_errstr(pRes)); taos_free_result(pRes);
// taos_free_result(pRes); ASSERT_TRUE(false);
// ASSERT_TRUE(false); }
// }
// TAOS_ROW pRow = NULL;
// TAOS_ROW pRow = NULL; TAOS_FIELD* pFields = taos_fetch_fields(pRes);
// TAOS_FIELD* pFields = taos_fetch_fields(pRes); int32_t numOfFields = taos_num_fields(pRes);
// int32_t numOfFields = taos_num_fields(pRes);
// char str[512] = {0};
// char str[512] = {0}; while ((pRow = taos_fetch_row(pRes)) != NULL) {
// while ((pRow = taos_fetch_row(pRes)) != NULL) { // int32_t code = taos_print_row(str, pRow, pFields, numOfFields);
// int32_t code = taos_print_row(str, pRow, pFields, numOfFields); // printf("%s\n", str);
// printf("%s\n", str); }
// }
taos_free_result(pRes); taos_free_result(pRes);
taos_close(pConn); taos_close(pConn);
@ -915,25 +904,44 @@ TEST(clientCase, projection_query_stables) {
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
ASSERT_NE(pConn, nullptr); ASSERT_NE(pConn, nullptr);
TAOS_RES* pRes = taos_query(pConn, "use test"); TAOS_RES* pRes = taos_query(pConn, "explain select * from dbvg.st where tbname='ct1'");
taos_free_result(pRes); // taos_free_result(pRes);
pRes = taos_query(pConn, "select * from meters limit 50000000"); // pRes = taos_query(pConn, "select * from st2");
if (taos_errno(pRes) != 0) { // if (taos_errno(pRes) != 0) {
printf("failed to select from table, reason:%s\n", taos_errstr(pRes)); // printf("failed to select from table, reason:%s\n", taos_errstr(pRes));
taos_free_result(pRes); // taos_free_result(pRes);
ASSERT_TRUE(false); // ASSERT_TRUE(false);
} // }
TAOS_ROW pRow = NULL; TAOS_ROW pRow = NULL;
TAOS_FIELD* pFields = taos_fetch_fields(pRes); TAOS_FIELD* pFields = taos_fetch_fields(pRes);
int32_t numOfFields = taos_num_fields(pRes); int32_t numOfFields = taos_num_fields(pRes);
char str[512] = {0}; int32_t numOfRows = 0;
while ((pRow = taos_fetch_row(pRes)) != NULL) { int32_t i = 0;
// int32_t code = taos_print_row(str, pRow, pFields, numOfFields); int32_t prev = 0;
// printf("%s\n", str);
char str[512] = {0};
while (1) {
pRow = taos_fetch_row(pRes);
if (pRow == NULL) {
break;
}
i += numOfRows;
if ( (i / 1000000) > prev) {
printf("%d\n", i);
prev = i/1000000;
}
//printf("%d\n", i);
} }
// while ((pRow = taos_fetch_row(pRes)) != NULL) {
// int32_t code = taos_print_row(str, pRow, pFields, numOfFields);
// if (i++ % 100000 == 0) {
// printf("%d\n", i);
// }
// }
taos_free_result(pRes); taos_free_result(pRes);
taos_close(pConn); taos_close(pConn);

View File

@ -7046,6 +7046,7 @@ int32_t tSerializeSSubQueryMsg(void *buf, int32_t bufLen, SSubQueryMsg *pReq) {
if (tEncodeI8(&encoder, pReq->taskType) < 0) return -1; if (tEncodeI8(&encoder, pReq->taskType) < 0) return -1;
if (tEncodeI8(&encoder, pReq->explain) < 0) return -1; if (tEncodeI8(&encoder, pReq->explain) < 0) return -1;
if (tEncodeI8(&encoder, pReq->needFetch) < 0) return -1; if (tEncodeI8(&encoder, pReq->needFetch) < 0) return -1;
if (tEncodeI8(&encoder, pReq->compress) < 0) return -1;
if (tEncodeU32(&encoder, pReq->sqlLen) < 0) return -1; if (tEncodeU32(&encoder, pReq->sqlLen) < 0) return -1;
if (tEncodeCStrWithLen(&encoder, pReq->sql, pReq->sqlLen) < 0) return -1; if (tEncodeCStrWithLen(&encoder, pReq->sql, pReq->sqlLen) < 0) return -1;
if (tEncodeU32(&encoder, pReq->msgLen) < 0) return -1; if (tEncodeU32(&encoder, pReq->msgLen) < 0) return -1;
@ -7086,6 +7087,7 @@ int32_t tDeserializeSSubQueryMsg(void *buf, int32_t bufLen, SSubQueryMsg *pReq)
if (tDecodeI8(&decoder, &pReq->taskType) < 0) return -1; if (tDecodeI8(&decoder, &pReq->taskType) < 0) return -1;
if (tDecodeI8(&decoder, &pReq->explain) < 0) return -1; if (tDecodeI8(&decoder, &pReq->explain) < 0) return -1;
if (tDecodeI8(&decoder, &pReq->needFetch) < 0) return -1; if (tDecodeI8(&decoder, &pReq->needFetch) < 0) return -1;
if (tDecodeI8(&decoder, &pReq->compress) < 0) return -1;
if (tDecodeU32(&decoder, &pReq->sqlLen) < 0) return -1; if (tDecodeU32(&decoder, &pReq->sqlLen) < 0) return -1;
if (tDecodeCStrAlloc(&decoder, &pReq->sql) < 0) return -1; if (tDecodeCStrAlloc(&decoder, &pReq->sql) < 0) return -1;
if (tDecodeU32(&decoder, &pReq->msgLen) < 0) return -1; if (tDecodeU32(&decoder, &pReq->msgLen) < 0) return -1;

View File

@ -1148,7 +1148,6 @@ _OVER:
} }
tFreeSShowVariablesRsp(&rsp); tFreeSShowVariablesRsp(&rsp);
return code; return code;
} }

View File

@ -4257,9 +4257,6 @@ int32_t tsdbReaderOpen2(void* pVnode, SQueryTableDataCond* pCond, void* pTableLi
blockDataEnsureCapacity(pResBlock, capacity); blockDataEnsureCapacity(pResBlock, capacity);
} }
// for debug purpose
// capacity = 7;
int32_t code = tsdbReaderCreate(pVnode, pCond, ppReader, capacity, pResBlock, idstr); int32_t code = tsdbReaderCreate(pVnode, pCond, ppReader, capacity, pResBlock, idstr);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
goto _err; goto _err;

View File

@ -26,7 +26,7 @@
extern SConfig* tsCfg; extern SConfig* tsCfg;
static int32_t buildRetrieveTableRsp(SSDataBlock* pBlock, int32_t numOfCols, SRetrieveTableRsp** pRsp) { static int32_t buildRetrieveTableRsp(SSDataBlock* pBlock, int32_t numOfCols, SRetrieveTableRsp** pRsp) {
size_t rspSize = sizeof(SRetrieveTableRsp) + blockGetEncodeSize(pBlock); size_t rspSize = sizeof(SRetrieveTableRsp) + blockGetEncodeSize(pBlock) + PAYLOAD_PREFIX_LEN;
*pRsp = taosMemoryCalloc(1, rspSize); *pRsp = taosMemoryCalloc(1, rspSize);
if (NULL == *pRsp) { if (NULL == *pRsp) {
return TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY;
@ -36,11 +36,16 @@ static int32_t buildRetrieveTableRsp(SSDataBlock* pBlock, int32_t numOfCols, SRe
(*pRsp)->completed = 1; (*pRsp)->completed = 1;
(*pRsp)->precision = 0; (*pRsp)->precision = 0;
(*pRsp)->compressed = 0; (*pRsp)->compressed = 0;
(*pRsp)->compLen = 0;
(*pRsp)->numOfRows = htobe64((int64_t)pBlock->info.rows); (*pRsp)->numOfRows = htobe64((int64_t)pBlock->info.rows);
(*pRsp)->numOfCols = htonl(numOfCols); (*pRsp)->numOfCols = htonl(numOfCols);
int32_t len = blockEncode(pBlock, (*pRsp)->data, numOfCols); int32_t len = blockEncode(pBlock, (*pRsp)->data + PAYLOAD_PREFIX_LEN, numOfCols);
SET_PAYLOAD_LEN((*pRsp)->data, len, len);
int32_t payloadLen = len + PAYLOAD_PREFIX_LEN;
(*pRsp)->payloadLen = htonl(payloadLen);
(*pRsp)->compLen = htonl(payloadLen);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }

View File

@ -70,7 +70,6 @@ char* qExplainGetTimerangeTargetStr(int32_t target) {
return targetName[target]; return targetName[target];
} }
void qExplainFreeResNode(SExplainResNode *resNode) { void qExplainFreeResNode(SExplainResNode *resNode) {
if (NULL == resNode) { if (NULL == resNode) {
return; return;
@ -1942,7 +1941,7 @@ int32_t qExplainGetRspFromCtx(void *ctx, SRetrieveTableRsp **pRsp) {
pBlock->info.rows = rowNum; pBlock->info.rows = rowNum;
int32_t rspSize = sizeof(SRetrieveTableRsp) + blockGetEncodeSize(pBlock); int32_t rspSize = sizeof(SRetrieveTableRsp) + blockGetEncodeSize(pBlock) + PAYLOAD_PREFIX_LEN;
SRetrieveTableRsp *rsp = (SRetrieveTableRsp *)taosMemoryCalloc(1, rspSize); SRetrieveTableRsp *rsp = (SRetrieveTableRsp *)taosMemoryCalloc(1, rspSize);
if (NULL == rsp) { if (NULL == rsp) {
@ -1954,9 +1953,13 @@ int32_t qExplainGetRspFromCtx(void *ctx, SRetrieveTableRsp **pRsp) {
rsp->completed = 1; rsp->completed = 1;
rsp->numOfRows = htobe64((int64_t)rowNum); rsp->numOfRows = htobe64((int64_t)rowNum);
int32_t len = blockEncode(pBlock, rsp->data, taosArrayGetSize(pBlock->pDataBlock)); int32_t len = blockEncode(pBlock, rsp->data + PAYLOAD_PREFIX_LEN, taosArrayGetSize(pBlock->pDataBlock));
rsp->compLen = htonl(len); rsp->compLen = htonl(len);
rsp->payloadLen = htonl(len);
rsp->compressed = 0;
SET_PAYLOAD_LEN(rsp->data, len, len);
blockDataDestroy(pBlock); blockDataDestroy(pBlock);

View File

@ -25,7 +25,6 @@ extern "C" {
#include "storageapi.h" #include "storageapi.h"
#include "tcommon.h" #include "tcommon.h"
struct SDataSink;
struct SDataSinkHandle; struct SDataSinkHandle;
typedef struct SDataSinkManager { typedef struct SDataSinkManager {
@ -36,7 +35,7 @@ typedef struct SDataSinkManager {
typedef int32_t (*FPutDataBlock)(struct SDataSinkHandle* pHandle, const SInputData* pInput, bool* pContinue); typedef int32_t (*FPutDataBlock)(struct SDataSinkHandle* pHandle, const SInputData* pInput, bool* pContinue);
typedef void (*FEndPut)(struct SDataSinkHandle* pHandle, uint64_t useconds); typedef void (*FEndPut)(struct SDataSinkHandle* pHandle, uint64_t useconds);
typedef void (*FReset)(struct SDataSinkHandle* pHandle); typedef void (*FReset)(struct SDataSinkHandle* pHandle);
typedef void (*FGetDataLength)(struct SDataSinkHandle* pHandle, int64_t* pLen, bool* pQueryEnd); typedef void (*FGetDataLength)(struct SDataSinkHandle* pHandle, int64_t* pLen, int64_t* pRowLen, bool* pQueryEnd);
typedef int32_t (*FGetDataBlock)(struct SDataSinkHandle* pHandle, SOutputData* pOutput); typedef int32_t (*FGetDataBlock)(struct SDataSinkHandle* pHandle, SOutputData* pOutput);
typedef int32_t (*FDestroyDataSinker)(struct SDataSinkHandle* pHandle); typedef int32_t (*FDestroyDataSinker)(struct SDataSinkHandle* pHandle);
typedef int32_t (*FGetCacheSize)(struct SDataSinkHandle* pHandle, uint64_t* size); typedef int32_t (*FGetCacheSize)(struct SDataSinkHandle* pHandle, uint64_t* size);

View File

@ -154,7 +154,7 @@ static void endPut(struct SDataSinkHandle* pHandle, uint64_t useconds) {
taosThreadMutexUnlock(&pDeleter->mutex); taosThreadMutexUnlock(&pDeleter->mutex);
} }
static void getDataLength(SDataSinkHandle* pHandle, int64_t* pLen, bool* pQueryEnd) { static void getDataLength(SDataSinkHandle* pHandle, int64_t* pLen, int64_t* pRawLen, bool* pQueryEnd) {
SDataDeleterHandle* pDeleter = (SDataDeleterHandle*)pHandle; SDataDeleterHandle* pDeleter = (SDataDeleterHandle*)pHandle;
if (taosQueueEmpty(pDeleter->pDataBlocks)) { if (taosQueueEmpty(pDeleter->pDataBlocks)) {
*pQueryEnd = pDeleter->queryEnd; *pQueryEnd = pDeleter->queryEnd;
@ -171,6 +171,8 @@ static void getDataLength(SDataSinkHandle* pHandle, int64_t* pLen, bool* pQueryE
SDataCacheEntry* pEntry = (SDataCacheEntry*)pDeleter->nextOutput.pData; SDataCacheEntry* pEntry = (SDataCacheEntry*)pDeleter->nextOutput.pData;
*pLen = pEntry->dataLen; *pLen = pEntry->dataLen;
*pRawLen = pEntry->dataLen;
*pQueryEnd = pDeleter->queryEnd; *pQueryEnd = pDeleter->queryEnd;
qDebug("got data len %" PRId64 ", row num %d in sink", *pLen, qDebug("got data len %" PRId64 ", row num %d in sink", *pLen,
((SDataCacheEntry*)(pDeleter->nextOutput.pData))->numOfRows); ((SDataCacheEntry*)(pDeleter->nextOutput.pData))->numOfRows);
@ -186,6 +188,7 @@ static int32_t getDataBlock(SDataSinkHandle* pHandle, SOutputData* pOutput) {
pOutput->queryEnd = pDeleter->queryEnd; pOutput->queryEnd = pDeleter->queryEnd;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
SDataCacheEntry* pEntry = (SDataCacheEntry*)(pDeleter->nextOutput.pData); SDataCacheEntry* pEntry = (SDataCacheEntry*)(pDeleter->nextOutput.pData);
memcpy(pOutput->pData, pEntry->data, pEntry->dataLen); memcpy(pOutput->pData, pEntry->data, pEntry->dataLen);
pDeleter->pParam->pUidList = NULL; pDeleter->pParam->pUidList = NULL;

View File

@ -31,6 +31,7 @@ typedef struct SDataDispatchBuf {
} SDataDispatchBuf; } SDataDispatchBuf;
typedef struct SDataCacheEntry { typedef struct SDataCacheEntry {
int32_t rawLen;
int32_t dataLen; int32_t dataLen;
int32_t numOfRows; int32_t numOfRows;
int32_t numOfCols; int32_t numOfCols;
@ -48,6 +49,8 @@ typedef struct SDataDispatchHandle {
bool queryEnd; bool queryEnd;
uint64_t useconds; uint64_t useconds;
uint64_t cachedSize; uint64_t cachedSize;
void* pCompressBuf;
int32_t bufSize;
TdThreadMutex mutex; TdThreadMutex mutex;
} SDataDispatchHandle; } SDataDispatchHandle;
@ -63,22 +66,60 @@ typedef struct SDataDispatchHandle {
static void toDataCacheEntry(SDataDispatchHandle* pHandle, const SInputData* pInput, SDataDispatchBuf* pBuf) { static void toDataCacheEntry(SDataDispatchHandle* pHandle, const SInputData* pInput, SDataDispatchBuf* pBuf) {
int32_t numOfCols = 0; int32_t numOfCols = 0;
SNode* pNode; SNode* pNode;
FOREACH(pNode, pHandle->pSchema->pSlots) { FOREACH(pNode, pHandle->pSchema->pSlots) {
SSlotDescNode* pSlotDesc = (SSlotDescNode*)pNode; SSlotDescNode* pSlotDesc = (SSlotDescNode*)pNode;
if (pSlotDesc->output) { if (pSlotDesc->output) {
++numOfCols; ++numOfCols;
} }
} }
SDataCacheEntry* pEntry = (SDataCacheEntry*)pBuf->pData; SDataCacheEntry* pEntry = (SDataCacheEntry*)pBuf->pData;
pEntry->compressed = 0; pEntry->compressed = 0;
pEntry->numOfRows = pInput->pData->info.rows; pEntry->numOfRows = pInput->pData->info.rows;
pEntry->numOfCols = numOfCols; pEntry->numOfCols = numOfCols;
pEntry->dataLen = 0; pEntry->dataLen = 0;
pEntry->rawLen = 0;
pBuf->useSize = sizeof(SDataCacheEntry); pBuf->useSize = sizeof(SDataCacheEntry);
pEntry->dataLen = blockEncode(pInput->pData, pEntry->data, numOfCols);
// ASSERT(pEntry->numOfRows == *(int32_t*)(pEntry->data + 8)); {
// ASSERT(pEntry->numOfCols == *(int32_t*)(pEntry->data + 8 + 4)); if ((pBuf->allocSize > tsCompressMsgSize) && (tsCompressMsgSize > 0) && pHandle->pManager->cfg.compress) {
if (pHandle->pCompressBuf == NULL) {
// allocate additional 8 bytes to avoid invalid write if compress failed to reduce the size
pHandle->pCompressBuf = taosMemoryMalloc(pBuf->allocSize + 8);
pHandle->bufSize = pBuf->allocSize + 8;
} else {
if (pHandle->bufSize < pBuf->allocSize + 8) {
pHandle->bufSize = pBuf->allocSize + 8;
void* p = taosMemoryRealloc(pHandle->pCompressBuf, pHandle->bufSize);
if (p != NULL) {
pHandle->pCompressBuf = p;
} else {
terrno = TSDB_CODE_OUT_OF_MEMORY;
qError("failed to prepare compress buf:%d, code: out of memory", pHandle->bufSize);
return;
}
}
}
int32_t dataLen = blockEncode(pInput->pData, pHandle->pCompressBuf, numOfCols);
int32_t len = tsCompressString(pHandle->pCompressBuf, dataLen, 1, pEntry->data, pBuf->allocSize, ONE_STAGE_COMP, NULL, 0);
if (len < dataLen) {
pEntry->compressed = 1;
pEntry->dataLen = len;
pEntry->rawLen = dataLen;
} else { // no need to compress data
pEntry->compressed = 0;
pEntry->dataLen = dataLen;
pEntry->rawLen = dataLen;
memcpy(pEntry->data, pHandle->pCompressBuf, dataLen);
}
} else {
pEntry->dataLen = blockEncode(pInput->pData, pEntry->data, numOfCols);
pEntry->rawLen = pEntry->dataLen;
}
}
pBuf->useSize += pEntry->dataLen; pBuf->useSize += pEntry->dataLen;
@ -163,7 +204,7 @@ static void resetDispatcher(struct SDataSinkHandle* pHandle) {
taosThreadMutexUnlock(&pDispatcher->mutex); taosThreadMutexUnlock(&pDispatcher->mutex);
} }
static void getDataLength(SDataSinkHandle* pHandle, int64_t* pLen, bool* pQueryEnd) { static void getDataLength(SDataSinkHandle* pHandle, int64_t* pLen, int64_t* pRowLen, bool* pQueryEnd) {
SDataDispatchHandle* pDispatcher = (SDataDispatchHandle*)pHandle; SDataDispatchHandle* pDispatcher = (SDataDispatchHandle*)pHandle;
if (taosQueueEmpty(pDispatcher->pDataBlocks)) { if (taosQueueEmpty(pDispatcher->pDataBlocks)) {
*pQueryEnd = pDispatcher->queryEnd; *pQueryEnd = pDispatcher->queryEnd;
@ -180,9 +221,7 @@ static void getDataLength(SDataSinkHandle* pHandle, int64_t* pLen, bool* pQueryE
SDataCacheEntry* pEntry = (SDataCacheEntry*)pDispatcher->nextOutput.pData; SDataCacheEntry* pEntry = (SDataCacheEntry*)pDispatcher->nextOutput.pData;
*pLen = pEntry->dataLen; *pLen = pEntry->dataLen;
*pRowLen = pEntry->rawLen;
// ASSERT(pEntry->numOfRows == *(int32_t*)(pEntry->data + 8));
// ASSERT(pEntry->numOfCols == *(int32_t*)(pEntry->data + 8 + 4));
*pQueryEnd = pDispatcher->queryEnd; *pQueryEnd = pDispatcher->queryEnd;
qDebug("got data len %" PRId64 ", row num %d in sink", *pLen, qDebug("got data len %" PRId64 ", row num %d in sink", *pLen,
@ -200,6 +239,7 @@ static int32_t getDataBlock(SDataSinkHandle* pHandle, SOutputData* pOutput) {
pOutput->queryEnd = pDispatcher->queryEnd; pOutput->queryEnd = pDispatcher->queryEnd;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
SDataCacheEntry* pEntry = (SDataCacheEntry*)(pDispatcher->nextOutput.pData); SDataCacheEntry* pEntry = (SDataCacheEntry*)(pDispatcher->nextOutput.pData);
memcpy(pOutput->pData, pEntry->data, pEntry->dataLen); memcpy(pOutput->pData, pEntry->data, pEntry->dataLen);
pOutput->numOfRows = pEntry->numOfRows; pOutput->numOfRows = pEntry->numOfRows;
@ -224,6 +264,7 @@ static int32_t destroyDataSinker(SDataSinkHandle* pHandle) {
SDataDispatchHandle* pDispatcher = (SDataDispatchHandle*)pHandle; SDataDispatchHandle* pDispatcher = (SDataDispatchHandle*)pHandle;
atomic_sub_fetch_64(&gDataSinkStat.cachedSize, pDispatcher->cachedSize); atomic_sub_fetch_64(&gDataSinkStat.cachedSize, pDispatcher->cachedSize);
taosMemoryFreeClear(pDispatcher->nextOutput.pData); taosMemoryFreeClear(pDispatcher->nextOutput.pData);
while (!taosQueueEmpty(pDispatcher->pDataBlocks)) { while (!taosQueueEmpty(pDispatcher->pDataBlocks)) {
SDataDispatchBuf* pBuf = NULL; SDataDispatchBuf* pBuf = NULL;
taosReadQitem(pDispatcher->pDataBlocks, (void**)&pBuf); taosReadQitem(pDispatcher->pDataBlocks, (void**)&pBuf);
@ -232,7 +273,11 @@ static int32_t destroyDataSinker(SDataSinkHandle* pHandle) {
taosFreeQitem(pBuf); taosFreeQitem(pBuf);
} }
} }
taosCloseQueue(pDispatcher->pDataBlocks); taosCloseQueue(pDispatcher->pDataBlocks);
taosMemoryFreeClear(pDispatcher->pCompressBuf);
pDispatcher->bufSize = 0;
taosThreadMutexDestroy(&pDispatcher->mutex); taosThreadMutexDestroy(&pDispatcher->mutex);
taosMemoryFree(pDispatcher->pManager); taosMemoryFree(pDispatcher->pManager);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
@ -251,6 +296,7 @@ int32_t createDataDispatcher(SDataSinkManager* pManager, const SDataSinkNode* pD
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
goto _return; goto _return;
} }
dispatcher->sink.fPut = putDataBlock; dispatcher->sink.fPut = putDataBlock;
dispatcher->sink.fEndPut = endPut; dispatcher->sink.fEndPut = endPut;
dispatcher->sink.fReset = resetDispatcher; dispatcher->sink.fReset = resetDispatcher;
@ -258,22 +304,24 @@ int32_t createDataDispatcher(SDataSinkManager* pManager, const SDataSinkNode* pD
dispatcher->sink.fGetData = getDataBlock; dispatcher->sink.fGetData = getDataBlock;
dispatcher->sink.fDestroy = destroyDataSinker; dispatcher->sink.fDestroy = destroyDataSinker;
dispatcher->sink.fGetCacheSize = getCacheSize; dispatcher->sink.fGetCacheSize = getCacheSize;
dispatcher->pManager = pManager; dispatcher->pManager = pManager;
dispatcher->pSchema = pDataSink->pInputDataBlockDesc; dispatcher->pSchema = pDataSink->pInputDataBlockDesc;
dispatcher->status = DS_BUF_EMPTY; dispatcher->status = DS_BUF_EMPTY;
dispatcher->queryEnd = false; dispatcher->queryEnd = false;
dispatcher->pDataBlocks = taosOpenQueue(); dispatcher->pDataBlocks = taosOpenQueue();
taosThreadMutexInit(&dispatcher->mutex, NULL); taosThreadMutexInit(&dispatcher->mutex, NULL);
if (NULL == dispatcher->pDataBlocks) { if (NULL == dispatcher->pDataBlocks) {
taosMemoryFree(dispatcher); taosMemoryFree(dispatcher);
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
goto _return; goto _return;
} }
*pHandle = dispatcher; *pHandle = dispatcher;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
_return: _return:
taosMemoryFree(pManager); taosMemoryFree(pManager);
return terrno; return terrno;
} }

View File

@ -370,7 +370,7 @@ static void endPut(struct SDataSinkHandle* pHandle, uint64_t useconds) {
taosThreadMutexUnlock(&pInserter->mutex); taosThreadMutexUnlock(&pInserter->mutex);
} }
static void getDataLength(SDataSinkHandle* pHandle, int64_t* pLen, bool* pQueryEnd) { static void getDataLength(SDataSinkHandle* pHandle, int64_t* pLen, int64_t* pRawLen, bool* pQueryEnd) {
SDataInserterHandle* pDispatcher = (SDataInserterHandle*)pHandle; SDataInserterHandle* pDispatcher = (SDataInserterHandle*)pHandle;
*pLen = pDispatcher->submitRes.affectedRows; *pLen = pDispatcher->submitRes.affectedRows;
qDebug("got total affectedRows %" PRId64, *pLen); qDebug("got total affectedRows %" PRId64, *pLen);

View File

@ -18,13 +18,14 @@
#include "planner.h" #include "planner.h"
#include "tarray.h" #include "tarray.h"
SDataSinkStat gDataSinkStat = {0}; SDataSinkStat gDataSinkStat = {0};
int32_t dsDataSinkMgtInit(SDataSinkMgtCfg* cfg, SStorageAPI* pAPI, void** ppSinkManager) { int32_t dsDataSinkMgtInit(SDataSinkMgtCfg* cfg, SStorageAPI* pAPI, void** ppSinkManager) {
SDataSinkManager* pSinkManager = taosMemoryMalloc(sizeof(SDataSinkManager)); SDataSinkManager* pSinkManager = taosMemoryMalloc(sizeof(SDataSinkManager));
if (NULL == pSinkManager) { if (NULL == pSinkManager) {
return TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY;
} }
pSinkManager->cfg = *cfg; pSinkManager->cfg = *cfg;
pSinkManager->pAPI = pAPI; pSinkManager->pAPI = pAPI;
@ -75,9 +76,9 @@ void dsReset(DataSinkHandle handle) {
} }
} }
void dsGetDataLength(DataSinkHandle handle, int64_t* pLen, bool* pQueryEnd) { void dsGetDataLength(DataSinkHandle handle, int64_t* pLen, int64_t* pRawLen, bool* pQueryEnd) {
SDataSinkHandle* pHandleImpl = (SDataSinkHandle*)handle; SDataSinkHandle* pHandleImpl = (SDataSinkHandle*)handle;
pHandleImpl->fGetLen(pHandleImpl, pLen, pQueryEnd); pHandleImpl->fGetLen(pHandleImpl, pLen, pRawLen, pQueryEnd);
} }
int32_t dsGetDataBlock(DataSinkHandle handle, SOutputData* pOutput) { int32_t dsGetDataBlock(DataSinkHandle handle, SOutputData* pOutput) {

View File

@ -41,6 +41,8 @@ typedef struct SSourceDataInfo {
SArray* pSrcUidList; SArray* pSrcUidList;
int32_t srcOpType; int32_t srcOpType;
bool tableSeq; bool tableSeq;
char* decompBuf;
int32_t decompBufSize;
} SSourceDataInfo; } SSourceDataInfo;
static void destroyExchangeOperatorInfo(void* param); static void destroyExchangeOperatorInfo(void* param);
@ -371,7 +373,10 @@ void freeBlock(void* pParam) {
void freeSourceDataInfo(void* p) { void freeSourceDataInfo(void* p) {
SSourceDataInfo* pInfo = (SSourceDataInfo*)p; SSourceDataInfo* pInfo = (SSourceDataInfo*)p;
taosMemoryFreeClear(pInfo->decompBuf);
taosMemoryFreeClear(pInfo->pRsp); taosMemoryFreeClear(pInfo->pRsp);
pInfo->decompBufSize = 0;
} }
void doDestroyExchangeOperatorInfo(void* param) { void doDestroyExchangeOperatorInfo(void* param) {
@ -411,6 +416,7 @@ int32_t loadRemoteDataCallback(void* param, SDataBuf* pMsg, int32_t code) {
SRetrieveTableRsp* pRsp = pSourceDataInfo->pRsp; SRetrieveTableRsp* pRsp = pSourceDataInfo->pRsp;
pRsp->numOfRows = htobe64(pRsp->numOfRows); pRsp->numOfRows = htobe64(pRsp->numOfRows);
pRsp->compLen = htonl(pRsp->compLen); pRsp->compLen = htonl(pRsp->compLen);
pRsp->payloadLen = htonl(pRsp->payloadLen);
pRsp->numOfCols = htonl(pRsp->numOfCols); pRsp->numOfCols = htonl(pRsp->numOfCols);
pRsp->useconds = htobe64(pRsp->useconds); pRsp->useconds = htobe64(pRsp->useconds);
pRsp->numOfBlocks = htonl(pRsp->numOfBlocks); pRsp->numOfBlocks = htonl(pRsp->numOfBlocks);
@ -673,11 +679,32 @@ int32_t prepareConcurrentlyLoad(SOperatorInfo* pOperator) {
int32_t doExtractResultBlocks(SExchangeInfo* pExchangeInfo, SSourceDataInfo* pDataInfo) { int32_t doExtractResultBlocks(SExchangeInfo* pExchangeInfo, SSourceDataInfo* pDataInfo) {
SRetrieveTableRsp* pRetrieveRsp = pDataInfo->pRsp; SRetrieveTableRsp* pRetrieveRsp = pDataInfo->pRsp;
char* pStart = pRetrieveRsp->data; char* pNextStart = pRetrieveRsp->data;
char* pStart = pNextStart;
int32_t index = 0; int32_t index = 0;
int32_t code = 0; int32_t code = 0;
if (pRetrieveRsp->compressed) { // decompress the data
if (pDataInfo->decompBuf == NULL) {
pDataInfo->decompBuf = taosMemoryMalloc(pRetrieveRsp->payloadLen);
pDataInfo->decompBufSize = pRetrieveRsp->payloadLen;
} else {
if (pDataInfo->decompBufSize < pRetrieveRsp->payloadLen) {
char* p = taosMemoryRealloc(pDataInfo->decompBuf, pRetrieveRsp->payloadLen);
if (p != NULL) {
pDataInfo->decompBuf = p;
pDataInfo->decompBufSize = pRetrieveRsp->payloadLen;
}
}
}
}
while (index++ < pRetrieveRsp->numOfBlocks) { while (index++ < pRetrieveRsp->numOfBlocks) {
SSDataBlock* pb = NULL; SSDataBlock* pb = NULL;
pStart = pNextStart;
if (taosArrayGetSize(pExchangeInfo->pRecycledBlocks) > 0) { if (taosArrayGetSize(pExchangeInfo->pRecycledBlocks) > 0) {
pb = *(SSDataBlock**)taosArrayPop(pExchangeInfo->pRecycledBlocks); pb = *(SSDataBlock**)taosArrayPop(pExchangeInfo->pRecycledBlocks);
blockDataCleanup(pb); blockDataCleanup(pb);
@ -685,6 +712,20 @@ int32_t doExtractResultBlocks(SExchangeInfo* pExchangeInfo, SSourceDataInfo* pDa
pb = createOneDataBlock(pExchangeInfo->pDummyBlock, false); pb = createOneDataBlock(pExchangeInfo->pDummyBlock, false);
} }
int32_t compLen = *(int32_t*) pStart;
pStart += sizeof(int32_t);
int32_t rawLen = *(int32_t*) pStart;
pStart += sizeof(int32_t);
ASSERT(compLen <= rawLen && compLen != 0);
pNextStart = pStart + compLen;
if (pRetrieveRsp->compressed && (compLen < rawLen)) {
int32_t t = tsDecompressString(pStart, compLen, 1, pDataInfo->decompBuf, rawLen, ONE_STAGE_COMP, NULL, 0);
ASSERT(t == rawLen);
pStart = pDataInfo->decompBuf;
}
code = extractDataBlockFromFetchRsp(pb, pStart, NULL, &pStart); code = extractDataBlockFromFetchRsp(pb, pStart, NULL, &pStart);
if (code != 0) { if (code != 0) {
taosMemoryFreeClear(pDataInfo->pRsp); taosMemoryFreeClear(pDataInfo->pRsp);

View File

@ -286,7 +286,7 @@ qTaskInfo_t qCreateQueueExecTaskInfo(void* msg, SReadHandle* pReaderHandle, int3
} }
qTaskInfo_t pTaskInfo = NULL; qTaskInfo_t pTaskInfo = NULL;
code = qCreateExecTask(pReaderHandle, vgId, 0, pPlan, &pTaskInfo, NULL, NULL, OPTR_EXEC_MODEL_QUEUE); code = qCreateExecTask(pReaderHandle, vgId, 0, pPlan, &pTaskInfo, NULL, 0, NULL, OPTR_EXEC_MODEL_QUEUE);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
nodesDestroyNode((SNode*)pPlan); nodesDestroyNode((SNode*)pPlan);
qDestroyTask(pTaskInfo); qDestroyTask(pTaskInfo);
@ -322,7 +322,7 @@ qTaskInfo_t qCreateStreamExecTaskInfo(void* msg, SReadHandle* readers, int32_t v
} }
qTaskInfo_t pTaskInfo = NULL; qTaskInfo_t pTaskInfo = NULL;
code = qCreateExecTask(readers, vgId, taskId, pPlan, &pTaskInfo, NULL, NULL, OPTR_EXEC_MODEL_STREAM); code = qCreateExecTask(readers, vgId, taskId, pPlan, &pTaskInfo, NULL, 0, NULL, OPTR_EXEC_MODEL_STREAM);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
nodesDestroyNode((SNode*)pPlan); nodesDestroyNode((SNode*)pPlan);
qDestroyTask(pTaskInfo); qDestroyTask(pTaskInfo);
@ -524,7 +524,8 @@ void qUpdateOperatorParam(qTaskInfo_t tinfo, void* pParam) {
} }
int32_t qCreateExecTask(SReadHandle* readHandle, int32_t vgId, uint64_t taskId, SSubplan* pSubplan, int32_t qCreateExecTask(SReadHandle* readHandle, int32_t vgId, uint64_t taskId, SSubplan* pSubplan,
qTaskInfo_t* pTaskInfo, DataSinkHandle* handle, char* sql, EOPTR_EXEC_MODEL model) { qTaskInfo_t* pTaskInfo, DataSinkHandle* handle, int8_t compressResult, char* sql,
EOPTR_EXEC_MODEL model) {
SExecTaskInfo** pTask = (SExecTaskInfo**)pTaskInfo; SExecTaskInfo** pTask = (SExecTaskInfo**)pTaskInfo;
taosThreadOnce(&initPoolOnce, initRefPool); taosThreadOnce(&initPoolOnce, initRefPool);
@ -537,7 +538,7 @@ int32_t qCreateExecTask(SReadHandle* readHandle, int32_t vgId, uint64_t taskId,
} }
if (handle) { if (handle) {
SDataSinkMgtCfg cfg = {.maxDataBlockNum = 500, .maxDataBlockNumPerQuery = 50}; SDataSinkMgtCfg cfg = {.maxDataBlockNum = 500, .maxDataBlockNumPerQuery = 50, .compress = compressResult};
void* pSinkManager = NULL; void* pSinkManager = NULL;
code = dsDataSinkMgtInit(&cfg, &(*pTask)->storageAPI, &pSinkManager); code = dsDataSinkMgtInit(&cfg, &(*pTask)->storageAPI, &pSinkManager);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {

View File

@ -35,7 +35,7 @@ extern "C" {
#define QW_DEFAULT_SHORT_RUN_TIMES 2 #define QW_DEFAULT_SHORT_RUN_TIMES 2
#define QW_DEFAULT_HEARTBEAT_MSEC 5000 #define QW_DEFAULT_HEARTBEAT_MSEC 5000
#define QW_SCH_TIMEOUT_MSEC 180000 #define QW_SCH_TIMEOUT_MSEC 180000
#define QW_MIN_RES_ROWS 4096 #define QW_MIN_RES_ROWS 16384
enum { enum {
QW_PHASE_PRE_QUERY = 1, QW_PHASE_PRE_QUERY = 1,

View File

@ -38,7 +38,7 @@ int32_t qwBuildAndSendDropRsp(SRpcHandleInfo *pConn, int32_t code);
int32_t qwBuildAndSendCancelRsp(SRpcHandleInfo *pConn, int32_t code); int32_t qwBuildAndSendCancelRsp(SRpcHandleInfo *pConn, int32_t code);
int32_t qwBuildAndSendFetchRsp(int32_t rspType, SRpcHandleInfo *pConn, SRetrieveTableRsp *pRsp, int32_t dataLength, int32_t qwBuildAndSendFetchRsp(int32_t rspType, SRpcHandleInfo *pConn, SRetrieveTableRsp *pRsp, int32_t dataLength,
int32_t code); int32_t code);
void qwBuildFetchRsp(void *msg, SOutputData *input, int32_t len, bool qComplete); void qwBuildFetchRsp(void *msg, SOutputData *input, int32_t len, int32_t rawDataLen, bool qComplete);
int32_t qwBuildAndSendCQueryMsg(QW_FPARAMS_DEF, SRpcHandleInfo *pConn); int32_t qwBuildAndSendCQueryMsg(QW_FPARAMS_DEF, SRpcHandleInfo *pConn);
int32_t qwBuildAndSendQueryRsp(int32_t rspType, SRpcHandleInfo *pConn, int32_t code, SQWTaskCtx *ctx); int32_t qwBuildAndSendQueryRsp(int32_t rspType, SRpcHandleInfo *pConn, int32_t code, SQWTaskCtx *ctx);
int32_t qwBuildAndSendExplainRsp(SRpcHandleInfo *pConn, SArray *pExecList); int32_t qwBuildAndSendExplainRsp(SRpcHandleInfo *pConn, SArray *pExecList);

View File

@ -29,13 +29,14 @@ int32_t qwMallocFetchRsp(int8_t rpcMalloc, int32_t length, SRetrieveTableRsp **r
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
void qwBuildFetchRsp(void *msg, SOutputData *input, int32_t len, bool qComplete) { void qwBuildFetchRsp(void *msg, SOutputData *input, int32_t len, int32_t rawDataLen, bool qComplete) {
SRetrieveTableRsp *rsp = (SRetrieveTableRsp *)msg; SRetrieveTableRsp *rsp = (SRetrieveTableRsp *)msg;
rsp->useconds = htobe64(input->useconds); rsp->useconds = htobe64(input->useconds);
rsp->completed = qComplete; rsp->completed = qComplete;
rsp->precision = input->precision; rsp->precision = input->precision;
rsp->compressed = input->compressed; rsp->compressed = input->compressed;
rsp->payloadLen = htonl(rawDataLen);
rsp->compLen = htonl(len); rsp->compLen = htonl(len);
rsp->numOfRows = htobe64(input->numOfRows); rsp->numOfRows = htobe64(input->numOfRows);
rsp->numOfCols = htonl(input->numOfCols); rsp->numOfCols = htonl(input->numOfCols);
@ -154,6 +155,7 @@ int32_t qwBuildAndSendFetchRsp(int32_t rspType, SRpcHandleInfo *pConn, SRetrieve
.info = *pConn, .info = *pConn,
}; };
rpcRsp.info.compressed = pRsp->compressed;
tmsgSendRsp(&rpcRsp); tmsgSendRsp(&rpcRsp);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
@ -452,13 +454,15 @@ int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int
qwMsg.msgInfo.explain = msg.explain; qwMsg.msgInfo.explain = msg.explain;
qwMsg.msgInfo.taskType = msg.taskType; qwMsg.msgInfo.taskType = msg.taskType;
qwMsg.msgInfo.needFetch = msg.needFetch; qwMsg.msgInfo.needFetch = msg.needFetch;
qwMsg.msgInfo.compressMsg = msg.compress;
QW_SCH_TASK_DLOG("processQuery start, node:%p, type:%s, compress:%d, handle:%p, SQL:%s", node, TMSG_INFO(pMsg->msgType),
msg.compress, pMsg->info.handle, msg.sql);
QW_SCH_TASK_DLOG("processQuery start, node:%p, type:%s, handle:%p, SQL:%s", node, TMSG_INFO(pMsg->msgType),
pMsg->info.handle, msg.sql);
code = qwProcessQuery(QW_FPARAMS(), &qwMsg, msg.sql); code = qwProcessQuery(QW_FPARAMS(), &qwMsg, msg.sql);
msg.sql = NULL; msg.sql = NULL;
QW_SCH_TASK_DLOG("processQuery end, node:%p, code:%x", node, code);
QW_SCH_TASK_DLOG("processQuery end, node:%p, code:%x", node, code);
tFreeSSubQueryMsg(&msg); tFreeSSubQueryMsg(&msg);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;

View File

@ -102,7 +102,7 @@ int32_t qwHandleTaskComplete(QW_FPARAMS_DEF, SQWTaskCtx *ctx) {
} }
if (!ctx->needFetch) { if (!ctx->needFetch) {
dsGetDataLength(ctx->sinkHandle, &ctx->affectedRows, NULL); dsGetDataLength(ctx->sinkHandle, &ctx->affectedRows, NULL, NULL);
} }
} }
@ -285,9 +285,11 @@ int32_t qwGenerateSchHbRsp(SQWorker *mgmt, SQWSchStatus *sch, SQWHbInfo *hbInfo)
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t qwGetQueryResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t *dataLen, void **rspMsg, SOutputData *pOutput) { int32_t qwGetQueryResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t *dataLen, int32_t *pRawDataLen, void **rspMsg,
SOutputData *pOutput) {
int64_t len = 0; int64_t len = 0;
SRetrieveTableRsp *rsp = NULL; int64_t rawLen = 0;
SRetrieveTableRsp *pRsp = NULL;
bool queryEnd = false; bool queryEnd = false;
int32_t code = 0; int32_t code = 0;
SOutputData output = {0}; SOutputData output = {0};
@ -298,9 +300,10 @@ int32_t qwGetQueryResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t *dataLen,
} }
*dataLen = 0; *dataLen = 0;
*pRawDataLen = 0;
while (true) { while (true) {
dsGetDataLength(ctx->sinkHandle, &len, &queryEnd); dsGetDataLength(ctx->sinkHandle, &len, &rawLen, &queryEnd);
if (len < 0) { if (len < 0) {
QW_TASK_ELOG("invalid length from dsGetDataLength, length:%" PRId64 "", len); QW_TASK_ELOG("invalid length from dsGetDataLength, length:%" PRId64 "", len);
@ -322,31 +325,36 @@ int32_t qwGetQueryResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t *dataLen,
qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_SUCC, ctx->dynamicTask); qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_SUCC, ctx->dynamicTask);
} }
if (NULL == rsp) { if (NULL == pRsp) {
QW_ERR_RET(qwMallocFetchRsp(!ctx->localExec, len, &rsp)); QW_ERR_RET(qwMallocFetchRsp(!ctx->localExec, len, &pRsp));
*pOutput = output; *pOutput = output;
} else { } else {
pOutput->queryEnd = output.queryEnd; pOutput->queryEnd = output.queryEnd;
pOutput->bufStatus = output.bufStatus; pOutput->bufStatus = output.bufStatus;
pOutput->useconds = output.useconds; pOutput->useconds = output.useconds;
} }
break; break;
} }
pOutput->bufStatus = DS_BUF_EMPTY; pOutput->bufStatus = DS_BUF_EMPTY;
break; break;
} }
// Got data from sink // Got data from sink
QW_TASK_DLOG("there are data in sink, dataLength:%" PRId64 "", len); QW_TASK_DLOG("there are data in sink, dataLength:%" PRId64 "", len);
*dataLen += len; *dataLen += len + PAYLOAD_PREFIX_LEN;
*pRawDataLen += rawLen + PAYLOAD_PREFIX_LEN;
QW_ERR_RET(qwMallocFetchRsp(!ctx->localExec, *dataLen, &rsp)); QW_ERR_RET(qwMallocFetchRsp(!ctx->localExec, *dataLen, &pRsp));
// set the serialize start position
output.pData = pRsp->data + *dataLen - (len + PAYLOAD_PREFIX_LEN);
((int32_t*) output.pData)[0] = len;
((int32_t*) output.pData)[1] = rawLen;
output.pData += sizeof(int32_t) * 2;
output.pData = rsp->data + *dataLen - len;
code = dsGetDataBlock(ctx->sinkHandle, &output); code = dsGetDataBlock(ctx->sinkHandle, &output);
if (code) { if (code) {
QW_TASK_ELOG("dsGetDataBlock failed, code:%x - %s", code, tstrerror(code)); QW_TASK_ELOG("dsGetDataBlock failed, code:%x - %s", code, tstrerror(code));
@ -357,7 +365,11 @@ int32_t qwGetQueryResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t *dataLen,
pOutput->precision = output.precision; pOutput->precision = output.precision;
pOutput->bufStatus = output.bufStatus; pOutput->bufStatus = output.bufStatus;
pOutput->useconds = output.useconds; pOutput->useconds = output.useconds;
pOutput->compressed = output.compressed;
if (output.compressed) {
pOutput->compressed = output.compressed;
}
pOutput->numOfCols = output.numOfCols; pOutput->numOfCols = output.numOfCols;
pOutput->numOfRows += output.numOfRows; pOutput->numOfRows += output.numOfRows;
pOutput->numOfBlocks++; pOutput->numOfBlocks++;
@ -382,18 +394,18 @@ int32_t qwGetQueryResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t *dataLen,
} }
} }
*rspMsg = rsp; *rspMsg = pRsp;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t qwGetDeleteResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, SDeleteRes *pRes) { int32_t qwGetDeleteResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, SDeleteRes *pRes) {
int64_t len = 0; int64_t len = 0;
int64_t rawLen = 0;
bool queryEnd = false; bool queryEnd = false;
int32_t code = 0; int32_t code = 0;
SOutputData output = {0}; SOutputData output = {0};
dsGetDataLength(ctx->sinkHandle, &len, &queryEnd); dsGetDataLength(ctx->sinkHandle, &len, &rawLen, &queryEnd);
if (len <= 0 || len != sizeof(SDeleterRes)) { if (len <= 0 || len != sizeof(SDeleterRes)) {
QW_TASK_ELOG("invalid length from dsGetDataLength, length:%" PRId64, len); QW_TASK_ELOG("invalid length from dsGetDataLength, length:%" PRId64, len);
@ -433,9 +445,10 @@ int32_t qwQuickRspFetchReq(QW_FPARAMS_DEF, SQWTaskCtx * ctx, SQWMsg *qwMsg, i
if (QW_EVENT_RECEIVED(ctx, QW_EVENT_FETCH)) { if (QW_EVENT_RECEIVED(ctx, QW_EVENT_FETCH)) {
void *rsp = NULL; void *rsp = NULL;
int32_t dataLen = 0; int32_t dataLen = 0;
int32_t rawLen = 0;
SOutputData sOutput = {0}; SOutputData sOutput = {0};
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = qwGetQueryResFromSink(QW_FPARAMS(), ctx, &dataLen, &rsp, &sOutput); code = qwGetQueryResFromSink(QW_FPARAMS(), ctx, &dataLen, &rawLen, &rsp, &sOutput);
} }
if (NULL == rsp && TSDB_CODE_SUCCESS == code) { if (NULL == rsp && TSDB_CODE_SUCCESS == code) {
@ -445,7 +458,7 @@ int32_t qwQuickRspFetchReq(QW_FPARAMS_DEF, SQWTaskCtx * ctx, SQWMsg *qwMsg, i
if (NULL != rsp) { if (NULL != rsp) {
bool qComplete = (DS_BUF_EMPTY == sOutput.bufStatus && sOutput.queryEnd); bool qComplete = (DS_BUF_EMPTY == sOutput.bufStatus && sOutput.queryEnd);
qwBuildFetchRsp(rsp, &sOutput, dataLen, qComplete); qwBuildFetchRsp(rsp, &sOutput, dataLen, rawLen, qComplete);
if (qComplete) { if (qComplete) {
atomic_store_8((int8_t *)&ctx->queryEnd, true); atomic_store_8((int8_t *)&ctx->queryEnd, true);
} }
@ -711,7 +724,6 @@ _return:
int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, char *sql) { int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, char *sql) {
int32_t code = 0; int32_t code = 0;
bool queryRsped = false;
SSubplan *plan = NULL; SSubplan *plan = NULL;
SQWPhaseInput input = {0}; SQWPhaseInput input = {0};
qTaskInfo_t pTaskInfo = NULL; qTaskInfo_t pTaskInfo = NULL;
@ -728,8 +740,6 @@ int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, char *sql) {
ctx->queryMsgType = qwMsg->msgType; ctx->queryMsgType = qwMsg->msgType;
ctx->localExec = false; ctx->localExec = false;
// QW_TASK_DLOGL("subplan json string, len:%d, %s", qwMsg->msgLen, qwMsg->msg);
code = qMsgToSubplan(qwMsg->msg, qwMsg->msgLen, &plan); code = qMsgToSubplan(qwMsg->msg, qwMsg->msgLen, &plan);
if (TSDB_CODE_SUCCESS != code) { if (TSDB_CODE_SUCCESS != code) {
code = TSDB_CODE_INVALID_MSG; code = TSDB_CODE_INVALID_MSG;
@ -737,14 +747,7 @@ int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, char *sql) {
QW_ERR_JRET(code); QW_ERR_JRET(code);
} }
#if 0 code = qCreateExecTask(qwMsg->node, mgmt->nodeId, tId, plan, &pTaskInfo, &sinkHandle, qwMsg->msgInfo.compressMsg, sql, OPTR_EXEC_MODEL_BATCH);
SReadHandle* pReadHandle = qwMsg->node;
int64_t delay = 0;
bool fhFinish = false;
pReadHandle->api.tqReaderFn.tqGetStreamExecProgress(pReadHandle->vnode, 0, &delay, &fhFinish);
#endif
code = qCreateExecTask(qwMsg->node, mgmt->nodeId, tId, plan, &pTaskInfo, &sinkHandle, sql, OPTR_EXEC_MODEL_BATCH);
sql = NULL; sql = NULL;
if (code) { if (code) {
QW_TASK_ELOG("qCreateExecTask failed, code:%x - %s", code, tstrerror(code)); QW_TASK_ELOG("qCreateExecTask failed, code:%x - %s", code, tstrerror(code));
@ -756,8 +759,6 @@ int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, char *sql) {
QW_ERR_JRET(TSDB_CODE_APP_ERROR); QW_ERR_JRET(TSDB_CODE_APP_ERROR);
} }
//qwSendQueryRsp(QW_FPARAMS(), qwMsg->msgType + 1, ctx, code, true);
ctx->level = plan->level; ctx->level = plan->level;
ctx->dynamicTask = qIsDynamicExecTask(pTaskInfo); ctx->dynamicTask = qIsDynamicExecTask(pTaskInfo);
atomic_store_ptr(&ctx->taskHandle, pTaskInfo); atomic_store_ptr(&ctx->taskHandle, pTaskInfo);
@ -791,6 +792,7 @@ int32_t qwProcessCQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
SQWPhaseInput input = {0}; SQWPhaseInput input = {0};
void *rsp = NULL; void *rsp = NULL;
int32_t dataLen = 0; int32_t dataLen = 0;
int32_t rawLen = 0;
bool queryStop = false; bool queryStop = false;
bool qComplete = false; bool qComplete = false;
@ -810,7 +812,7 @@ int32_t qwProcessCQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
if (QW_EVENT_RECEIVED(ctx, QW_EVENT_FETCH)) { if (QW_EVENT_RECEIVED(ctx, QW_EVENT_FETCH)) {
SOutputData sOutput = {0}; SOutputData sOutput = {0};
QW_ERR_JRET(qwGetQueryResFromSink(QW_FPARAMS(), ctx, &dataLen, &rsp, &sOutput)); QW_ERR_JRET(qwGetQueryResFromSink(QW_FPARAMS(), ctx, &dataLen, &rawLen, &rsp, &sOutput));
if ((!sOutput.queryEnd) && (DS_BUF_LOW == sOutput.bufStatus || DS_BUF_EMPTY == sOutput.bufStatus)) { if ((!sOutput.queryEnd) && (DS_BUF_LOW == sOutput.bufStatus || DS_BUF_EMPTY == sOutput.bufStatus)) {
QW_TASK_DLOG("task not end and buf is %s, need to continue query", qwBufStatusStr(sOutput.bufStatus)); QW_TASK_DLOG("task not end and buf is %s, need to continue query", qwBufStatusStr(sOutput.bufStatus));
@ -821,7 +823,7 @@ int32_t qwProcessCQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
if (rsp) { if (rsp) {
qComplete = (DS_BUF_EMPTY == sOutput.bufStatus && sOutput.queryEnd); qComplete = (DS_BUF_EMPTY == sOutput.bufStatus && sOutput.queryEnd);
qwBuildFetchRsp(rsp, &sOutput, dataLen, qComplete); qwBuildFetchRsp(rsp, &sOutput, dataLen, rawLen, qComplete);
if (qComplete) { if (qComplete) {
atomic_store_8((int8_t *)&ctx->queryEnd, true); atomic_store_8((int8_t *)&ctx->queryEnd, true);
atomic_store_8((int8_t *)&ctx->queryContinue, 0); atomic_store_8((int8_t *)&ctx->queryContinue, 0);
@ -878,6 +880,8 @@ int32_t qwProcessCQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
int32_t qwProcessFetch(QW_FPARAMS_DEF, SQWMsg *qwMsg) { int32_t qwProcessFetch(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
int32_t code = 0; int32_t code = 0;
int32_t dataLen = 0; int32_t dataLen = 0;
int32_t rawDataLen = 0;
bool locked = false; bool locked = false;
SQWTaskCtx *ctx = NULL; SQWTaskCtx *ctx = NULL;
void *rsp = NULL; void *rsp = NULL;
@ -896,14 +900,14 @@ int32_t qwProcessFetch(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
} }
SOutputData sOutput = {0}; SOutputData sOutput = {0};
QW_ERR_JRET(qwGetQueryResFromSink(QW_FPARAMS(), ctx, &dataLen, &rsp, &sOutput)); QW_ERR_JRET(qwGetQueryResFromSink(QW_FPARAMS(), ctx, &dataLen, &rawDataLen, &rsp, &sOutput));
if (NULL == rsp) { if (NULL == rsp) {
QW_SET_EVENT_RECEIVED(ctx, QW_EVENT_FETCH); QW_SET_EVENT_RECEIVED(ctx, QW_EVENT_FETCH);
} else { } else {
bool qComplete = (DS_BUF_EMPTY == sOutput.bufStatus && sOutput.queryEnd); bool qComplete = (DS_BUF_EMPTY == sOutput.bufStatus && sOutput.queryEnd);
qwBuildFetchRsp(rsp, &sOutput, dataLen, qComplete); qwBuildFetchRsp(rsp, &sOutput, dataLen, rawDataLen, qComplete);
if (qComplete) { if (qComplete) {
atomic_store_8((int8_t *)&ctx->queryEnd, true); atomic_store_8((int8_t *)&ctx->queryEnd, true);
} }
@ -922,7 +926,6 @@ int32_t qwProcessFetch(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
atomic_store_8((int8_t *)&ctx->queryContinue, 1); atomic_store_8((int8_t *)&ctx->queryContinue, 1);
} else if (0 == atomic_load_8((int8_t *)&ctx->queryInQueue)) { } else if (0 == atomic_load_8((int8_t *)&ctx->queryInQueue)) {
qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_EXEC, ctx->dynamicTask); qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_EXEC, ctx->dynamicTask);
atomic_store_8((int8_t *)&ctx->queryInQueue, 1); atomic_store_8((int8_t *)&ctx->queryInQueue, 1);
QW_ERR_JRET(qwBuildAndSendCQueryMsg(QW_FPARAMS(), &qwMsg->connInfo)); QW_ERR_JRET(qwBuildAndSendCQueryMsg(QW_FPARAMS(), &qwMsg->connInfo));
@ -950,6 +953,7 @@ _return:
qwDbgSimulateRedirect(qwMsg, ctx, &rsped); qwDbgSimulateRedirect(qwMsg, ctx, &rsped);
qwDbgSimulateDead(QW_FPARAMS(), ctx, &rsped); qwDbgSimulateDead(QW_FPARAMS(), ctx, &rsped);
} }
if (!rsped) { if (!rsped) {
qwBuildAndSendFetchRsp(qwMsg->msgType + 1, &qwMsg->connInfo, rsp, dataLen, code); qwBuildAndSendFetchRsp(qwMsg->msgType + 1, &qwMsg->connInfo, rsp, dataLen, code);
QW_TASK_DLOG("fetch rsp send, msgType:%s, handle:%p, code:%x - %s, dataLen:%d", TMSG_INFO(qwMsg->msgType + 1), QW_TASK_DLOG("fetch rsp send, msgType:%s, handle:%p, code:%x - %s, dataLen:%d", TMSG_INFO(qwMsg->msgType + 1),
@ -1098,7 +1102,6 @@ int32_t qwProcessHb(SQWorker *mgmt, SQWMsg *qwMsg, SSchedulerHbReq *req) {
_return: _return:
memcpy(&rsp.epId, &req->epId, sizeof(req->epId)); memcpy(&rsp.epId, &req->epId, sizeof(req->epId));
qwBuildAndSendHbRsp(&qwMsg->connInfo, &rsp, code); qwBuildAndSendHbRsp(&qwMsg->connInfo, &rsp, code);
if (code) { if (code) {
@ -1125,7 +1128,6 @@ void qwProcessHbTimerEvent(void *param, void *tmrId) {
} }
SQWSchStatus *sch = NULL; SQWSchStatus *sch = NULL;
int32_t taskNum = 0;
SQWHbInfo *rspList = NULL; SQWHbInfo *rspList = NULL;
SArray *pExpiredSch = NULL; SArray *pExpiredSch = NULL;
int32_t code = 0; int32_t code = 0;
@ -1158,8 +1160,6 @@ void qwProcessHbTimerEvent(void *param, void *tmrId) {
return; return;
} }
void *key = NULL;
size_t keyLen = 0;
int32_t i = 0; int32_t i = 0;
int64_t currentMs = taosGetTimestampMs(); int64_t currentMs = taosGetTimestampMs();
@ -1225,7 +1225,7 @@ int32_t qwProcessDelete(QW_FPARAMS_DEF, SQWMsg *qwMsg, SDeleteRes *pRes) {
QW_ERR_JRET(code); QW_ERR_JRET(code);
} }
code = qCreateExecTask(qwMsg->node, mgmt->nodeId, tId, plan, &pTaskInfo, &sinkHandle, NULL, OPTR_EXEC_MODEL_BATCH); code = qCreateExecTask(qwMsg->node, mgmt->nodeId, tId, plan, &pTaskInfo, &sinkHandle, 0, NULL, OPTR_EXEC_MODEL_BATCH);
if (code) { if (code) {
QW_TASK_ELOG("qCreateExecTask failed, code:%x - %s", code, tstrerror(code)); QW_TASK_ELOG("qCreateExecTask failed, code:%x - %s", code, tstrerror(code));
QW_ERR_JRET(code); QW_ERR_JRET(code);
@ -1427,7 +1427,7 @@ int32_t qWorkerProcessLocalQuery(void *pMgmt, uint64_t sId, uint64_t qId, uint64
rHandle.pMsgCb = taosMemoryCalloc(1, sizeof(SMsgCb)); rHandle.pMsgCb = taosMemoryCalloc(1, sizeof(SMsgCb));
rHandle.pMsgCb->clientRpc = qwMsg->connInfo.handle; rHandle.pMsgCb->clientRpc = qwMsg->connInfo.handle;
code = qCreateExecTask(&rHandle, mgmt->nodeId, tId, plan, &pTaskInfo, &sinkHandle, NULL, OPTR_EXEC_MODEL_BATCH); code = qCreateExecTask(&rHandle, mgmt->nodeId, tId, plan, &pTaskInfo, &sinkHandle, 0, NULL, OPTR_EXEC_MODEL_BATCH);
if (code) { if (code) {
QW_TASK_ELOG("qCreateExecTask failed, code:%x - %s", code, tstrerror(code)); QW_TASK_ELOG("qCreateExecTask failed, code:%x - %s", code, tstrerror(code));
QW_ERR_JRET(code); QW_ERR_JRET(code);
@ -1465,6 +1465,7 @@ int32_t qWorkerProcessLocalFetch(void *pMgmt, uint64_t sId, uint64_t qId, uint64
SQWorker *mgmt = (SQWorker *)pMgmt; SQWorker *mgmt = (SQWorker *)pMgmt;
int32_t code = 0; int32_t code = 0;
int32_t dataLen = 0; int32_t dataLen = 0;
int32_t rawLen = 0;
SQWTaskCtx *ctx = NULL; SQWTaskCtx *ctx = NULL;
void *rsp = NULL; void *rsp = NULL;
bool queryStop = false; bool queryStop = false;
@ -1472,7 +1473,6 @@ int32_t qWorkerProcessLocalFetch(void *pMgmt, uint64_t sId, uint64_t qId, uint64
SQWPhaseInput input = {0}; SQWPhaseInput input = {0};
QW_ERR_JRET(qwHandlePrePhaseEvents(QW_FPARAMS(), QW_PHASE_PRE_FETCH, &input, NULL)); QW_ERR_JRET(qwHandlePrePhaseEvents(QW_FPARAMS(), QW_PHASE_PRE_FETCH, &input, NULL));
QW_ERR_JRET(qwGetTaskCtx(QW_FPARAMS(), &ctx)); QW_ERR_JRET(qwGetTaskCtx(QW_FPARAMS(), &ctx));
ctx->fetchMsgType = TDMT_SCH_MERGE_FETCH; ctx->fetchMsgType = TDMT_SCH_MERGE_FETCH;
@ -1481,7 +1481,7 @@ int32_t qWorkerProcessLocalFetch(void *pMgmt, uint64_t sId, uint64_t qId, uint64
SOutputData sOutput = {0}; SOutputData sOutput = {0};
while (true) { while (true) {
QW_ERR_JRET(qwGetQueryResFromSink(QW_FPARAMS(), ctx, &dataLen, &rsp, &sOutput)); QW_ERR_JRET(qwGetQueryResFromSink(QW_FPARAMS(), ctx, &dataLen, &rawLen, &rsp, &sOutput));
if (NULL == rsp) { if (NULL == rsp) {
QW_ERR_JRET(qwExecTask(QW_FPARAMS(), ctx, &queryStop)); QW_ERR_JRET(qwExecTask(QW_FPARAMS(), ctx, &queryStop));
@ -1490,7 +1490,7 @@ int32_t qWorkerProcessLocalFetch(void *pMgmt, uint64_t sId, uint64_t qId, uint64
} else { } else {
bool qComplete = (DS_BUF_EMPTY == sOutput.bufStatus && sOutput.queryEnd); bool qComplete = (DS_BUF_EMPTY == sOutput.bufStatus && sOutput.queryEnd);
qwBuildFetchRsp(rsp, &sOutput, dataLen, qComplete); qwBuildFetchRsp(rsp, &sOutput, dataLen, rawLen, qComplete);
if (qComplete) { if (qComplete) {
atomic_store_8((int8_t *)&ctx->queryEnd, true); atomic_store_8((int8_t *)&ctx->queryEnd, true);
} }

View File

@ -20,6 +20,9 @@
#include "tmsg.h" #include "tmsg.h"
#include "tref.h" #include "tref.h"
#include "trpc.h" #include "trpc.h"
#include "tglobal.h"
#include "tmisce.h"
// clang-format off // clang-format off
int32_t schValidateRspMsgType(SSchJob *pJob, SSchTask *pTask, int32_t msgType) { int32_t schValidateRspMsgType(SSchJob *pJob, SSchTask *pTask, int32_t msgType) {
int32_t lastMsgType = pTask->lastMsgType; int32_t lastMsgType = pTask->lastMsgType;
@ -1127,6 +1130,12 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr,
qMsg.msgLen = pTask->msgLen; qMsg.msgLen = pTask->msgLen;
qMsg.msg = pTask->msg; qMsg.msg = pTask->msg;
if (strcmp(tsLocalFqdn, GET_ACTIVE_EP(&addr->epSet)->fqdn) == 0) {
qMsg.compress = 0;
} else {
qMsg.compress = 1;
}
msgSize = tSerializeSSubQueryMsg(NULL, 0, &qMsg); msgSize = tSerializeSSubQueryMsg(NULL, 0, &qMsg);
if (msgSize < 0) { if (msgSize < 0) {
SCH_TASK_ELOG("tSerializeSSubQueryMsg get size, msgSize:%d", msgSize); SCH_TASK_ELOG("tSerializeSSubQueryMsg get size, msgSize:%d", msgSize);

View File

@ -33,11 +33,26 @@ SStreamDataBlock* createStreamBlockFromDispatchMsg(const SStreamDispatchReq* pRe
} }
ASSERT((pReq->blockNum == taosArrayGetSize(pReq->data)) && (pReq->blockNum == taosArrayGetSize(pReq->dataLen))); ASSERT((pReq->blockNum == taosArrayGetSize(pReq->data)) && (pReq->blockNum == taosArrayGetSize(pReq->dataLen)));
for (int32_t i = 0; i < blockNum; i++) { for (int32_t i = 0; i < blockNum; i++) {
SRetrieveTableRsp* pRetrieve = (SRetrieveTableRsp*) taosArrayGetP(pReq->data, i); SRetrieveTableRsp* pRetrieve = (SRetrieveTableRsp*) taosArrayGetP(pReq->data, i);
SSDataBlock* pDataBlock = taosArrayGet(pArray, i); SSDataBlock* pDataBlock = taosArrayGet(pArray, i);
blockDecode(pDataBlock, pRetrieve->data);
int32_t compLen = *(int32_t*)pRetrieve->data;
int32_t fullLen = *(int32_t*)(pRetrieve->data + sizeof(int32_t));
char* pInput = pRetrieve->data + PAYLOAD_PREFIX_LEN;
if (pRetrieve->compressed && compLen < fullLen) {
char* p = taosMemoryMalloc(fullLen);
int32_t len = tsDecompressString(pInput, compLen, 1, p, fullLen, ONE_STAGE_COMP, NULL, 0);
ASSERT(len == fullLen);
pInput = p;
}
blockDecode(pDataBlock, pInput);
if (pRetrieve->compressed && compLen < fullLen) {
taosMemoryFree(pInput);
}
// TODO: refactor // TODO: refactor
pDataBlock->info.window.skey = be64toh(pRetrieve->skey); pDataBlock->info.window.skey = be64toh(pRetrieve->skey);
@ -98,7 +113,8 @@ int32_t streamRetrieveReqToData(const SStreamRetrieveReq* pReq, SStreamDataBlock
taosArrayPush(pArray, &(SSDataBlock){0}); taosArrayPush(pArray, &(SSDataBlock){0});
SRetrieveTableRsp* pRetrieve = pReq->pRetrieve; SRetrieveTableRsp* pRetrieve = pReq->pRetrieve;
SSDataBlock* pDataBlock = taosArrayGet(pArray, 0); SSDataBlock* pDataBlock = taosArrayGet(pArray, 0);
blockDecode(pDataBlock, pRetrieve->data);
blockDecode(pDataBlock, pRetrieve->data + PAYLOAD_PREFIX_LEN);
// TODO: refactor // TODO: refactor
pDataBlock->info.window.skey = be64toh(pRetrieve->skey); pDataBlock->info.window.skey = be64toh(pRetrieve->skey);

View File

@ -124,11 +124,12 @@ int32_t streamTaskBroadcastRetrieveReq(SStreamTask* pTask, SStreamRetrieveReq* r
} }
static int32_t buildStreamRetrieveReq(SStreamTask* pTask, const SSDataBlock* pBlock, SStreamRetrieveReq* req){ static int32_t buildStreamRetrieveReq(SStreamTask* pTask, const SSDataBlock* pBlock, SStreamRetrieveReq* req){
int32_t dataStrLen = sizeof(SRetrieveTableRsp) + blockGetEncodeSize(pBlock); SRetrieveTableRsp* pRetrieve = NULL;
SRetrieveTableRsp* pRetrieve = taosMemoryCalloc(1, dataStrLen);
if (pRetrieve == NULL) { int32_t len = sizeof(SRetrieveTableRsp) + blockGetEncodeSize(pBlock) + PAYLOAD_PREFIX_LEN;
return TSDB_CODE_OUT_OF_MEMORY;
} pRetrieve = taosMemoryCalloc(1, len);
if (pRetrieve == NULL) return TSDB_CODE_OUT_OF_MEMORY;
int32_t numOfCols = taosArrayGetSize(pBlock->pDataBlock); int32_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
pRetrieve->useconds = 0; pRetrieve->useconds = 0;
@ -142,13 +143,19 @@ static int32_t buildStreamRetrieveReq(SStreamTask* pTask, const SSDataBlock* pBl
pRetrieve->ekey = htobe64(pBlock->info.window.ekey); pRetrieve->ekey = htobe64(pBlock->info.window.ekey);
pRetrieve->version = htobe64(pBlock->info.version); pRetrieve->version = htobe64(pBlock->info.version);
int32_t actualLen = blockEncode(pBlock, pRetrieve->data, numOfCols); int32_t actualLen = blockEncode(pBlock, pRetrieve->data+ PAYLOAD_PREFIX_LEN, numOfCols);
SET_PAYLOAD_LEN(pRetrieve->data, actualLen, actualLen);
int32_t payloadLen = actualLen + PAYLOAD_PREFIX_LEN;
pRetrieve->payloadLen = htonl(payloadLen);
pRetrieve->compLen = htonl(payloadLen);
pRetrieve->compressed = 0;
req->streamId = pTask->id.streamId; req->streamId = pTask->id.streamId;
req->srcNodeId = pTask->info.nodeId; req->srcNodeId = pTask->info.nodeId;
req->srcTaskId = pTask->id.taskId; req->srcTaskId = pTask->id.taskId;
req->pRetrieve = pRetrieve; req->pRetrieve = pRetrieve;
req->retrieveLen = dataStrLen; req->retrieveLen = len;
return 0; return 0;
} }
@ -807,7 +814,7 @@ int32_t streamTaskSendCheckpointSourceRsp(SStreamTask* pTask) {
} }
int32_t streamAddBlockIntoDispatchMsg(const SSDataBlock* pBlock, SStreamDispatchReq* pReq) { int32_t streamAddBlockIntoDispatchMsg(const SSDataBlock* pBlock, SStreamDispatchReq* pReq) {
int32_t dataStrLen = sizeof(SRetrieveTableRsp) + blockGetEncodeSize(pBlock); int32_t dataStrLen = sizeof(SRetrieveTableRsp) + blockGetEncodeSize(pBlock) + PAYLOAD_PREFIX_LEN;
ASSERT(dataStrLen > 0); ASSERT(dataStrLen > 0);
void* buf = taosMemoryCalloc(1, dataStrLen); void* buf = taosMemoryCalloc(1, dataStrLen);
@ -829,10 +836,16 @@ int32_t streamAddBlockIntoDispatchMsg(const SSDataBlock* pBlock, SStreamDispatch
int32_t numOfCols = (int32_t)taosArrayGetSize(pBlock->pDataBlock); int32_t numOfCols = (int32_t)taosArrayGetSize(pBlock->pDataBlock);
pRetrieve->numOfCols = htonl(numOfCols); pRetrieve->numOfCols = htonl(numOfCols);
int32_t actualLen = blockEncode(pBlock, pRetrieve->data, numOfCols); int32_t actualLen = blockEncode(pBlock, pRetrieve->data + PAYLOAD_PREFIX_LEN, numOfCols);
actualLen += sizeof(SRetrieveTableRsp); SET_PAYLOAD_LEN(pRetrieve->data, actualLen, actualLen);
ASSERT(actualLen <= dataStrLen);
taosArrayPush(pReq->dataLen, &actualLen); int32_t payloadLen = actualLen + PAYLOAD_PREFIX_LEN;
pRetrieve->payloadLen = htonl(payloadLen);
pRetrieve->compLen = htonl(payloadLen);
payloadLen += sizeof(SRetrieveTableRsp);
taosArrayPush(pReq->dataLen, &payloadLen);
taosArrayPush(pReq->data, &buf); taosArrayPush(pReq->data, &buf);
pReq->totalLen += dataStrLen; pReq->totalLen += dataStrLen;

View File

@ -76,6 +76,9 @@ typedef struct SCliConn {
SDelayTask* task; SDelayTask* task;
uint32_t clientIp;
uint32_t serverIp;
char* dstAddr; char* dstAddr;
char src[32]; char src[32];
char dst[32]; char dst[32];
@ -1093,7 +1096,7 @@ void cliSendBatch(SCliConn* pConn) {
} }
pHead->timestamp = taosHton64(taosGetTimestampUs()); pHead->timestamp = taosHton64(taosGetTimestampUs());
if (pHead->comp == 0) { if (pHead->comp == 0 && pMsg->info.compressed == 0 && pConn->clientIp != pConn->serverIp) {
if (pTransInst->compressSize != -1 && pTransInst->compressSize < pMsg->contLen) { if (pTransInst->compressSize != -1 && pTransInst->compressSize < pMsg->contLen) {
msgLen = transCompressMsg(pMsg->pCont, pMsg->contLen) + sizeof(STransMsgHead); msgLen = transCompressMsg(pMsg->pCont, pMsg->contLen) + sizeof(STransMsgHead);
pHead->msgLen = (int32_t)htonl((uint32_t)msgLen); pHead->msgLen = (int32_t)htonl((uint32_t)msgLen);
@ -1171,7 +1174,7 @@ void cliSend(SCliConn* pConn) {
uv_timer_start((uv_timer_t*)pConn->timer, cliReadTimeoutCb, TRANS_READ_TIMEOUT, 0); uv_timer_start((uv_timer_t*)pConn->timer, cliReadTimeoutCb, TRANS_READ_TIMEOUT, 0);
} }
if (pHead->comp == 0) { if (pHead->comp == 0 && pMsg->info.compressed == 0 && pConn->clientIp != pConn->serverIp) {
if (pTransInst->compressSize != -1 && pTransInst->compressSize < pMsg->contLen) { if (pTransInst->compressSize != -1 && pTransInst->compressSize < pMsg->contLen) {
msgLen = transCompressMsg(pMsg->pCont, pMsg->contLen) + sizeof(STransMsgHead); msgLen = transCompressMsg(pMsg->pCont, pMsg->contLen) + sizeof(STransMsgHead);
pHead->msgLen = (int32_t)htonl((uint32_t)msgLen); pHead->msgLen = (int32_t)htonl((uint32_t)msgLen);
@ -1403,6 +1406,12 @@ void cliConnCb(uv_connect_t* req, int status) {
uv_tcp_getsockname((uv_tcp_t*)pConn->stream, &sockname, &addrlen); uv_tcp_getsockname((uv_tcp_t*)pConn->stream, &sockname, &addrlen);
transSockInfo2Str(&sockname, pConn->src); transSockInfo2Str(&sockname, pConn->src);
struct sockaddr_in addr = *(struct sockaddr_in*)&sockname;
struct sockaddr_in saddr = *(struct sockaddr_in*)&peername;
pConn->clientIp = addr.sin_addr.s_addr;
pConn->serverIp = saddr.sin_addr.s_addr;
tTrace("%s conn %p connect to server successfully", CONN_GET_INST_LABEL(pConn), pConn); tTrace("%s conn %p connect to server successfully", CONN_GET_INST_LABEL(pConn), pConn);
if (pConn->pBatch != NULL) { if (pConn->pBatch != NULL) {
cliSendBatch(pConn); cliSendBatch(pConn);

View File

@ -623,7 +623,8 @@ static int uvPrepareSendData(SSvrMsg* smsg, uv_buf_t* wb) {
int32_t len = transMsgLenFromCont(pMsg->contLen); int32_t len = transMsgLenFromCont(pMsg->contLen);
STrans* pTransInst = pConn->pTransInst; STrans* pTransInst = pConn->pTransInst;
if (pTransInst->compressSize != -1 && pTransInst->compressSize < pMsg->contLen) { if (pMsg->info.compressed == 0 && pConn->clientIp != pConn->serverIp && pTransInst->compressSize != -1 &&
pTransInst->compressSize < pMsg->contLen) {
len = transCompressMsg(pMsg->pCont, pMsg->contLen) + sizeof(STransMsgHead); len = transCompressMsg(pMsg->pCont, pMsg->contLen) + sizeof(STransMsgHead);
pHead->msgLen = (int32_t)htonl((uint32_t)len); pHead->msgLen = (int32_t)htonl((uint32_t)len);
} }