fix(query): check datablock should check output flag

This commit is contained in:
Liu Jicong 2022-06-28 13:59:49 +08:00
parent 383e7a495b
commit 58803fafcf
16 changed files with 230 additions and 239 deletions

View File

@ -138,7 +138,7 @@ int32_t create_topic() {
taos_free_result(pRes); taos_free_result(pRes);
/*pRes = taos_query(pConn, "create topic topic_ctb_column with meta as database abc1");*/ /*pRes = taos_query(pConn, "create topic topic_ctb_column with meta as database abc1");*/
pRes = taos_query(pConn, "create topic topic_ctb_column as select ts, c1, c2, c3,t1 from st1 where t1 = 2000"); pRes = taos_query(pConn, "create topic topic_ctb_column as select ts, c1, c2, c3 from st1 where t1 = 2000");
if (taos_errno(pRes) != 0) { if (taos_errno(pRes) != 0) {
printf("failed to create topic topic_ctb_column, reason:%s\n", taos_errstr(pRes)); printf("failed to create topic topic_ctb_column, reason:%s\n", taos_errstr(pRes));
return -1; return -1;

View File

@ -224,7 +224,7 @@ size_t blockDataGetCapacityInRow(const SSDataBlock* pBlock, size_t pageSize);
int32_t blockDataTrimFirstNRows(SSDataBlock* pBlock, size_t n); int32_t blockDataTrimFirstNRows(SSDataBlock* pBlock, size_t n);
int32_t assignOneDataBlock(SSDataBlock* dst, const SSDataBlock* src); int32_t assignOneDataBlock(SSDataBlock* dst, const SSDataBlock* src);
int32_t copyDataBlock(SSDataBlock* dst, const SSDataBlock* src); int32_t copyDataBlock(SSDataBlock* dst, const SSDataBlock* src);
SSDataBlock* createOneDataBlock(const SSDataBlock* pDataBlock, bool copyData); SSDataBlock* createOneDataBlock(const SSDataBlock* pDataBlock, bool copyData);
SSDataBlock* createDataBlock(); SSDataBlock* createDataBlock();
int32_t blockDataAppendColInfo(SSDataBlock* pBlock, SColumnInfoData* pColInfoData); int32_t blockDataAppendColInfo(SSDataBlock* pBlock, SColumnInfoData* pColInfoData);
@ -232,9 +232,8 @@ int32_t blockDataAppendColInfo(SSDataBlock* pBlock, SColumnInfoData* pColIn
SColumnInfoData createColumnInfoData(int16_t type, int32_t bytes, int16_t colId); SColumnInfoData createColumnInfoData(int16_t type, int32_t bytes, int16_t colId);
SColumnInfoData* bdGetColumnInfoData(SSDataBlock* pBlock, int32_t index); SColumnInfoData* bdGetColumnInfoData(SSDataBlock* pBlock, int32_t index);
void blockCompressEncode(const SSDataBlock* pBlock, char* data, int32_t* dataLen, int32_t numOfCols, void blockEncode(const SSDataBlock* pBlock, char* data, int32_t* dataLen, int32_t numOfCols, int8_t needCompress);
int8_t needCompress); const char* blockDecode(SSDataBlock* pBlock, int32_t numOfCols, int32_t numOfRows, const char* pData);
const char* blockCompressDecode(SSDataBlock* pBlock, int32_t numOfCols, int32_t numOfRows, const char* pData);
void blockDebugShowDataBlocks(const SArray* dataBlocks, const char* flag); void blockDebugShowDataBlocks(const SArray* dataBlocks, const char* flag);
// for debug // for debug

View File

@ -623,6 +623,7 @@ typedef struct {
col_id_t colId; col_id_t colId;
int16_t slotId; int16_t slotId;
}; };
bool output; // TODO remove it later
int16_t type; int16_t type;
int32_t bytes; int32_t bytes;

View File

@ -67,11 +67,11 @@ int32_t processConnectRsp(void* param, const SDataBuf* pMsg, int32_t code) {
dstEpSet.eps[dstEpSet.inUse].fqdn); dstEpSet.eps[dstEpSet.inUse].fqdn);
} else if (connectRsp.dnodeNum > 1 && !isEpsetEqual(&pTscObj->pAppInfo->mgmtEp.epSet, &connectRsp.epSet)) { } else if (connectRsp.dnodeNum > 1 && !isEpsetEqual(&pTscObj->pAppInfo->mgmtEp.epSet, &connectRsp.epSet)) {
SEpSet* pOrig = &pTscObj->pAppInfo->mgmtEp.epSet; SEpSet* pOrig = &pTscObj->pAppInfo->mgmtEp.epSet;
SEp* pOrigEp = &pOrig->eps[pOrig->inUse]; SEp* pOrigEp = &pOrig->eps[pOrig->inUse];
SEp* pNewEp = &connectRsp.epSet.eps[connectRsp.epSet.inUse]; SEp* pNewEp = &connectRsp.epSet.eps[connectRsp.epSet.inUse];
tscDebug("mnode epset updated from %d/%d=>%s:%d to %d/%d=>%s:%d in connRsp", tscDebug("mnode epset updated from %d/%d=>%s:%d to %d/%d=>%s:%d in connRsp", pOrig->inUse, pOrig->numOfEps,
pOrig->inUse, pOrig->numOfEps, pOrigEp->fqdn, pOrigEp->port, pOrigEp->fqdn, pOrigEp->port, connectRsp.epSet.inUse, connectRsp.epSet.numOfEps, pNewEp->fqdn,
connectRsp.epSet.inUse, connectRsp.epSet.numOfEps, pNewEp->fqdn, pNewEp->port); pNewEp->port);
updateEpSet_s(&pTscObj->pAppInfo->mgmtEp, &connectRsp.epSet); updateEpSet_s(&pTscObj->pAppInfo->mgmtEp, &connectRsp.epSet);
} }
@ -308,13 +308,13 @@ static int32_t buildShowVariablesBlock(SArray* pVars, SSDataBlock** block) {
blockDataEnsureCapacity(pBlock, numOfCfg); blockDataEnsureCapacity(pBlock, numOfCfg);
for (int32_t i = 0, c = 0; i < numOfCfg; ++i, c = 0) { for (int32_t i = 0, c = 0; i < numOfCfg; ++i, c = 0) {
SVariablesInfo *pInfo = taosArrayGet(pVars, i); SVariablesInfo* pInfo = taosArrayGet(pVars, i);
char name[TSDB_CONFIG_OPTION_LEN + VARSTR_HEADER_SIZE] = {0}; char name[TSDB_CONFIG_OPTION_LEN + VARSTR_HEADER_SIZE] = {0};
STR_WITH_MAXSIZE_TO_VARSTR(name, pInfo->name, TSDB_CONFIG_OPTION_LEN + VARSTR_HEADER_SIZE); STR_WITH_MAXSIZE_TO_VARSTR(name, pInfo->name, TSDB_CONFIG_OPTION_LEN + VARSTR_HEADER_SIZE);
SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, c++); SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, c++);
colDataAppend(pColInfo, i, name, false); colDataAppend(pColInfo, i, name, false);
char value[TSDB_CONFIG_VALUE_LEN + VARSTR_HEADER_SIZE] = {0}; char value[TSDB_CONFIG_VALUE_LEN + VARSTR_HEADER_SIZE] = {0};
STR_WITH_MAXSIZE_TO_VARSTR(value, pInfo->value, TSDB_CONFIG_VALUE_LEN + VARSTR_HEADER_SIZE); STR_WITH_MAXSIZE_TO_VARSTR(value, pInfo->value, TSDB_CONFIG_VALUE_LEN + VARSTR_HEADER_SIZE);
pColInfo = taosArrayGet(pBlock->pDataBlock, c++); pColInfo = taosArrayGet(pBlock->pDataBlock, c++);
@ -324,14 +324,13 @@ static int32_t buildShowVariablesBlock(SArray* pVars, SSDataBlock** block) {
pBlock->info.rows = numOfCfg; pBlock->info.rows = numOfCfg;
*block = pBlock; *block = pBlock;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static int32_t buildShowVariablesRsp(SArray* pVars, SRetrieveTableRsp** pRsp) { static int32_t buildShowVariablesRsp(SArray* pVars, SRetrieveTableRsp** pRsp) {
SSDataBlock* pBlock = NULL; SSDataBlock* pBlock = NULL;
int32_t code = buildShowVariablesBlock(pVars, &pBlock); int32_t code = buildShowVariablesBlock(pVars, &pBlock);
if (code) { if (code) {
return code; return code;
} }
@ -351,7 +350,7 @@ static int32_t buildShowVariablesRsp(SArray* pVars, SRetrieveTableRsp** pRsp) {
(*pRsp)->numOfCols = htonl(SHOW_VARIABLES_RESULT_COLS); (*pRsp)->numOfCols = htonl(SHOW_VARIABLES_RESULT_COLS);
int32_t len = 0; int32_t len = 0;
blockCompressEncode(pBlock, (*pRsp)->data, &len, SHOW_VARIABLES_RESULT_COLS, false); blockEncode(pBlock, (*pRsp)->data, &len, SHOW_VARIABLES_RESULT_COLS, false);
ASSERT(len == rspSize - sizeof(SRetrieveTableRsp)); ASSERT(len == rspSize - sizeof(SRetrieveTableRsp));
blockDataDestroy(pBlock); blockDataDestroy(pBlock);
@ -363,7 +362,7 @@ int32_t processShowVariablesRsp(void* param, const SDataBuf* pMsg, int32_t code)
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
setErrno(pRequest, code); setErrno(pRequest, code);
} else { } else {
SShowVariablesRsp rsp = {0}; SShowVariablesRsp rsp = {0};
SRetrieveTableRsp* pRes = NULL; SRetrieveTableRsp* pRes = NULL;
code = tDeserializeSShowVariablesRsp(pMsg->pData, pMsg->len, &rsp); code = tDeserializeSShowVariablesRsp(pMsg->pData, pMsg->len, &rsp);
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
@ -372,7 +371,7 @@ int32_t processShowVariablesRsp(void* param, const SDataBuf* pMsg, int32_t code)
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = setQueryResultFromRsp(&pRequest->body.resInfo, pRes, false, false); code = setQueryResultFromRsp(&pRequest->body.resInfo, pRes, false, false);
} }
tFreeSShowVariablesRsp(&rsp); tFreeSShowVariablesRsp(&rsp);
} }
@ -384,7 +383,6 @@ int32_t processShowVariablesRsp(void* param, const SDataBuf* pMsg, int32_t code)
return code; return code;
} }
__async_send_cb_fn_t getMsgRspHandle(int32_t msgType) { __async_send_cb_fn_t getMsgRspHandle(int32_t msgType) {
switch (msgType) { switch (msgType) {
case TDMT_MND_CONNECT: case TDMT_MND_CONNECT:

View File

@ -1613,7 +1613,8 @@ void blockDebugShowDataBlocks(const SArray* dataBlocks, const char* flag) {
size_t numOfCols = taosArrayGetSize(pDataBlock->pDataBlock); size_t numOfCols = taosArrayGetSize(pDataBlock->pDataBlock);
int32_t rows = pDataBlock->info.rows; int32_t rows = pDataBlock->info.rows;
printf("%s |block type %d |child id %d|group id %zX\n", flag, (int32_t)pDataBlock->info.type, pDataBlock->info.childId, pDataBlock->info.groupId); printf("%s |block type %d |child id %d|group id %zX\n", flag, (int32_t)pDataBlock->info.type,
pDataBlock->info.childId, pDataBlock->info.groupId);
for (int32_t j = 0; j < rows; j++) { for (int32_t j = 0; j < rows; j++) {
printf("%s |", flag); printf("%s |", flag);
for (int32_t k = 0; k < numOfCols; k++) { for (int32_t k = 0; k < numOfCols; k++) {
@ -1902,8 +1903,7 @@ char* buildCtbNameByGroupId(const char* stbName, uint64_t groupId) {
return rname.childTableName; return rname.childTableName;
} }
void blockCompressEncode(const SSDataBlock* pBlock, char* data, int32_t* dataLen, int32_t numOfCols, void blockEncode(const SSDataBlock* pBlock, char* data, int32_t* dataLen, int32_t numOfCols, int8_t needCompress) {
int8_t needCompress) {
// todo extract method // todo extract method
int32_t* actualLen = (int32_t*)data; int32_t* actualLen = (int32_t*)data;
data += sizeof(int32_t); data += sizeof(int32_t);
@ -1913,6 +1913,7 @@ void blockCompressEncode(const SSDataBlock* pBlock, char* data, int32_t* dataLen
for (int32_t i = 0; i < numOfCols; ++i) { for (int32_t i = 0; i < numOfCols; ++i) {
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i); SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i);
*((int16_t*)data) = pColInfoData->info.type; *((int16_t*)data) = pColInfoData->info.type;
data += sizeof(int16_t); data += sizeof(int16_t);
@ -1960,7 +1961,7 @@ void blockCompressEncode(const SSDataBlock* pBlock, char* data, int32_t* dataLen
*groupId = pBlock->info.groupId; *groupId = pBlock->info.groupId;
} }
const char* blockCompressDecode(SSDataBlock* pBlock, int32_t numOfCols, int32_t numOfRows, const char* pData) { const char* blockDecode(SSDataBlock* pBlock, int32_t numOfCols, int32_t numOfRows, const char* pData) {
const char* pStart = pData; const char* pStart = pData;
int32_t dataLen = *(int32_t*)pStart; int32_t dataLen = *(int32_t*)pStart;

View File

@ -17,7 +17,6 @@
#include "dmInt.h" #include "dmInt.h"
#include "systable.h" #include "systable.h"
extern SConfig *tsCfg; extern SConfig *tsCfg;
static void dmUpdateDnodeCfg(SDnodeMgmt *pMgmt, SDnodeCfg *pCfg) { static void dmUpdateDnodeCfg(SDnodeMgmt *pMgmt, SDnodeCfg *pCfg) {
@ -83,7 +82,7 @@ void dmSendStatusReq(SDnodeMgmt *pMgmt) {
(*pMgmt->getVnodeLoadsFp)(&vinfo); (*pMgmt->getVnodeLoadsFp)(&vinfo);
req.pVloads = vinfo.pVloads; req.pVloads = vinfo.pVloads;
SMonMloadInfo minfo = {0}; SMonMloadInfo minfo = {0};
(*pMgmt->getMnodeLoadsFp)(&minfo); (*pMgmt->getMnodeLoadsFp)(&minfo);
req.mload = minfo.load; req.mload = minfo.load;
@ -186,10 +185,10 @@ int32_t dmProcessServerRunStatus(SDnodeMgmt *pMgmt, SRpcMsg *pMsg) {
return 0; return 0;
} }
SSDataBlock* dmBuildVariablesBlock(void) { SSDataBlock *dmBuildVariablesBlock(void) {
SSDataBlock* pBlock = taosMemoryCalloc(1, sizeof(SSDataBlock)); SSDataBlock *pBlock = taosMemoryCalloc(1, sizeof(SSDataBlock));
size_t size = 0; size_t size = 0;
const SSysTableMeta* pMeta = NULL; const SSysTableMeta *pMeta = NULL;
getInfosDbMeta(&pMeta, &size); getInfosDbMeta(&pMeta, &size);
int32_t index = 0; int32_t index = 0;
@ -215,7 +214,7 @@ SSDataBlock* dmBuildVariablesBlock(void) {
return pBlock; return pBlock;
} }
int32_t dmAppendVariablesToBlock(SSDataBlock* pBlock, int32_t dnodeId) { int32_t dmAppendVariablesToBlock(SSDataBlock *pBlock, int32_t dnodeId) {
int32_t numOfCfg = taosArrayGetSize(tsCfg->array); int32_t numOfCfg = taosArrayGetSize(tsCfg->array);
int32_t numOfRows = 0; int32_t numOfRows = 0;
blockDataEnsureCapacity(pBlock, numOfCfg); blockDataEnsureCapacity(pBlock, numOfCfg);
@ -230,8 +229,8 @@ int32_t dmAppendVariablesToBlock(SSDataBlock* pBlock, int32_t dnodeId) {
STR_WITH_MAXSIZE_TO_VARSTR(name, pItem->name, TSDB_CONFIG_OPTION_LEN + VARSTR_HEADER_SIZE); STR_WITH_MAXSIZE_TO_VARSTR(name, pItem->name, TSDB_CONFIG_OPTION_LEN + VARSTR_HEADER_SIZE);
pColInfo = taosArrayGet(pBlock->pDataBlock, c++); pColInfo = taosArrayGet(pBlock->pDataBlock, c++);
colDataAppend(pColInfo, i, name, false); colDataAppend(pColInfo, i, name, false);
char value[TSDB_CONFIG_VALUE_LEN + VARSTR_HEADER_SIZE] = {0}; char value[TSDB_CONFIG_VALUE_LEN + VARSTR_HEADER_SIZE] = {0};
int32_t valueLen = 0; int32_t valueLen = 0;
cfgDumpItemValue(pItem, &value[VARSTR_HEADER_SIZE], TSDB_CONFIG_VALUE_LEN, &valueLen); cfgDumpItemValue(pItem, &value[VARSTR_HEADER_SIZE], TSDB_CONFIG_VALUE_LEN, &valueLen);
varDataSetLen(value, valueLen); varDataSetLen(value, valueLen);
@ -241,9 +240,8 @@ int32_t dmAppendVariablesToBlock(SSDataBlock* pBlock, int32_t dnodeId) {
numOfRows++; numOfRows++;
} }
pBlock->info.rows = numOfRows; pBlock->info.rows = numOfRows;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
@ -267,7 +265,7 @@ int32_t dmProcessRetrieve(SDnodeMgmt *pMgmt, SRpcMsg *pMsg) {
return -1; return -1;
} }
SSDataBlock* pBlock = dmBuildVariablesBlock(); SSDataBlock *pBlock = dmBuildVariablesBlock();
dmAppendVariablesToBlock(pBlock, pMgmt->pData->dnodeId); dmAppendVariablesToBlock(pBlock, pMgmt->pData->dnodeId);
@ -283,14 +281,14 @@ int32_t dmProcessRetrieve(SDnodeMgmt *pMgmt, SRpcMsg *pMsg) {
return -1; return -1;
} }
char *pStart = pRsp->data; char *pStart = pRsp->data;
*(int32_t *)pStart = htonl(numOfCols); *(int32_t *)pStart = htonl(numOfCols);
pStart += sizeof(int32_t); // number of columns pStart += sizeof(int32_t); // number of columns
for (int32_t i = 0; i < numOfCols; ++i) { for (int32_t i = 0; i < numOfCols; ++i) {
SSysTableSchema *pSchema = (SSysTableSchema *)pStart; SSysTableSchema *pSchema = (SSysTableSchema *)pStart;
SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, i); SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, i);
pSchema->bytes = htonl(pColInfo->info.bytes); pSchema->bytes = htonl(pColInfo->info.bytes);
pSchema->colId = htons(pColInfo->info.colId); pSchema->colId = htons(pColInfo->info.colId);
pSchema->type = pColInfo->info.type; pSchema->type = pColInfo->info.type;
@ -299,7 +297,7 @@ int32_t dmProcessRetrieve(SDnodeMgmt *pMgmt, SRpcMsg *pMsg) {
} }
int32_t len = 0; int32_t len = 0;
blockCompressEncode(pBlock, pStart, &len, numOfCols, false); blockEncode(pBlock, pStart, &len, numOfCols, false);
pRsp->numOfRows = htonl(pBlock->info.rows); pRsp->numOfRows = htonl(pBlock->info.rows);
pRsp->precision = TSDB_TIME_PRECISION_MILLI; // millisecond time precision pRsp->precision = TSDB_TIME_PRECISION_MILLI; // millisecond time precision

View File

@ -15,8 +15,8 @@
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#include "mndShow.h" #include "mndShow.h"
#include "systable.h"
#include "mndPrivilege.h" #include "mndPrivilege.h"
#include "systable.h"
#define SHOW_STEP_SIZE 100 #define SHOW_STEP_SIZE 100
@ -307,7 +307,7 @@ static int32_t mndProcessRetrieveSysTableReq(SRpcMsg *pReq) {
} }
int32_t len = 0; int32_t len = 0;
blockCompressEncode(pBlock, pStart, &len, pShow->pMeta->numOfColumns, false); blockEncode(pBlock, pStart, &len, pShow->pMeta->numOfColumns, false);
} }
pRsp->numOfRows = htonl(rowsRead); pRsp->numOfRows = htonl(rowsRead);

View File

@ -29,7 +29,7 @@ static int32_t tqAddBlockDataToRsp(const SSDataBlock* pBlock, SMqDataBlkRsp* pRs
// TODO enable compress // TODO enable compress
int32_t actualLen = 0; int32_t actualLen = 0;
blockCompressEncode(pBlock, pRetrieve->data, &actualLen, taosArrayGetSize(pBlock->pDataBlock), false); blockEncode(pBlock, pRetrieve->data, &actualLen, taosArrayGetSize(pBlock->pDataBlock), false);
actualLen += sizeof(SRetrieveTableRsp); actualLen += sizeof(SRetrieveTableRsp);
ASSERT(actualLen <= dataStrLen); ASSERT(actualLen <= dataStrLen);
taosArrayPush(pRsp->blockDataLen, &actualLen); taosArrayPush(pRsp->blockDataLen, &actualLen);

View File

@ -18,8 +18,8 @@
#include "tdatablock.h" #include "tdatablock.h"
#include "tglobal.h" #include "tglobal.h"
extern SConfig *tsCfg; extern SConfig* tsCfg;
static int32_t getSchemaBytes(const SSchema* pSchema) { static int32_t getSchemaBytes(const SSchema* pSchema) {
switch (pSchema->type) { switch (pSchema->type) {
case TSDB_DATA_TYPE_BINARY: case TSDB_DATA_TYPE_BINARY:
return (pSchema->bytes - VARSTR_HEADER_SIZE); return (pSchema->bytes - VARSTR_HEADER_SIZE);
@ -104,7 +104,7 @@ static int32_t execDescribe(SNode* pStmt, SRetrieveTableRsp** pRsp) {
(*pRsp)->numOfCols = htonl(DESCRIBE_RESULT_COLS); (*pRsp)->numOfCols = htonl(DESCRIBE_RESULT_COLS);
int32_t len = 0; int32_t len = 0;
blockCompressEncode(pBlock, (*pRsp)->data, &len, DESCRIBE_RESULT_COLS, false); blockEncode(pBlock, (*pRsp)->data, &len, DESCRIBE_RESULT_COLS, false);
ASSERT(len == rspSize - sizeof(SRetrieveTableRsp)); ASSERT(len == rspSize - sizeof(SRetrieveTableRsp));
blockDataDestroy(pBlock); blockDataDestroy(pBlock);
@ -113,9 +113,8 @@ static int32_t execDescribe(SNode* pStmt, SRetrieveTableRsp** pRsp) {
static int32_t execResetQueryCache() { return catalogClearCache(); } static int32_t execResetQueryCache() { return catalogClearCache(); }
static SSDataBlock* buildCreateDBResultDataBlock() { static SSDataBlock* buildCreateDBResultDataBlock() {
SSDataBlock* pBlock = createDataBlock(); SSDataBlock* pBlock = createDataBlock();
SColumnInfoData infoData = createColumnInfoData(TSDB_DATA_TYPE_VARCHAR, SHOW_CREATE_DB_RESULT_COLS, 1); SColumnInfoData infoData = createColumnInfoData(TSDB_DATA_TYPE_VARCHAR, SHOW_CREATE_DB_RESULT_COLS, 1);
blockDataAppendColInfo(pBlock, &infoData); blockDataAppendColInfo(pBlock, &infoData);
@ -149,14 +148,14 @@ int64_t getValOfDiffPrecision(int8_t unit, int64_t val) {
return v; return v;
} }
char *buildRetension(SArray *pRetension) { char* buildRetension(SArray* pRetension) {
size_t size = taosArrayGetSize(pRetension); size_t size = taosArrayGetSize(pRetension);
if (size == 0) { if (size == 0) {
return NULL; return NULL;
} }
char *p1 = taosMemoryCalloc(1, 100); char* p1 = taosMemoryCalloc(1, 100);
SRetention *p = taosArrayGet(pRetension, 0); SRetention* p = taosArrayGet(pRetension, 0);
int32_t len = 0; int32_t len = 0;
@ -185,8 +184,7 @@ char *buildRetension(SArray *pRetension) {
return p1; return p1;
} }
static void setCreateDBResultIntoDataBlock(SSDataBlock* pBlock, char* dbFName, SDbCfgInfo* pCfg) {
static void setCreateDBResultIntoDataBlock(SSDataBlock* pBlock, char *dbFName, SDbCfgInfo* pCfg) {
blockDataEnsureCapacity(pBlock, 1); blockDataEnsureCapacity(pBlock, 1);
pBlock->info.rows = 1; pBlock->info.rows = 1;
@ -198,7 +196,7 @@ static void setCreateDBResultIntoDataBlock(SSDataBlock* pBlock, char *dbFName, S
SColumnInfoData* pCol2 = taosArrayGet(pBlock->pDataBlock, 1); SColumnInfoData* pCol2 = taosArrayGet(pBlock->pDataBlock, 1);
char buf2[SHOW_CREATE_DB_RESULT_FIELD2_LEN] = {0}; char buf2[SHOW_CREATE_DB_RESULT_FIELD2_LEN] = {0};
int32_t len = 0; int32_t len = 0;
char *prec = NULL; char* prec = NULL;
switch (pCfg->precision) { switch (pCfg->precision) {
case TSDB_TIME_PRECISION_MILLI: case TSDB_TIME_PRECISION_MILLI:
prec = TSDB_TIME_PRECISION_MILLI_STR; prec = TSDB_TIME_PRECISION_MILLI_STR;
@ -214,15 +212,16 @@ static void setCreateDBResultIntoDataBlock(SSDataBlock* pBlock, char *dbFName, S
break; break;
} }
char *retentions = buildRetension(pCfg->pRetensions); char* retentions = buildRetension(pCfg->pRetensions);
len += sprintf(buf2 + VARSTR_HEADER_SIZE, "CREATE DATABASE `%s` BUFFER %d CACHELAST %d COMP %d DURATION %dm " len += sprintf(buf2 + VARSTR_HEADER_SIZE,
"FSYNC %d MAXROWS %d MINROWS %d KEEP %dm,%dm,%dm PAGES %d PAGESIZE %d PRECISION '%s' REPLICA %d " "CREATE DATABASE `%s` BUFFER %d CACHELAST %d COMP %d DURATION %dm "
"STRICT %d WAL %d VGROUPS %d SINGLE_STABLE %d", "FSYNC %d MAXROWS %d MINROWS %d KEEP %dm,%dm,%dm PAGES %d PAGESIZE %d PRECISION '%s' REPLICA %d "
dbFName, pCfg->buffer, pCfg->cacheLastRow, pCfg->compression, pCfg->daysPerFile, "STRICT %d WAL %d VGROUPS %d SINGLE_STABLE %d",
pCfg->fsyncPeriod, pCfg->maxRows, pCfg->minRows, pCfg->daysToKeep0, pCfg->daysToKeep1, pCfg->daysToKeep2, dbFName, pCfg->buffer, pCfg->cacheLastRow, pCfg->compression, pCfg->daysPerFile, pCfg->fsyncPeriod,
pCfg->pages, pCfg->pageSize, prec, pCfg->replications, pCfg->strict, pCfg->walLevel, pCfg->numOfVgroups, pCfg->maxRows, pCfg->minRows, pCfg->daysToKeep0, pCfg->daysToKeep1, pCfg->daysToKeep2, pCfg->pages,
1 == pCfg->numOfStables); pCfg->pageSize, prec, pCfg->replications, pCfg->strict, pCfg->walLevel, pCfg->numOfVgroups,
1 == pCfg->numOfStables);
if (retentions) { if (retentions) {
len += sprintf(buf2 + VARSTR_HEADER_SIZE + len, " RETENTIONS %s", retentions); len += sprintf(buf2 + VARSTR_HEADER_SIZE + len, " RETENTIONS %s", retentions);
@ -230,11 +229,10 @@ static void setCreateDBResultIntoDataBlock(SSDataBlock* pBlock, char *dbFName, S
} }
(varDataLen(buf2)) = len; (varDataLen(buf2)) = len;
colDataAppend(pCol2, 0, buf2, false); colDataAppend(pCol2, 0, buf2, false);
} }
static int32_t execShowCreateDatabase(SShowCreateDatabaseStmt* pStmt, SRetrieveTableRsp** pRsp) { static int32_t execShowCreateDatabase(SShowCreateDatabaseStmt* pStmt, SRetrieveTableRsp** pRsp) {
SSDataBlock* pBlock = buildCreateDBResultDataBlock(); SSDataBlock* pBlock = buildCreateDBResultDataBlock();
setCreateDBResultIntoDataBlock(pBlock, pStmt->dbName, pStmt->pCfg); setCreateDBResultIntoDataBlock(pBlock, pStmt->dbName, pStmt->pCfg);
@ -254,7 +252,7 @@ static int32_t execShowCreateDatabase(SShowCreateDatabaseStmt* pStmt, SRetrieveT
(*pRsp)->numOfCols = htonl(SHOW_CREATE_DB_RESULT_COLS); (*pRsp)->numOfCols = htonl(SHOW_CREATE_DB_RESULT_COLS);
int32_t len = 0; int32_t len = 0;
blockCompressEncode(pBlock, (*pRsp)->data, &len, SHOW_CREATE_DB_RESULT_COLS, false); blockEncode(pBlock, (*pRsp)->data, &len, SHOW_CREATE_DB_RESULT_COLS, false);
ASSERT(len == rspSize - sizeof(SRetrieveTableRsp)); ASSERT(len == rspSize - sizeof(SRetrieveTableRsp));
blockDataDestroy(pBlock); blockDataDestroy(pBlock);
@ -276,14 +274,14 @@ static SSDataBlock* buildCreateTbResultDataBlock() {
void appendColumnFields(char* buf, int32_t* len, STableCfg* pCfg) { void appendColumnFields(char* buf, int32_t* len, STableCfg* pCfg) {
for (int32_t i = 0; i < pCfg->numOfColumns; ++i) { for (int32_t i = 0; i < pCfg->numOfColumns; ++i) {
SSchema* pSchema = pCfg->pSchemas + i; SSchema* pSchema = pCfg->pSchemas + i;
char type[32]; char type[32];
sprintf(type, "%s", tDataTypes[pSchema->type].name); sprintf(type, "%s", tDataTypes[pSchema->type].name);
if (TSDB_DATA_TYPE_VARCHAR == pSchema->type) { if (TSDB_DATA_TYPE_VARCHAR == pSchema->type) {
sprintf(type + strlen(type), "(%d)", (int32_t)(pSchema->bytes - VARSTR_HEADER_SIZE)); sprintf(type + strlen(type), "(%d)", (int32_t)(pSchema->bytes - VARSTR_HEADER_SIZE));
} else if (TSDB_DATA_TYPE_NCHAR == pSchema->type) { } else if (TSDB_DATA_TYPE_NCHAR == pSchema->type) {
sprintf(type + strlen(type), "(%d)", (int32_t)((pSchema->bytes - VARSTR_HEADER_SIZE)/TSDB_NCHAR_SIZE)); sprintf(type + strlen(type), "(%d)", (int32_t)((pSchema->bytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE));
} }
*len += sprintf(buf + VARSTR_HEADER_SIZE + *len, "%s`%s` %s", ((i > 0) ? ", " : ""), pSchema->name, type); *len += sprintf(buf + VARSTR_HEADER_SIZE + *len, "%s`%s` %s", ((i > 0) ? ", " : ""), pSchema->name, type);
} }
} }
@ -291,19 +289,18 @@ void appendColumnFields(char* buf, int32_t* len, STableCfg* pCfg) {
void appendTagFields(char* buf, int32_t* len, STableCfg* pCfg) { void appendTagFields(char* buf, int32_t* len, STableCfg* pCfg) {
for (int32_t i = 0; i < pCfg->numOfTags; ++i) { for (int32_t i = 0; i < pCfg->numOfTags; ++i) {
SSchema* pSchema = pCfg->pSchemas + pCfg->numOfColumns + i; SSchema* pSchema = pCfg->pSchemas + pCfg->numOfColumns + i;
char type[32]; char type[32];
sprintf(type, "%s", tDataTypes[pSchema->type].name); sprintf(type, "%s", tDataTypes[pSchema->type].name);
if (TSDB_DATA_TYPE_VARCHAR == pSchema->type) { if (TSDB_DATA_TYPE_VARCHAR == pSchema->type) {
sprintf(type + strlen(type), "(%d)", (int32_t)(pSchema->bytes - VARSTR_HEADER_SIZE)); sprintf(type + strlen(type), "(%d)", (int32_t)(pSchema->bytes - VARSTR_HEADER_SIZE));
} else if (TSDB_DATA_TYPE_NCHAR == pSchema->type) { } else if (TSDB_DATA_TYPE_NCHAR == pSchema->type) {
sprintf(type + strlen(type), "(%d)", (int32_t)((pSchema->bytes - VARSTR_HEADER_SIZE)/TSDB_NCHAR_SIZE)); sprintf(type + strlen(type), "(%d)", (int32_t)((pSchema->bytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE));
} }
*len += sprintf(buf + VARSTR_HEADER_SIZE + *len, "%s`%s` %s", ((i > 0) ? ", " : ""), pSchema->name, type); *len += sprintf(buf + VARSTR_HEADER_SIZE + *len, "%s`%s` %s", ((i > 0) ? ", " : ""), pSchema->name, type);
} }
} }
void appendTagNameFields(char* buf, int32_t* len, STableCfg* pCfg) { void appendTagNameFields(char* buf, int32_t* len, STableCfg* pCfg) {
for (int32_t i = 0; i < pCfg->numOfTags; ++i) { for (int32_t i = 0; i < pCfg->numOfTags; ++i) {
SSchema* pSchema = pCfg->pSchemas + pCfg->numOfColumns + i; SSchema* pSchema = pCfg->pSchemas + pCfg->numOfColumns + i;
@ -311,13 +308,12 @@ void appendTagNameFields(char* buf, int32_t* len, STableCfg* pCfg) {
} }
} }
int32_t appendTagValues(char* buf, int32_t* len, STableCfg* pCfg) { int32_t appendTagValues(char* buf, int32_t* len, STableCfg* pCfg) {
SArray *pTagVals = NULL; SArray* pTagVals = NULL;
STag *pTag = (STag*)pCfg->pTags; STag* pTag = (STag*)pCfg->pTags;
if (pCfg->pTags && pTag->flags & TD_TAG_JSON) { if (pCfg->pTags && pTag->flags & TD_TAG_JSON) {
char *pJson = parseTagDatatoJson(pTag); char* pJson = parseTagDatatoJson(pTag);
if (pJson) { if (pJson) {
*len += sprintf(buf + VARSTR_HEADER_SIZE + *len, "%s", pJson); *len += sprintf(buf + VARSTR_HEADER_SIZE + *len, "%s", pJson);
taosMemoryFree(pJson); taosMemoryFree(pJson);
@ -325,8 +321,8 @@ int32_t appendTagValues(char* buf, int32_t* len, STableCfg* pCfg) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t code = tTagToValArray((const STag *)pCfg->pTags, &pTagVals); int32_t code = tTagToValArray((const STag*)pCfg->pTags, &pTagVals);
if (code) { if (code) {
return code; return code;
} }
@ -339,20 +335,20 @@ int32_t appendTagValues(char* buf, int32_t* len, STableCfg* pCfg) {
if (i > 0) { if (i > 0) {
*len += sprintf(buf + VARSTR_HEADER_SIZE + *len, ", "); *len += sprintf(buf + VARSTR_HEADER_SIZE + *len, ", ");
} }
if (j >= valueNum) { if (j >= valueNum) {
*len += sprintf(buf + VARSTR_HEADER_SIZE + *len, "NULL"); *len += sprintf(buf + VARSTR_HEADER_SIZE + *len, "NULL");
continue; continue;
} }
STagVal *pTagVal = (STagVal *)taosArrayGet(pTagVals, j); STagVal* pTagVal = (STagVal*)taosArrayGet(pTagVals, j);
if (pSchema->colId > pTagVal->cid) { if (pSchema->colId > pTagVal->cid) {
qError("tag value and column mismatch, schemaId:%d, valId:%d", pSchema->colId, pTagVal->cid); qError("tag value and column mismatch, schemaId:%d, valId:%d", pSchema->colId, pTagVal->cid);
taosArrayDestroy(pTagVals); taosArrayDestroy(pTagVals);
return TSDB_CODE_APP_ERROR; return TSDB_CODE_APP_ERROR;
} else if (pSchema->colId == pTagVal->cid) { } else if (pSchema->colId == pTagVal->cid) {
char type = pTagVal->type; char type = pTagVal->type;
int32_t tlen = 0; int32_t tlen = 0;
if (IS_VAR_DATA_TYPE(type)) { if (IS_VAR_DATA_TYPE(type)) {
dataConverToStr(buf + VARSTR_HEADER_SIZE + *len, type, pTagVal->pData, pTagVal->nData, &tlen); dataConverToStr(buf + VARSTR_HEADER_SIZE + *len, type, pTagVal->pData, pTagVal->nData, &tlen);
@ -364,7 +360,6 @@ int32_t appendTagValues(char* buf, int32_t* len, STableCfg* pCfg) {
} else { } else {
*len += sprintf(buf + VARSTR_HEADER_SIZE + *len, "NULL"); *len += sprintf(buf + VARSTR_HEADER_SIZE + *len, "NULL");
} }
/* /*
if (type == TSDB_DATA_TYPE_BINARY) { if (type == TSDB_DATA_TYPE_BINARY) {
@ -372,7 +367,7 @@ int32_t appendTagValues(char* buf, int32_t* len, STableCfg* pCfg) {
if (num) { if (num) {
*len += sprintf(buf + VARSTR_HEADER_SIZE + *len, ", "); *len += sprintf(buf + VARSTR_HEADER_SIZE + *len, ", ");
} }
memcpy(buf + VARSTR_HEADER_SIZE + *len, pTagVal->pData, pTagVal->nData); memcpy(buf + VARSTR_HEADER_SIZE + *len, pTagVal->pData, pTagVal->nData);
*len += pTagVal->nData; *len += pTagVal->nData;
} }
@ -397,7 +392,7 @@ int32_t appendTagValues(char* buf, int32_t* len, STableCfg* pCfg) {
taosArrayDestroy(pTagVals); taosArrayDestroy(pTagVals);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
void appendTableOptions(char* buf, int32_t* len, STableCfg* pCfg) { void appendTableOptions(char* buf, int32_t* len, STableCfg* pCfg) {
@ -426,7 +421,7 @@ void appendTableOptions(char* buf, int32_t* len, STableCfg* pCfg) {
*len += sprintf(buf + VARSTR_HEADER_SIZE + *len, " ROLLUP("); *len += sprintf(buf + VARSTR_HEADER_SIZE + *len, " ROLLUP(");
for (int32_t i = 0; i < funcNum; ++i) { for (int32_t i = 0; i < funcNum; ++i) {
char* pFunc = taosArrayGet(pCfg->pFuncs, i); char* pFunc = taosArrayGet(pCfg->pFuncs, i);
*len += sprintf(buf + VARSTR_HEADER_SIZE + *len, "%s%s", ((i > 0) ? ", " : ""), pFunc); *len += sprintf(buf + VARSTR_HEADER_SIZE + *len, "%s%s", ((i > 0) ? ", " : ""), pFunc);
} }
*len += sprintf(buf + VARSTR_HEADER_SIZE + *len, ")"); *len += sprintf(buf + VARSTR_HEADER_SIZE + *len, ")");
} }
@ -436,7 +431,7 @@ void appendTableOptions(char* buf, int32_t* len, STableCfg* pCfg) {
} }
} }
static int32_t setCreateTBResultIntoDataBlock(SSDataBlock* pBlock, char *tbName, STableCfg* pCfg) { static int32_t setCreateTBResultIntoDataBlock(SSDataBlock* pBlock, char* tbName, STableCfg* pCfg) {
int32_t code = 0; int32_t code = 0;
blockDataEnsureCapacity(pBlock, 1); blockDataEnsureCapacity(pBlock, 1);
pBlock->info.rows = 1; pBlock->info.rows = 1;
@ -454,7 +449,7 @@ static int32_t setCreateTBResultIntoDataBlock(SSDataBlock* pBlock, char *tbName,
len += sprintf(buf2 + VARSTR_HEADER_SIZE, "CREATE STABLE `%s` (", tbName); len += sprintf(buf2 + VARSTR_HEADER_SIZE, "CREATE STABLE `%s` (", tbName);
appendColumnFields(buf2, &len, pCfg); appendColumnFields(buf2, &len, pCfg);
len += sprintf(buf2 + VARSTR_HEADER_SIZE + len, ") TAGS ("); len += sprintf(buf2 + VARSTR_HEADER_SIZE + len, ") TAGS (");
appendTagFields(buf2, &len, pCfg); appendTagFields(buf2, &len, pCfg);
len += sprintf(buf2 + VARSTR_HEADER_SIZE + len, ")"); len += sprintf(buf2 + VARSTR_HEADER_SIZE + len, ")");
appendTableOptions(buf2, &len, pCfg); appendTableOptions(buf2, &len, pCfg);
} else if (TSDB_CHILD_TABLE == pCfg->tableType) { } else if (TSDB_CHILD_TABLE == pCfg->tableType) {
@ -474,16 +469,15 @@ static int32_t setCreateTBResultIntoDataBlock(SSDataBlock* pBlock, char *tbName,
} }
varDataLen(buf2) = len; varDataLen(buf2) = len;
colDataAppend(pCol2, 0, buf2, false); colDataAppend(pCol2, 0, buf2, false);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static int32_t execShowCreateTable(SShowCreateTableStmt* pStmt, SRetrieveTableRsp** pRsp) { static int32_t execShowCreateTable(SShowCreateTableStmt* pStmt, SRetrieveTableRsp** pRsp) {
SSDataBlock* pBlock = buildCreateTbResultDataBlock(); SSDataBlock* pBlock = buildCreateTbResultDataBlock();
int32_t code = setCreateTBResultIntoDataBlock(pBlock, pStmt->tableName, pStmt->pCfg); int32_t code = setCreateTBResultIntoDataBlock(pBlock, pStmt->tableName, pStmt->pCfg);
if (code) { if (code) {
return code; return code;
} }
@ -503,7 +497,7 @@ static int32_t execShowCreateTable(SShowCreateTableStmt* pStmt, SRetrieveTableRs
(*pRsp)->numOfCols = htonl(SHOW_CREATE_TB_RESULT_COLS); (*pRsp)->numOfCols = htonl(SHOW_CREATE_TB_RESULT_COLS);
int32_t len = 0; int32_t len = 0;
blockCompressEncode(pBlock, (*pRsp)->data, &len, SHOW_CREATE_TB_RESULT_COLS, false); blockEncode(pBlock, (*pRsp)->data, &len, SHOW_CREATE_TB_RESULT_COLS, false);
ASSERT(len == rspSize - sizeof(SRetrieveTableRsp)); ASSERT(len == rspSize - sizeof(SRetrieveTableRsp));
blockDataDestroy(pBlock); blockDataDestroy(pBlock);
@ -516,17 +510,17 @@ static int32_t execShowCreateSTable(SShowCreateTableStmt* pStmt, SRetrieveTableR
terrno = TSDB_CODE_TSC_NOT_STABLE_ERROR; terrno = TSDB_CODE_TSC_NOT_STABLE_ERROR;
return terrno; return terrno;
} }
return execShowCreateTable(pStmt, pRsp); return execShowCreateTable(pStmt, pRsp);
} }
static int32_t execAlterLocal(SAlterLocalStmt* pStmt) { static int32_t execAlterLocal(SAlterLocalStmt* pStmt) {
if (cfgSetItem(tsCfg, pStmt->config, pStmt->value, CFG_STYPE_ALTER_CMD)) { if (cfgSetItem(tsCfg, pStmt->config, pStmt->value, CFG_STYPE_ALTER_CMD)) {
return terrno; return terrno;
} }
if (taosSetCfg(tsCfg, pStmt->config)) { if (taosSetCfg(tsCfg, pStmt->config)) {
return terrno; return terrno;
} }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
@ -551,21 +545,20 @@ static SSDataBlock* buildLocalVariablesResultDataBlock() {
return pBlock; return pBlock;
} }
int32_t setLocalVariablesResultIntoDataBlock(SSDataBlock* pBlock) { int32_t setLocalVariablesResultIntoDataBlock(SSDataBlock* pBlock) {
int32_t numOfCfg = taosArrayGetSize(tsCfg->array); int32_t numOfCfg = taosArrayGetSize(tsCfg->array);
int32_t numOfRows = 0; int32_t numOfRows = 0;
blockDataEnsureCapacity(pBlock, numOfCfg); blockDataEnsureCapacity(pBlock, numOfCfg);
for (int32_t i = 0, c = 0; i < numOfCfg; ++i, c = 0) { for (int32_t i = 0, c = 0; i < numOfCfg; ++i, c = 0) {
SConfigItem *pItem = taosArrayGet(tsCfg->array, i); SConfigItem* pItem = taosArrayGet(tsCfg->array, i);
char name[TSDB_CONFIG_OPTION_LEN + VARSTR_HEADER_SIZE] = {0}; char name[TSDB_CONFIG_OPTION_LEN + VARSTR_HEADER_SIZE] = {0};
STR_WITH_MAXSIZE_TO_VARSTR(name, pItem->name, TSDB_CONFIG_OPTION_LEN + VARSTR_HEADER_SIZE); STR_WITH_MAXSIZE_TO_VARSTR(name, pItem->name, TSDB_CONFIG_OPTION_LEN + VARSTR_HEADER_SIZE);
SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, c++); SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, c++);
colDataAppend(pColInfo, i, name, false); colDataAppend(pColInfo, i, name, false);
char value[TSDB_CONFIG_VALUE_LEN + VARSTR_HEADER_SIZE] = {0}; char value[TSDB_CONFIG_VALUE_LEN + VARSTR_HEADER_SIZE] = {0};
int32_t valueLen = 0; int32_t valueLen = 0;
cfgDumpItemValue(pItem, &value[VARSTR_HEADER_SIZE], TSDB_CONFIG_VALUE_LEN, &valueLen); cfgDumpItemValue(pItem, &value[VARSTR_HEADER_SIZE], TSDB_CONFIG_VALUE_LEN, &valueLen);
varDataSetLen(value, valueLen); varDataSetLen(value, valueLen);
@ -575,16 +568,14 @@ int32_t setLocalVariablesResultIntoDataBlock(SSDataBlock* pBlock) {
numOfRows++; numOfRows++;
} }
pBlock->info.rows = numOfRows; pBlock->info.rows = numOfRows;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static int32_t execShowLocalVariables(SRetrieveTableRsp** pRsp) { static int32_t execShowLocalVariables(SRetrieveTableRsp** pRsp) {
SSDataBlock* pBlock = buildLocalVariablesResultDataBlock(); SSDataBlock* pBlock = buildLocalVariablesResultDataBlock();
int32_t code = setLocalVariablesResultIntoDataBlock(pBlock); int32_t code = setLocalVariablesResultIntoDataBlock(pBlock);
if (code) { if (code) {
return code; return code;
} }
@ -604,7 +595,7 @@ static int32_t execShowLocalVariables(SRetrieveTableRsp** pRsp) {
(*pRsp)->numOfCols = htonl(SHOW_LOCAL_VARIABLES_RESULT_COLS); (*pRsp)->numOfCols = htonl(SHOW_LOCAL_VARIABLES_RESULT_COLS);
int32_t len = 0; int32_t len = 0;
blockCompressEncode(pBlock, (*pRsp)->data, &len, SHOW_LOCAL_VARIABLES_RESULT_COLS, false); blockEncode(pBlock, (*pRsp)->data, &len, SHOW_LOCAL_VARIABLES_RESULT_COLS, false);
ASSERT(len == rspSize - sizeof(SRetrieveTableRsp)); ASSERT(len == rspSize - sizeof(SRetrieveTableRsp));
blockDataDestroy(pBlock); blockDataDestroy(pBlock);

View File

@ -13,11 +13,11 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#include "tdatablock.h"
#include "commandInt.h" #include "commandInt.h"
#include "plannodes.h" #include "plannodes.h"
#include "query.h" #include "query.h"
#include "tcommon.h" #include "tcommon.h"
#include "tdatablock.h"
int32_t qExplainGenerateResNode(SPhysiNode *pNode, SExplainGroup *group, SExplainResNode **pRes); int32_t qExplainGenerateResNode(SPhysiNode *pNode, SExplainGroup *group, SExplainResNode **pRes);
int32_t qExplainAppendGroupResRows(void *pCtx, int32_t groupId, int32_t level); int32_t qExplainAppendGroupResRows(void *pCtx, int32_t groupId, int32_t level);
@ -216,7 +216,7 @@ int32_t qExplainGenerateResChildren(SPhysiNode *pNode, SExplainGroup *group, SNo
SExplainResNode *pResNode = NULL; SExplainResNode *pResNode = NULL;
FOREACH(node, pPhysiChildren) { FOREACH(node, pPhysiChildren) {
QRY_ERR_RET(qExplainGenerateResNode((SPhysiNode *)node, group, &pResNode)); QRY_ERR_RET(qExplainGenerateResNode((SPhysiNode *)node, group, &pResNode));
QRY_ERR_RET(nodesListAppend(*pChildren, (SNode*)pResNode)); QRY_ERR_RET(nodesListAppend(*pChildren, (SNode *)pResNode));
} }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
@ -232,14 +232,14 @@ int32_t qExplainGenerateResNodeExecInfo(SArray **pExecInfo, SExplainGroup *group
SExplainRsp *rsp = NULL; SExplainRsp *rsp = NULL;
for (int32_t i = 0; i < group->nodeNum; ++i) { for (int32_t i = 0; i < group->nodeNum; ++i) {
rsp = taosArrayGet(group->nodeExecInfo, i); rsp = taosArrayGet(group->nodeExecInfo, i);
/* /*
if (group->physiPlanExecIdx >= rsp->numOfPlans) { if (group->physiPlanExecIdx >= rsp->numOfPlans) {
qError("physiPlanIdx %d exceed plan num %d", group->physiPlanExecIdx, rsp->numOfPlans); qError("physiPlanIdx %d exceed plan num %d", group->physiPlanExecIdx, rsp->numOfPlans);
return TSDB_CODE_QRY_APP_ERROR; return TSDB_CODE_QRY_APP_ERROR;
} }
taosArrayPush(*pExecInfo, rsp->subplanInfo + group->physiPlanExecIdx); taosArrayPush(*pExecInfo, rsp->subplanInfo + group->physiPlanExecIdx);
*/ */
taosArrayPush(*pExecInfo, rsp->subplanInfo); taosArrayPush(*pExecInfo, rsp->subplanInfo);
} }
@ -426,23 +426,23 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i
if (EXPLAIN_MODE_ANALYZE == ctx->mode) { if (EXPLAIN_MODE_ANALYZE == ctx->mode) {
EXPLAIN_ROW_NEW(level + 1, "I/O: "); EXPLAIN_ROW_NEW(level + 1, "I/O: ");
int32_t nodeNum = taosArrayGetSize(pResNode->pExecInfo); int32_t nodeNum = taosArrayGetSize(pResNode->pExecInfo);
struct STableScanAnalyzeInfo info = {0}; struct STableScanAnalyzeInfo info = {0};
int32_t maxIndex = 0; int32_t maxIndex = 0;
int32_t totalRows = 0; int32_t totalRows = 0;
for(int32_t i = 0; i < nodeNum; ++i) { for (int32_t i = 0; i < nodeNum; ++i) {
SExplainExecInfo *execInfo = taosArrayGet(pResNode->pExecInfo, i); SExplainExecInfo *execInfo = taosArrayGet(pResNode->pExecInfo, i);
STableScanAnalyzeInfo *pScanInfo = (STableScanAnalyzeInfo *)execInfo->verboseInfo; STableScanAnalyzeInfo *pScanInfo = (STableScanAnalyzeInfo *)execInfo->verboseInfo;
info.totalBlocks += pScanInfo->totalBlocks; info.totalBlocks += pScanInfo->totalBlocks;
info.loadBlocks += pScanInfo->loadBlocks; info.loadBlocks += pScanInfo->loadBlocks;
info.totalRows += pScanInfo->totalRows; info.totalRows += pScanInfo->totalRows;
info.skipBlocks += pScanInfo->skipBlocks; info.skipBlocks += pScanInfo->skipBlocks;
info.filterTime += pScanInfo->filterTime; info.filterTime += pScanInfo->filterTime;
info.loadBlockStatis += pScanInfo->loadBlockStatis; info.loadBlockStatis += pScanInfo->loadBlockStatis;
info.totalCheckedRows += pScanInfo->totalCheckedRows; info.totalCheckedRows += pScanInfo->totalCheckedRows;
info.filterOutBlocks += pScanInfo->filterOutBlocks; info.filterOutBlocks += pScanInfo->filterOutBlocks;
if (pScanInfo->totalRows > totalRows) { if (pScanInfo->totalRows > totalRows) {
totalRows = pScanInfo->totalRows; totalRows = pScanInfo->totalRows;
@ -468,13 +468,14 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i
QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1)); QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1));
//Rows out: Avg 4166.7 rows x 24 workers. Max 4187 rows (seg7) with 0.220 ms to first row, 1.738 ms to end, start offset by 1.470 ms. // Rows out: Avg 4166.7 rows x 24 workers. Max 4187 rows (seg7) with 0.220 ms to first row, 1.738 ms to end,
// start offset by 1.470 ms.
SExplainExecInfo *execInfo = taosArrayGet(pResNode->pExecInfo, maxIndex); SExplainExecInfo *execInfo = taosArrayGet(pResNode->pExecInfo, maxIndex);
STableScanAnalyzeInfo *p1 = (STableScanAnalyzeInfo *)execInfo->verboseInfo; STableScanAnalyzeInfo *p1 = (STableScanAnalyzeInfo *)execInfo->verboseInfo;
EXPLAIN_ROW_NEW(level + 1, " "); EXPLAIN_ROW_NEW(level + 1, " ");
EXPLAIN_ROW_APPEND("max_row_task=%d, total_rows:%" PRId64 ", ep:%s (cost=%.3f..%.3f)", maxIndex, p1->totalRows, "tbd", EXPLAIN_ROW_APPEND("max_row_task=%d, total_rows:%" PRId64 ", ep:%s (cost=%.3f..%.3f)", maxIndex, p1->totalRows,
execInfo->startupCost, execInfo->totalCost); "tbd", execInfo->startupCost, execInfo->totalCost);
EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT); EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
EXPLAIN_ROW_END(); EXPLAIN_ROW_END();
@ -752,7 +753,7 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i
EXPLAIN_ROW_NEW(level + 1, "Sort Key: "); EXPLAIN_ROW_NEW(level + 1, "Sort Key: ");
if (pResNode->pExecInfo) { if (pResNode->pExecInfo) {
for (int32_t i = 0; i < LIST_LENGTH(pSortNode->pSortKeys); ++i) { for (int32_t i = 0; i < LIST_LENGTH(pSortNode->pSortKeys); ++i) {
SOrderByExprNode *ptn = (SOrderByExprNode*)nodesListGetNode(pSortNode->pSortKeys, i); SOrderByExprNode *ptn = (SOrderByExprNode *)nodesListGetNode(pSortNode->pSortKeys, i);
EXPLAIN_ROW_APPEND("%s ", nodesGetNameFromColumnNode(ptn->pExpr)); EXPLAIN_ROW_APPEND("%s ", nodesGetNameFromColumnNode(ptn->pExpr));
} }
} }
@ -907,16 +908,16 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i
EXPLAIN_ROW_END(); EXPLAIN_ROW_END();
QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1)); QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1));
if (pFillNode->pValues) { if (pFillNode->pValues) {
SNodeListNode *pValues = (SNodeListNode*)pFillNode->pValues; SNodeListNode *pValues = (SNodeListNode *)pFillNode->pValues;
EXPLAIN_ROW_NEW(level + 1, EXPLAIN_FILL_VALUE_FORMAT); EXPLAIN_ROW_NEW(level + 1, EXPLAIN_FILL_VALUE_FORMAT);
SNode* tNode = NULL; SNode *tNode = NULL;
int32_t i = 0; int32_t i = 0;
FOREACH(tNode, pValues->pNodeList) { FOREACH(tNode, pValues->pNodeList) {
if (i) { if (i) {
EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT); EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
} }
SValueNode* tValue = (SValueNode*)tNode; SValueNode *tValue = (SValueNode *)tNode;
char *value = nodesGetStrValueFromNode(tValue); char *value = nodesGetStrValueFromNode(tValue);
EXPLAIN_ROW_APPEND(EXPLAIN_STRING_TYPE_FORMAT, value); EXPLAIN_ROW_APPEND(EXPLAIN_STRING_TYPE_FORMAT, value);
taosMemoryFree(value); taosMemoryFree(value);
++i; ++i;
@ -926,8 +927,7 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i
QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1)); QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1));
} }
EXPLAIN_ROW_NEW(level + 1, EXPLAIN_TIMERANGE_FORMAT, pFillNode->timeRange.skey, EXPLAIN_ROW_NEW(level + 1, EXPLAIN_TIMERANGE_FORMAT, pFillNode->timeRange.skey, pFillNode->timeRange.ekey);
pFillNode->timeRange.ekey);
EXPLAIN_ROW_END(); EXPLAIN_ROW_END();
QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1)); QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1));
@ -1070,13 +1070,13 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i
EXPLAIN_ROW_APPEND(EXPLAIN_RIGHT_PARENTHESIS_FORMAT); EXPLAIN_ROW_APPEND(EXPLAIN_RIGHT_PARENTHESIS_FORMAT);
EXPLAIN_ROW_END(); EXPLAIN_ROW_END();
QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level)); QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level));
if (EXPLAIN_MODE_ANALYZE == ctx->mode) { if (EXPLAIN_MODE_ANALYZE == ctx->mode) {
// sort key // sort key
EXPLAIN_ROW_NEW(level + 1, "Merge Key: "); EXPLAIN_ROW_NEW(level + 1, "Merge Key: ");
if (pResNode->pExecInfo) { if (pResNode->pExecInfo) {
for (int32_t i = 0; i < LIST_LENGTH(pMergeNode->pMergeKeys); ++i) { for (int32_t i = 0; i < LIST_LENGTH(pMergeNode->pMergeKeys); ++i) {
SOrderByExprNode *ptn = (SOrderByExprNode*)nodesListGetNode(pMergeNode->pMergeKeys, i); SOrderByExprNode *ptn = (SOrderByExprNode *)nodesListGetNode(pMergeNode->pMergeKeys, i);
EXPLAIN_ROW_APPEND("%s ", nodesGetNameFromColumnNode(ptn->pExpr)); EXPLAIN_ROW_APPEND("%s ", nodesGetNameFromColumnNode(ptn->pExpr));
} }
} }
@ -1115,7 +1115,7 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i
EXPLAIN_ROW_NEW(level + 1, EXPLAIN_MERGE_KEYS_FORMAT); EXPLAIN_ROW_NEW(level + 1, EXPLAIN_MERGE_KEYS_FORMAT);
for (int32_t i = 0; i < LIST_LENGTH(pMergeNode->pMergeKeys); ++i) { for (int32_t i = 0; i < LIST_LENGTH(pMergeNode->pMergeKeys); ++i) {
SOrderByExprNode *ptn = (SOrderByExprNode*)nodesListGetNode(pMergeNode->pMergeKeys, i); SOrderByExprNode *ptn = (SOrderByExprNode *)nodesListGetNode(pMergeNode->pMergeKeys, i);
EXPLAIN_ROW_APPEND("%s ", nodesGetNameFromColumnNode(ptn->pExpr)); EXPLAIN_ROW_APPEND("%s ", nodesGetNameFromColumnNode(ptn->pExpr));
} }
EXPLAIN_ROW_END(); EXPLAIN_ROW_END();
@ -1130,7 +1130,7 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i
} }
} }
break; break;
} }
default: default:
qError("not supported physical node type %d", pNode->type); qError("not supported physical node type %d", pNode->type);
return TSDB_CODE_QRY_APP_ERROR; return TSDB_CODE_QRY_APP_ERROR;
@ -1190,12 +1190,12 @@ int32_t qExplainGetRspFromCtx(void *ctx, SRetrieveTableRsp **pRsp) {
QRY_ERR_RET(TSDB_CODE_QRY_APP_ERROR); QRY_ERR_RET(TSDB_CODE_QRY_APP_ERROR);
} }
SSDataBlock *pBlock = createDataBlock(); SSDataBlock *pBlock = createDataBlock();
SColumnInfoData infoData = createColumnInfoData(TSDB_DATA_TYPE_VARCHAR, TSDB_EXPLAIN_RESULT_ROW_SIZE, 1); SColumnInfoData infoData = createColumnInfoData(TSDB_DATA_TYPE_VARCHAR, TSDB_EXPLAIN_RESULT_ROW_SIZE, 1);
blockDataAppendColInfo(pBlock, &infoData); blockDataAppendColInfo(pBlock, &infoData);
blockDataEnsureCapacity(pBlock, rowNum); blockDataEnsureCapacity(pBlock, rowNum);
SColumnInfoData* pInfoData = taosArrayGet(pBlock->pDataBlock, 0); SColumnInfoData *pInfoData = taosArrayGet(pBlock->pDataBlock, 0);
char buf[1024] = {0}; char buf[1024] = {0};
for (int32_t i = 0; i < rowNum; ++i) { for (int32_t i = 0; i < rowNum; ++i) {
@ -1219,7 +1219,7 @@ int32_t qExplainGetRspFromCtx(void *ctx, SRetrieveTableRsp **pRsp) {
rsp->numOfRows = htonl(rowNum); rsp->numOfRows = htonl(rowNum);
int32_t len = 0; int32_t len = 0;
blockCompressEncode(pBlock, rsp->data, &len, taosArrayGetSize(pBlock->pDataBlock), 0); blockEncode(pBlock, rsp->data, &len, taosArrayGetSize(pBlock->pDataBlock), 0);
ASSERT(len == rspSize - sizeof(SRetrieveTableRsp)); ASSERT(len == rspSize - sizeof(SRetrieveTableRsp));
rsp->compLen = htonl(len); rsp->compLen = htonl(len);

View File

@ -25,37 +25,36 @@
#define SET_RES_WINDOW_KEY(_k, _ori, _len, _uid) \ #define SET_RES_WINDOW_KEY(_k, _ori, _len, _uid) \
do { \ do { \
assert(sizeof(_uid) == sizeof(uint64_t)); \ assert(sizeof(_uid) == sizeof(uint64_t)); \
*(uint64_t *)(_k) = (_uid); \ *(uint64_t*)(_k) = (_uid); \
memcpy((_k) + sizeof(uint64_t), (_ori), (_len)); \ memcpy((_k) + sizeof(uint64_t), (_ori), (_len)); \
} while (0) } while (0)
#define SET_RES_EXT_WINDOW_KEY(_k, _ori, _len, _uid, _buf) \ #define SET_RES_EXT_WINDOW_KEY(_k, _ori, _len, _uid, _buf) \
do { \ do { \
assert(sizeof(_uid) == sizeof(uint64_t)); \ assert(sizeof(_uid) == sizeof(uint64_t)); \
*(void **)(_k) = (_buf); \ *(void**)(_k) = (_buf); \
*(uint64_t *)((_k) + POINTER_BYTES) = (_uid); \ *(uint64_t*)((_k) + POINTER_BYTES) = (_uid); \
memcpy((_k) + POINTER_BYTES + sizeof(uint64_t), (_ori), (_len)); \ memcpy((_k) + POINTER_BYTES + sizeof(uint64_t), (_ori), (_len)); \
} while (0) } while (0)
#define GET_RES_WINDOW_KEY_LEN(_l) ((_l) + sizeof(uint64_t))
#define GET_RES_WINDOW_KEY_LEN(_l) ((_l) + sizeof(uint64_t))
#define GET_RES_EXT_WINDOW_KEY_LEN(_l) ((_l) + sizeof(uint64_t) + POINTER_BYTES) #define GET_RES_EXT_WINDOW_KEY_LEN(_l) ((_l) + sizeof(uint64_t) + POINTER_BYTES)
#define GET_TASKID(_t) (((SExecTaskInfo*)(_t))->id.str) #define GET_TASKID(_t) (((SExecTaskInfo*)(_t))->id.str)
typedef struct SGroupResInfo { typedef struct SGroupResInfo {
int32_t index; int32_t index;
SArray* pRows; // SArray<SResKeyPos> SArray* pRows; // SArray<SResKeyPos>
} SGroupResInfo; } SGroupResInfo;
typedef struct SResultRow { typedef struct SResultRow {
int32_t pageId; // pageId & rowId is the position of current result in disk-based output buffer int32_t pageId; // pageId & rowId is the position of current result in disk-based output buffer
int32_t offset:29; // row index in buffer page int32_t offset : 29; // row index in buffer page
bool startInterp; // the time window start timestamp has done the interpolation already. bool startInterp; // the time window start timestamp has done the interpolation already.
bool endInterp; // the time window end timestamp has done the interpolation already. bool endInterp; // the time window end timestamp has done the interpolation already.
bool closed; // this result status: closed or opened bool closed; // this result status: closed or opened
uint32_t numOfRows; // number of rows of current time window uint32_t numOfRows; // number of rows of current time window
STimeWindow win; STimeWindow win;
struct SResultRowEntryInfo pEntryInfo[]; // For each result column, there is a resultInfo struct SResultRowEntryInfo pEntryInfo[]; // For each result column, there is a resultInfo
} SResultRow; } SResultRow;
@ -66,57 +65,58 @@ typedef struct SResultRowPosition {
typedef struct SResKeyPos { typedef struct SResKeyPos {
SResultRowPosition pos; SResultRowPosition pos;
uint64_t groupId; uint64_t groupId;
char key[]; char key[];
} SResKeyPos; } SResKeyPos;
typedef struct SResultRowInfo { typedef struct SResultRowInfo {
int32_t size; // number of result set int32_t size; // number of result set
SResultRowPosition cur; SResultRowPosition cur;
SList* openWindow; SList* openWindow;
} SResultRowInfo; } SResultRowInfo;
struct SqlFunctionCtx; struct SqlFunctionCtx;
size_t getResultRowSize(struct SqlFunctionCtx* pCtx, int32_t numOfOutput); size_t getResultRowSize(struct SqlFunctionCtx* pCtx, int32_t numOfOutput);
void initResultRowInfo(SResultRowInfo* pResultRowInfo); void initResultRowInfo(SResultRowInfo* pResultRowInfo);
void cleanupResultRowInfo(SResultRowInfo* pResultRowInfo); void cleanupResultRowInfo(SResultRowInfo* pResultRowInfo);
void closeAllResultRows(SResultRowInfo* pResultRowInfo); void closeAllResultRows(SResultRowInfo* pResultRowInfo);
void initResultRow(SResultRow *pResultRow); void initResultRow(SResultRow* pResultRow);
void closeResultRow(SResultRow* pResultRow); void closeResultRow(SResultRow* pResultRow);
bool isResultRowClosed(SResultRow* pResultRow); bool isResultRowClosed(SResultRow* pResultRow);
struct SResultRowEntryInfo* getResultEntryInfo(const SResultRow* pRow, int32_t index, const int32_t* offset); struct SResultRowEntryInfo* getResultEntryInfo(const SResultRow* pRow, int32_t index, const int32_t* offset);
static FORCE_INLINE SResultRow *getResultRowByPos(SDiskbasedBuf* pBuf, SResultRowPosition* pos) { static FORCE_INLINE SResultRow* getResultRowByPos(SDiskbasedBuf* pBuf, SResultRowPosition* pos) {
SFilePage* bufPage = (SFilePage*) getBufPage(pBuf, pos->pageId); SFilePage* bufPage = (SFilePage*)getBufPage(pBuf, pos->pageId);
SResultRow* pRow = (SResultRow*)((char*)bufPage + pos->offset); SResultRow* pRow = (SResultRow*)((char*)bufPage + pos->offset);
return pRow; return pRow;
} }
void initGroupedResultInfo(SGroupResInfo* pGroupResInfo, SHashObj* pHashmap, int32_t order); void initGroupedResultInfo(SGroupResInfo* pGroupResInfo, SHashObj* pHashmap, int32_t order);
void initMultiResInfoFromArrayList(SGroupResInfo* pGroupResInfo, SArray* pArrayList); void initMultiResInfoFromArrayList(SGroupResInfo* pGroupResInfo, SArray* pArrayList);
void cleanupGroupResInfo(SGroupResInfo* pGroupResInfo); void cleanupGroupResInfo(SGroupResInfo* pGroupResInfo);
bool hasDataInGroupInfo(SGroupResInfo* pGroupResInfo); bool hasDataInGroupInfo(SGroupResInfo* pGroupResInfo);
int32_t getNumOfTotalRes(SGroupResInfo* pGroupResInfo); int32_t getNumOfTotalRes(SGroupResInfo* pGroupResInfo);
SSDataBlock* createResDataBlock(SDataBlockDescNode* pNode); SSDataBlock* createResDataBlock(SDataBlockDescNode* pNode);
EDealRes doTranslateTagExpr(SNode** pNode, void* pContext); EDealRes doTranslateTagExpr(SNode** pNode, void* pContext);
int32_t getTableList(void* metaHandle, SScanPhysiNode* pScanNode, STableListInfo* pListInfo); int32_t getTableList(void* metaHandle, SScanPhysiNode* pScanNode, STableListInfo* pListInfo);
SArray* createSortInfo(SNodeList* pNodeList); SArray* createSortInfo(SNodeList* pNodeList);
SArray* extractPartitionColInfo(SNodeList* pNodeList); SArray* extractPartitionColInfo(SNodeList* pNodeList);
SArray* extractColMatchInfo(SNodeList* pNodeList, SDataBlockDescNode* pOutputNodeList, int32_t* numOfOutputCols, int32_t type); SArray* extractColMatchInfo(SNodeList* pNodeList, SDataBlockDescNode* pOutputNodeList, int32_t* numOfOutputCols,
int32_t type);
SExprInfo* createExprInfo(SNodeList* pNodeList, SNodeList* pGroupKeys, int32_t* numOfExprs); SExprInfo* createExprInfo(SNodeList* pNodeList, SNodeList* pGroupKeys, int32_t* numOfExprs);
SqlFunctionCtx* createSqlFunctionCtx(SExprInfo* pExprInfo, int32_t numOfOutput, int32_t** rowEntryInfoOffset); SqlFunctionCtx* createSqlFunctionCtx(SExprInfo* pExprInfo, int32_t numOfOutput, int32_t** rowEntryInfoOffset);
void relocateColumnData(SSDataBlock* pBlock, const SArray* pColMatchInfo, SArray* pCols, bool outputEveryColumn); void relocateColumnData(SSDataBlock* pBlock, const SArray* pColMatchInfo, SArray* pCols, bool outputEveryColumn);
void initExecTimeWindowInfo(SColumnInfoData* pColData, STimeWindow* pQueryWindow); void initExecTimeWindowInfo(SColumnInfoData* pColData, STimeWindow* pQueryWindow);
SInterval extractIntervalInfo(const STableScanPhysiNode* pTableScanNode); SInterval extractIntervalInfo(const STableScanPhysiNode* pTableScanNode);
SColumn extractColumnFromColumnNode(SColumnNode* pColNode); SColumn extractColumnFromColumnNode(SColumnNode* pColNode);

View File

@ -69,14 +69,16 @@ static bool needCompress(const SSDataBlock* pData, int32_t numOfCols) {
// data format: // data format:
// +----------------+--------------+----------+--------------------------------------------+--------------------------------------+-------------+-----------+-------------+-----------+ // +----------------+--------------+----------+--------------------------------------------+--------------------------------------+-------------+-----------+-------------+-----------+
// |SDataCacheEntry | total length | group id | col1_schema | col2_schema | col3_schema ...| column#1 length, column#2 length ... | col1 bitmap | col1 data | col2 bitmap | col2 data | .... // |SDataCacheEntry | total length | group id | col1_schema | col2_schema | col3_schema ...| column#1 length, column#2
// | | (4 bytes) |(8 bytes) |(sizeof(int16_t)+sizeof(int32_t))*numOfCols | sizeof(int32_t) * numOfCols | actual size | | actual size | | // length ... | col1 bitmap | col1 data | col2 bitmap | col2 data | .... | | (4 bytes) |(8 bytes)
// |(sizeof(int16_t)+sizeof(int32_t))*numOfCols | sizeof(int32_t) * numOfCols | actual size | |
// actual size | |
// +----------------+--------------+----------+--------------------------------------------+--------------------------------------+-------------+-----------+-------------+-----------+ // +----------------+--------------+----------+--------------------------------------------+--------------------------------------+-------------+-----------+-------------+-----------+
// The length of bitmap is decided by number of rows of this data block, and the length of each column data is // The length of bitmap is decided by number of rows of this data block, and the length of each column data is
// recorded in the first segment, next to the struct header // recorded in the first segment, next to the struct header
static void toDataCacheEntry(SDataDispatchHandle* pHandle, const SInputData* pInput, SDataDispatchBuf* pBuf) { static void toDataCacheEntry(SDataDispatchHandle* pHandle, const SInputData* pInput, SDataDispatchBuf* pBuf) {
int32_t numOfCols = 0; int32_t numOfCols = 0;
SNode* pNode; SNode* pNode;
FOREACH(pNode, pHandle->pSchema->pSlots) { FOREACH(pNode, pHandle->pSchema->pSlots) {
SSlotDescNode* pSlotDesc = (SSlotDescNode*)pNode; SSlotDescNode* pSlotDesc = (SSlotDescNode*)pNode;
if (pSlotDesc->output) { if (pSlotDesc->output) {
@ -90,12 +92,12 @@ static void toDataCacheEntry(SDataDispatchHandle* pHandle, const SInputData* pIn
pEntry->dataLen = 0; pEntry->dataLen = 0;
pBuf->useSize = sizeof(SDataCacheEntry); pBuf->useSize = sizeof(SDataCacheEntry);
blockCompressEncode(pInput->pData, pEntry->data, &pEntry->dataLen, numOfCols, pEntry->compressed); blockEncode(pInput->pData, pEntry->data, &pEntry->dataLen, numOfCols, pEntry->compressed);
pBuf->useSize += pEntry->dataLen; pBuf->useSize += pEntry->dataLen;
atomic_add_fetch_64(&pHandle->cachedSize, pEntry->dataLen); atomic_add_fetch_64(&pHandle->cachedSize, pEntry->dataLen);
atomic_add_fetch_64(&gDataSinkStat.cachedSize, pEntry->dataLen); atomic_add_fetch_64(&gDataSinkStat.cachedSize, pEntry->dataLen);
} }
static bool allocBuf(SDataDispatchHandle* pDispatcher, const SInputData* pInput, SDataDispatchBuf* pBuf) { static bool allocBuf(SDataDispatchHandle* pDispatcher, const SInputData* pInput, SDataDispatchBuf* pBuf) {
@ -187,8 +189,8 @@ static int32_t getDataBlock(SDataSinkHandle* pHandle, SOutputData* pOutput) {
pOutput->numOfCols = pEntry->numOfCols; pOutput->numOfCols = pEntry->numOfCols;
pOutput->compressed = pEntry->compressed; pOutput->compressed = pEntry->compressed;
atomic_sub_fetch_64(&pDispatcher->cachedSize, pEntry->dataLen); atomic_sub_fetch_64(&pDispatcher->cachedSize, pEntry->dataLen);
atomic_sub_fetch_64(&gDataSinkStat.cachedSize, pEntry->dataLen); atomic_sub_fetch_64(&gDataSinkStat.cachedSize, pEntry->dataLen);
taosMemoryFreeClear(pDispatcher->nextOutput.pData); // todo persistent taosMemoryFreeClear(pDispatcher->nextOutput.pData); // todo persistent
pOutput->bufStatus = updateStatus(pDispatcher); pOutput->bufStatus = updateStatus(pDispatcher);
@ -198,7 +200,6 @@ static int32_t getDataBlock(SDataSinkHandle* pHandle, SOutputData* pOutput) {
pOutput->precision = pDispatcher->pSchema->precision; pOutput->precision = pDispatcher->pSchema->precision;
taosThreadMutexUnlock(&pDispatcher->mutex); taosThreadMutexUnlock(&pDispatcher->mutex);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }

View File

@ -189,9 +189,9 @@ SSDataBlock* createResDataBlock(SDataBlockDescNode* pNode) {
for (int32_t i = 0; i < numOfCols; ++i) { for (int32_t i = 0; i < numOfCols; ++i) {
SSlotDescNode* pDescNode = (SSlotDescNode*)nodesListGetNode(pNode->pSlots, i); SSlotDescNode* pDescNode = (SSlotDescNode*)nodesListGetNode(pNode->pSlots, i);
if (!pDescNode->output) { // todo disable it temporarily /*if (!pDescNode->output) { // todo disable it temporarily*/
continue; /*continue;*/
} /*}*/
SColumnInfoData idata = SColumnInfoData idata =
createColumnInfoData(pDescNode->dataType.type, pDescNode->dataType.bytes, pDescNode->slotId); createColumnInfoData(pDescNode->dataType.type, pDescNode->dataType.bytes, pDescNode->slotId);

View File

@ -2048,7 +2048,7 @@ int32_t extractDataBlockFromFetchRsp(SSDataBlock* pRes, SLoadRemoteDataInfo* pLo
if (pColList == NULL) { // data from other sources if (pColList == NULL) { // data from other sources
blockDataCleanup(pRes); blockDataCleanup(pRes);
// blockDataEnsureCapacity(pRes, numOfRows); // blockDataEnsureCapacity(pRes, numOfRows);
blockCompressDecode(pRes, numOfOutput, numOfRows, pData); blockDecode(pRes, numOfOutput, numOfRows, pData);
} else { // extract data according to pColList } else { // extract data according to pColList
ASSERT(numOfOutput == taosArrayGetSize(pColList)); ASSERT(numOfOutput == taosArrayGetSize(pColList));
char* pStart = pData; char* pStart = pData;
@ -2072,7 +2072,7 @@ int32_t extractDataBlockFromFetchRsp(SSDataBlock* pRes, SLoadRemoteDataInfo* pLo
blockDataAppendColInfo(pBlock, &idata); blockDataAppendColInfo(pBlock, &idata);
} }
blockCompressDecode(pBlock, numOfCols, numOfRows, pStart); blockDecode(pBlock, numOfCols, numOfRows, pStart);
blockDataEnsureCapacity(pRes, numOfRows); blockDataEnsureCapacity(pRes, numOfRows);
// data from mnode // data from mnode
@ -2934,7 +2934,7 @@ int32_t aggEncodeResultRow(SOperatorInfo* pOperator, char** result, int32_t* len
*length = 0; *length = 0;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
*result = (char*)taosMemoryCalloc(1, totalSize); *result = (char*)taosMemoryCalloc(1, totalSize);
if (*result == NULL) { if (*result == NULL) {
return TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY;
@ -3898,60 +3898,59 @@ int32_t extractTableSchemaVersion(SReadHandle* pHandle, uint64_t uid, SExecTaskI
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static int32_t sortTableGroup(STableListInfo* pTableListInfo, int32_t groupNum){ static int32_t sortTableGroup(STableListInfo* pTableListInfo, int32_t groupNum) {
taosArrayClear(pTableListInfo->pGroupList); taosArrayClear(pTableListInfo->pGroupList);
SArray *sortSupport = taosArrayInit(groupNum, sizeof(uint64_t)); SArray* sortSupport = taosArrayInit(groupNum, sizeof(uint64_t));
if(sortSupport == NULL) return TSDB_CODE_OUT_OF_MEMORY; if (sortSupport == NULL) return TSDB_CODE_OUT_OF_MEMORY;
for (int32_t i = 0; i < taosArrayGetSize(pTableListInfo->pTableList); i++) { for (int32_t i = 0; i < taosArrayGetSize(pTableListInfo->pTableList); i++) {
STableKeyInfo* info = taosArrayGet(pTableListInfo->pTableList, i); STableKeyInfo* info = taosArrayGet(pTableListInfo->pTableList, i);
uint64_t* groupId = taosHashGet(pTableListInfo->map, &info->uid, sizeof(uint64_t)); uint64_t* groupId = taosHashGet(pTableListInfo->map, &info->uid, sizeof(uint64_t));
int32_t index = taosArraySearchIdx(sortSupport, groupId, compareUint64Val, TD_EQ); int32_t index = taosArraySearchIdx(sortSupport, groupId, compareUint64Val, TD_EQ);
if (index == -1){ if (index == -1) {
void *p = taosArraySearch(sortSupport, groupId, compareUint64Val, TD_GT); void* p = taosArraySearch(sortSupport, groupId, compareUint64Val, TD_GT);
SArray *tGroup = taosArrayInit(8, sizeof(STableKeyInfo)); SArray* tGroup = taosArrayInit(8, sizeof(STableKeyInfo));
if(tGroup == NULL) { if (tGroup == NULL) {
taosArrayDestroy(sortSupport); taosArrayDestroy(sortSupport);
return TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY;
} }
if(taosArrayPush(tGroup, info) == NULL){ if (taosArrayPush(tGroup, info) == NULL) {
qError("taos push info array error"); qError("taos push info array error");
taosArrayDestroy(sortSupport); taosArrayDestroy(sortSupport);
return TSDB_CODE_QRY_APP_ERROR; return TSDB_CODE_QRY_APP_ERROR;
} }
if(p == NULL){ if (p == NULL) {
if(taosArrayPush(sortSupport, groupId) != NULL){ if (taosArrayPush(sortSupport, groupId) != NULL) {
qError("taos push support array error"); qError("taos push support array error");
taosArrayDestroy(sortSupport); taosArrayDestroy(sortSupport);
return TSDB_CODE_QRY_APP_ERROR; return TSDB_CODE_QRY_APP_ERROR;
} }
if(taosArrayPush(pTableListInfo->pGroupList, &tGroup) != NULL){ if (taosArrayPush(pTableListInfo->pGroupList, &tGroup) != NULL) {
qError("taos push group array error"); qError("taos push group array error");
taosArrayDestroy(sortSupport); taosArrayDestroy(sortSupport);
return TSDB_CODE_QRY_APP_ERROR; return TSDB_CODE_QRY_APP_ERROR;
} }
}else{ } else {
int32_t pos = TARRAY_ELEM_IDX(sortSupport, p); int32_t pos = TARRAY_ELEM_IDX(sortSupport, p);
if(taosArrayInsert(sortSupport, pos, groupId) == NULL){ if (taosArrayInsert(sortSupport, pos, groupId) == NULL) {
qError("taos insert support array error"); qError("taos insert support array error");
taosArrayDestroy(sortSupport); taosArrayDestroy(sortSupport);
return TSDB_CODE_QRY_APP_ERROR; return TSDB_CODE_QRY_APP_ERROR;
} }
if(taosArrayInsert(pTableListInfo->pGroupList, pos, &tGroup) == NULL){ if (taosArrayInsert(pTableListInfo->pGroupList, pos, &tGroup) == NULL) {
qError("taos insert group array error"); qError("taos insert group array error");
taosArrayDestroy(sortSupport); taosArrayDestroy(sortSupport);
return TSDB_CODE_QRY_APP_ERROR; return TSDB_CODE_QRY_APP_ERROR;
} }
} }
}else{ } else {
SArray* tGroup = (SArray*)taosArrayGetP(pTableListInfo->pGroupList, index); SArray* tGroup = (SArray*)taosArrayGetP(pTableListInfo->pGroupList, index);
if(taosArrayPush(tGroup, info) == NULL){ if (taosArrayPush(tGroup, info) == NULL) {
qError("taos push uid array error"); qError("taos push uid array error");
taosArrayDestroy(sortSupport); taosArrayDestroy(sortSupport);
return TSDB_CODE_QRY_APP_ERROR; return TSDB_CODE_QRY_APP_ERROR;
} }
} }
} }
taosArrayDestroy(sortSupport); taosArrayDestroy(sortSupport);
return TDB_CODE_SUCCESS; return TDB_CODE_SUCCESS;
@ -3969,9 +3968,9 @@ int32_t generateGroupIdMap(STableListInfo* pTableListInfo, SReadHandle* pHandle,
int32_t keyLen = 0; int32_t keyLen = 0;
void* keyBuf = NULL; void* keyBuf = NULL;
SNode* node; SNode* node;
FOREACH(node, group) { FOREACH(node, group) {
SExprNode *pExpr = (SExprNode *)node; SExprNode* pExpr = (SExprNode*)node;
keyLen += pExpr->resType.bytes; keyLen += pExpr->resType.bytes;
} }
@ -3990,15 +3989,15 @@ int32_t generateGroupIdMap(STableListInfo* pTableListInfo, SReadHandle* pHandle,
metaReaderInit(&mr, pHandle->meta, 0); metaReaderInit(&mr, pHandle->meta, 0);
metaGetTableEntryByUid(&mr, info->uid); metaGetTableEntryByUid(&mr, info->uid);
SNodeList *groupNew = nodesCloneList(group); SNodeList* groupNew = nodesCloneList(group);
nodesRewriteExprsPostOrder(groupNew, doTranslateTagExpr, &mr); nodesRewriteExprsPostOrder(groupNew, doTranslateTagExpr, &mr);
char* isNull = (char*)keyBuf; char* isNull = (char*)keyBuf;
char* pStart = (char*)keyBuf + nullFlagSize; char* pStart = (char*)keyBuf + nullFlagSize;
SNode* pNode; SNode* pNode;
int32_t index = 0; int32_t index = 0;
FOREACH(pNode, groupNew){ FOREACH(pNode, groupNew) {
SNode* pNew = NULL; SNode* pNew = NULL;
int32_t code = scalarCalculateConstants(pNode, &pNew); int32_t code = scalarCalculateConstants(pNode, &pNew);
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
@ -4010,15 +4009,15 @@ int32_t generateGroupIdMap(STableListInfo* pTableListInfo, SReadHandle* pHandle,
} }
ASSERT(nodeType(pNew) == QUERY_NODE_VALUE); ASSERT(nodeType(pNew) == QUERY_NODE_VALUE);
SValueNode *pValue = (SValueNode *)pNew; SValueNode* pValue = (SValueNode*)pNew;
if (pValue->node.resType.type == TSDB_DATA_TYPE_NULL) { if (pValue->node.resType.type == TSDB_DATA_TYPE_NULL) {
isNull[index++] = 1; isNull[index++] = 1;
continue; continue;
} else { } else {
isNull[index++] = 0; isNull[index++] = 0;
char* data = nodesGetValueFromNode(pValue); char* data = nodesGetValueFromNode(pValue);
if (pValue->node.resType.type == TSDB_DATA_TYPE_JSON){ if (pValue->node.resType.type == TSDB_DATA_TYPE_JSON) {
int32_t len = getJsonValueLen(data); int32_t len = getJsonValueLen(data);
memcpy(pStart, data, len); memcpy(pStart, data, len);
pStart += len; pStart += len;
@ -4031,7 +4030,7 @@ int32_t generateGroupIdMap(STableListInfo* pTableListInfo, SReadHandle* pHandle,
} }
} }
} }
int32_t len = (int32_t)(pStart - (char*)keyBuf); int32_t len = (int32_t)(pStart - (char*)keyBuf);
uint64_t groupId = calcGroupId(keyBuf, len); uint64_t groupId = calcGroupId(keyBuf, len);
taosHashPut(pTableListInfo->map, &(info->uid), sizeof(uint64_t), &groupId, sizeof(uint64_t)); taosHashPut(pTableListInfo->map, &(info->uid), sizeof(uint64_t), &groupId, sizeof(uint64_t));
info->groupId = groupId; info->groupId = groupId;
@ -4042,7 +4041,7 @@ int32_t generateGroupIdMap(STableListInfo* pTableListInfo, SReadHandle* pHandle,
} }
taosMemoryFree(keyBuf); taosMemoryFree(keyBuf);
if(pTableListInfo->needSortTableByGroupId){ if (pTableListInfo->needSortTableByGroupId) {
return sortTableGroup(pTableListInfo, groupNum); return sortTableGroup(pTableListInfo, groupNum);
} }
@ -4050,7 +4049,8 @@ int32_t generateGroupIdMap(STableListInfo* pTableListInfo, SReadHandle* pHandle,
} }
SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHandle* pHandle, SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHandle* pHandle,
uint64_t queryId, uint64_t taskId, STableListInfo* pTableListInfo, const char* pUser) { uint64_t queryId, uint64_t taskId, STableListInfo* pTableListInfo,
const char* pUser) {
int32_t type = nodeType(pPhyNode); int32_t type = nodeType(pPhyNode);
if (pPhyNode->pChildren == NULL || LIST_LENGTH(pPhyNode->pChildren) == 0) { if (pPhyNode->pChildren == NULL || LIST_LENGTH(pPhyNode->pChildren) == 0) {
@ -4058,7 +4058,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
STableScanPhysiNode* pTableScanNode = (STableScanPhysiNode*)pPhyNode; STableScanPhysiNode* pTableScanNode = (STableScanPhysiNode*)pPhyNode;
int32_t code = createScanTableListInfo(pTableScanNode, pHandle, pTableListInfo, queryId, taskId); int32_t code = createScanTableListInfo(pTableScanNode, pHandle, pTableListInfo, queryId, taskId);
if(code){ if (code) {
pTaskInfo->code = code; pTaskInfo->code = code;
return NULL; return NULL;
} }
@ -4076,7 +4076,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
} else if (QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN == type) { } else if (QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN == type) {
STableMergeScanPhysiNode* pTableScanNode = (STableMergeScanPhysiNode*)pPhyNode; STableMergeScanPhysiNode* pTableScanNode = (STableMergeScanPhysiNode*)pPhyNode;
int32_t code = createScanTableListInfo(pTableScanNode, pHandle, pTableListInfo, queryId, taskId); int32_t code = createScanTableListInfo(pTableScanNode, pHandle, pTableListInfo, queryId, taskId);
if(code){ if (code) {
return NULL; return NULL;
} }
code = extractTableSchemaVersion(pHandle, pTableScanNode->scan.uid, pTaskInfo); code = extractTableSchemaVersion(pHandle, pTableScanNode->scan.uid, pTaskInfo);
@ -4085,7 +4085,8 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
return NULL; return NULL;
} }
SOperatorInfo* pOperator = createTableMergeScanOperatorInfo(pTableScanNode, pTableListInfo, pHandle, pTaskInfo, queryId, taskId); SOperatorInfo* pOperator =
createTableMergeScanOperatorInfo(pTableScanNode, pTableListInfo, pHandle, pTaskInfo, queryId, taskId);
STableScanInfo* pScanInfo = pOperator->info; STableScanInfo* pScanInfo = pOperator->info;
pTaskInfo->cost.pRecoder = &pScanInfo->readRecorder; pTaskInfo->cost.pRecoder = &pScanInfo->readRecorder;
@ -4105,7 +4106,8 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
createScanTableListInfo(pTableScanNode, pHandle, pTableListInfo, queryId, taskId); createScanTableListInfo(pTableScanNode, pHandle, pTableListInfo, queryId, taskId);
} }
SOperatorInfo* pOperator = createStreamScanOperatorInfo(pHandle, pTableScanNode, pTaskInfo, &twSup, queryId, taskId); SOperatorInfo* pOperator =
createStreamScanOperatorInfo(pHandle, pTableScanNode, pTaskInfo, &twSup, queryId, taskId);
return pOperator; return pOperator;
} else if (QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN == type) { } else if (QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN == type) {
@ -4603,8 +4605,8 @@ int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SRead
(*pTaskInfo)->sql = sql; (*pTaskInfo)->sql = sql;
(*pTaskInfo)->tableqinfoList.pTagCond = pPlan->pTagCond; (*pTaskInfo)->tableqinfoList.pTagCond = pPlan->pTagCond;
(*pTaskInfo)->tableqinfoList.pTagIndexCond = pPlan->pTagIndexCond; (*pTaskInfo)->tableqinfoList.pTagIndexCond = pPlan->pTagIndexCond;
(*pTaskInfo)->pRoot = (*pTaskInfo)->pRoot = createOperatorTree(pPlan->pNode, *pTaskInfo, pHandle, queryId, taskId,
createOperatorTree(pPlan->pNode, *pTaskInfo, pHandle, queryId, taskId, &(*pTaskInfo)->tableqinfoList, pPlan->user); &(*pTaskInfo)->tableqinfoList, pPlan->user);
if (NULL == (*pTaskInfo)->pRoot) { if (NULL == (*pTaskInfo)->pRoot) {
code = (*pTaskInfo)->code; code = (*pTaskInfo)->code;
@ -4622,8 +4624,8 @@ _complete:
static void doDestroyTableList(STableListInfo* pTableqinfoList) { static void doDestroyTableList(STableListInfo* pTableqinfoList) {
taosArrayDestroy(pTableqinfoList->pTableList); taosArrayDestroy(pTableqinfoList->pTableList);
taosHashCleanup(pTableqinfoList->map); taosHashCleanup(pTableqinfoList->map);
if(pTableqinfoList->needSortTableByGroupId){ if (pTableqinfoList->needSortTableByGroupId) {
for(int32_t i = 0; i < taosArrayGetSize(pTableqinfoList->pGroupList); i++){ for (int32_t i = 0; i < taosArrayGetSize(pTableqinfoList->pGroupList); i++) {
SArray* tmp = taosArrayGetP(pTableqinfoList->pGroupList, i); SArray* tmp = taosArrayGetP(pTableqinfoList->pGroupList, i);
taosArrayDestroy(tmp); taosArrayDestroy(tmp);
} }

View File

@ -30,7 +30,7 @@ int32_t streamDispatchReqToData(const SStreamDispatchReq* pReq, SStreamDataBlock
/*int32_t len = *(int32_t*)taosArrayGet(pReq->dataLen, i);*/ /*int32_t len = *(int32_t*)taosArrayGet(pReq->dataLen, i);*/
SRetrieveTableRsp* pRetrieve = taosArrayGetP(pReq->data, i); SRetrieveTableRsp* pRetrieve = taosArrayGetP(pReq->data, i);
SSDataBlock* pDataBlock = taosArrayGet(pArray, i); SSDataBlock* pDataBlock = taosArrayGet(pArray, i);
blockCompressDecode(pDataBlock, htonl(pRetrieve->numOfCols), htonl(pRetrieve->numOfRows), pRetrieve->data); blockDecode(pDataBlock, htonl(pRetrieve->numOfCols), htonl(pRetrieve->numOfRows), pRetrieve->data);
// TODO: refactor // TODO: refactor
pDataBlock->info.window.skey = be64toh(pRetrieve->skey); pDataBlock->info.window.skey = be64toh(pRetrieve->skey);
pDataBlock->info.window.ekey = be64toh(pRetrieve->ekey); pDataBlock->info.window.ekey = be64toh(pRetrieve->ekey);
@ -50,7 +50,7 @@ int32_t streamRetrieveReqToData(const SStreamRetrieveReq* pReq, SStreamDataBlock
taosArraySetSize(pArray, 1); taosArraySetSize(pArray, 1);
SRetrieveTableRsp* pRetrieve = pReq->pRetrieve; SRetrieveTableRsp* pRetrieve = pReq->pRetrieve;
SSDataBlock* pDataBlock = taosArrayGet(pArray, 0); SSDataBlock* pDataBlock = taosArrayGet(pArray, 0);
blockCompressDecode(pDataBlock, htonl(pRetrieve->numOfCols), htonl(pRetrieve->numOfRows), pRetrieve->data); blockDecode(pDataBlock, htonl(pRetrieve->numOfCols), htonl(pRetrieve->numOfRows), pRetrieve->data);
// TODO: refactor // TODO: refactor
pDataBlock->info.window.skey = be64toh(pRetrieve->skey); pDataBlock->info.window.skey = be64toh(pRetrieve->skey);
pDataBlock->info.window.ekey = be64toh(pRetrieve->ekey); pDataBlock->info.window.ekey = be64toh(pRetrieve->ekey);

View File

@ -108,7 +108,7 @@ int32_t streamBroadcastToChildren(SStreamTask* pTask, const SSDataBlock* pBlock)
pRetrieve->ekey = htobe64(pBlock->info.window.ekey); pRetrieve->ekey = htobe64(pBlock->info.window.ekey);
int32_t actualLen = 0; int32_t actualLen = 0;
blockCompressEncode(pBlock, pRetrieve->data, &actualLen, numOfCols, false); blockEncode(pBlock, pRetrieve->data, &actualLen, numOfCols, false);
SStreamRetrieveReq req = { SStreamRetrieveReq req = {
.streamId = pTask->streamId, .streamId = pTask->streamId,
@ -181,7 +181,7 @@ static int32_t streamAddBlockToDispatchMsg(const SSDataBlock* pBlock, SStreamDis
pRetrieve->numOfCols = htonl(numOfCols); pRetrieve->numOfCols = htonl(numOfCols);
int32_t actualLen = 0; int32_t actualLen = 0;
blockCompressEncode(pBlock, pRetrieve->data, &actualLen, numOfCols, false); blockEncode(pBlock, pRetrieve->data, &actualLen, numOfCols, false);
actualLen += sizeof(SRetrieveTableRsp); actualLen += sizeof(SRetrieveTableRsp);
ASSERT(actualLen <= dataStrLen); ASSERT(actualLen <= dataStrLen);
taosArrayPush(pReq->dataLen, &actualLen); taosArrayPush(pReq->dataLen, &actualLen);