diff --git a/examples/c/tmq.c b/examples/c/tmq.c
index 53b259b0af..68b0492ace 100644
--- a/examples/c/tmq.c
+++ b/examples/c/tmq.c
@@ -138,7 +138,7 @@ int32_t create_topic() {
taos_free_result(pRes);
/*pRes = taos_query(pConn, "create topic topic_ctb_column with meta as database abc1");*/
- pRes = taos_query(pConn, "create topic topic_ctb_column as select ts, c1, c2, c3,t1 from st1 where t1 = 2000");
+ pRes = taos_query(pConn, "create topic topic_ctb_column as select ts, c1, c2, c3 from st1 where t1 = 2000");
if (taos_errno(pRes) != 0) {
printf("failed to create topic topic_ctb_column, reason:%s\n", taos_errstr(pRes));
return -1;
diff --git a/include/common/tdatablock.h b/include/common/tdatablock.h
index 6653fdd9cd..72e0af657c 100644
--- a/include/common/tdatablock.h
+++ b/include/common/tdatablock.h
@@ -224,7 +224,7 @@ size_t blockDataGetCapacityInRow(const SSDataBlock* pBlock, size_t pageSize);
int32_t blockDataTrimFirstNRows(SSDataBlock* pBlock, size_t n);
int32_t assignOneDataBlock(SSDataBlock* dst, const SSDataBlock* src);
-int32_t copyDataBlock(SSDataBlock* dst, const SSDataBlock* src);
+int32_t copyDataBlock(SSDataBlock* dst, const SSDataBlock* src);
SSDataBlock* createOneDataBlock(const SSDataBlock* pDataBlock, bool copyData);
SSDataBlock* createDataBlock();
int32_t blockDataAppendColInfo(SSDataBlock* pBlock, SColumnInfoData* pColInfoData);
@@ -232,9 +232,8 @@ int32_t blockDataAppendColInfo(SSDataBlock* pBlock, SColumnInfoData* pColIn
SColumnInfoData createColumnInfoData(int16_t type, int32_t bytes, int16_t colId);
SColumnInfoData* bdGetColumnInfoData(SSDataBlock* pBlock, int32_t index);
-void blockCompressEncode(const SSDataBlock* pBlock, char* data, int32_t* dataLen, int32_t numOfCols,
- int8_t needCompress);
-const char* blockCompressDecode(SSDataBlock* pBlock, int32_t numOfCols, int32_t numOfRows, const char* pData);
+void blockEncode(const SSDataBlock* pBlock, char* data, int32_t* dataLen, int32_t numOfCols, int8_t needCompress);
+const char* blockDecode(SSDataBlock* pBlock, int32_t numOfCols, int32_t numOfRows, const char* pData);
void blockDebugShowDataBlocks(const SArray* dataBlocks, const char* flag);
// for debug
diff --git a/include/common/tmsg.h b/include/common/tmsg.h
index c5b0b89311..0afcaaa50d 100644
--- a/include/common/tmsg.h
+++ b/include/common/tmsg.h
@@ -623,6 +623,7 @@ typedef struct {
col_id_t colId;
int16_t slotId;
};
+ bool output; // TODO remove it later
int16_t type;
int32_t bytes;
diff --git a/source/client/src/clientMsgHandler.c b/source/client/src/clientMsgHandler.c
index 45a525d124..94bd07a18a 100644
--- a/source/client/src/clientMsgHandler.c
+++ b/source/client/src/clientMsgHandler.c
@@ -67,11 +67,11 @@ int32_t processConnectRsp(void* param, const SDataBuf* pMsg, int32_t code) {
dstEpSet.eps[dstEpSet.inUse].fqdn);
} else if (connectRsp.dnodeNum > 1 && !isEpsetEqual(&pTscObj->pAppInfo->mgmtEp.epSet, &connectRsp.epSet)) {
SEpSet* pOrig = &pTscObj->pAppInfo->mgmtEp.epSet;
- SEp* pOrigEp = &pOrig->eps[pOrig->inUse];
- SEp* pNewEp = &connectRsp.epSet.eps[connectRsp.epSet.inUse];
- tscDebug("mnode epset updated from %d/%d=>%s:%d to %d/%d=>%s:%d in connRsp",
- pOrig->inUse, pOrig->numOfEps, pOrigEp->fqdn, pOrigEp->port,
- connectRsp.epSet.inUse, connectRsp.epSet.numOfEps, pNewEp->fqdn, pNewEp->port);
+ SEp* pOrigEp = &pOrig->eps[pOrig->inUse];
+ SEp* pNewEp = &connectRsp.epSet.eps[connectRsp.epSet.inUse];
+ tscDebug("mnode epset updated from %d/%d=>%s:%d to %d/%d=>%s:%d in connRsp", pOrig->inUse, pOrig->numOfEps,
+ pOrigEp->fqdn, pOrigEp->port, connectRsp.epSet.inUse, connectRsp.epSet.numOfEps, pNewEp->fqdn,
+ pNewEp->port);
updateEpSet_s(&pTscObj->pAppInfo->mgmtEp, &connectRsp.epSet);
}
@@ -308,13 +308,13 @@ static int32_t buildShowVariablesBlock(SArray* pVars, SSDataBlock** block) {
blockDataEnsureCapacity(pBlock, numOfCfg);
for (int32_t i = 0, c = 0; i < numOfCfg; ++i, c = 0) {
- SVariablesInfo *pInfo = taosArrayGet(pVars, i);
+ SVariablesInfo* pInfo = taosArrayGet(pVars, i);
char name[TSDB_CONFIG_OPTION_LEN + VARSTR_HEADER_SIZE] = {0};
STR_WITH_MAXSIZE_TO_VARSTR(name, pInfo->name, TSDB_CONFIG_OPTION_LEN + VARSTR_HEADER_SIZE);
- SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, c++);
+ SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, c++);
colDataAppend(pColInfo, i, name, false);
-
+
char value[TSDB_CONFIG_VALUE_LEN + VARSTR_HEADER_SIZE] = {0};
STR_WITH_MAXSIZE_TO_VARSTR(value, pInfo->value, TSDB_CONFIG_VALUE_LEN + VARSTR_HEADER_SIZE);
pColInfo = taosArrayGet(pBlock->pDataBlock, c++);
@@ -324,14 +324,13 @@ static int32_t buildShowVariablesBlock(SArray* pVars, SSDataBlock** block) {
pBlock->info.rows = numOfCfg;
*block = pBlock;
-
+
return TSDB_CODE_SUCCESS;
}
-
static int32_t buildShowVariablesRsp(SArray* pVars, SRetrieveTableRsp** pRsp) {
SSDataBlock* pBlock = NULL;
- int32_t code = buildShowVariablesBlock(pVars, &pBlock);
+ int32_t code = buildShowVariablesBlock(pVars, &pBlock);
if (code) {
return code;
}
@@ -351,7 +350,7 @@ static int32_t buildShowVariablesRsp(SArray* pVars, SRetrieveTableRsp** pRsp) {
(*pRsp)->numOfCols = htonl(SHOW_VARIABLES_RESULT_COLS);
int32_t len = 0;
- blockCompressEncode(pBlock, (*pRsp)->data, &len, SHOW_VARIABLES_RESULT_COLS, false);
+ blockEncode(pBlock, (*pRsp)->data, &len, SHOW_VARIABLES_RESULT_COLS, false);
ASSERT(len == rspSize - sizeof(SRetrieveTableRsp));
blockDataDestroy(pBlock);
@@ -363,7 +362,7 @@ int32_t processShowVariablesRsp(void* param, const SDataBuf* pMsg, int32_t code)
if (code != TSDB_CODE_SUCCESS) {
setErrno(pRequest, code);
} else {
- SShowVariablesRsp rsp = {0};
+ SShowVariablesRsp rsp = {0};
SRetrieveTableRsp* pRes = NULL;
code = tDeserializeSShowVariablesRsp(pMsg->pData, pMsg->len, &rsp);
if (TSDB_CODE_SUCCESS == code) {
@@ -372,7 +371,7 @@ int32_t processShowVariablesRsp(void* param, const SDataBuf* pMsg, int32_t code)
if (TSDB_CODE_SUCCESS == code) {
code = setQueryResultFromRsp(&pRequest->body.resInfo, pRes, false, false);
}
-
+
tFreeSShowVariablesRsp(&rsp);
}
@@ -384,7 +383,6 @@ int32_t processShowVariablesRsp(void* param, const SDataBuf* pMsg, int32_t code)
return code;
}
-
__async_send_cb_fn_t getMsgRspHandle(int32_t msgType) {
switch (msgType) {
case TDMT_MND_CONNECT:
diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c
index 1ec298ee15..dd9500e8d8 100644
--- a/source/common/src/tdatablock.c
+++ b/source/common/src/tdatablock.c
@@ -1613,7 +1613,8 @@ void blockDebugShowDataBlocks(const SArray* dataBlocks, const char* flag) {
size_t numOfCols = taosArrayGetSize(pDataBlock->pDataBlock);
int32_t rows = pDataBlock->info.rows;
- printf("%s |block type %d |child id %d|group id %zX\n", flag, (int32_t)pDataBlock->info.type, pDataBlock->info.childId, pDataBlock->info.groupId);
+ printf("%s |block type %d |child id %d|group id %zX\n", flag, (int32_t)pDataBlock->info.type,
+ pDataBlock->info.childId, pDataBlock->info.groupId);
for (int32_t j = 0; j < rows; j++) {
printf("%s |", flag);
for (int32_t k = 0; k < numOfCols; k++) {
@@ -1902,8 +1903,7 @@ char* buildCtbNameByGroupId(const char* stbName, uint64_t groupId) {
return rname.childTableName;
}
-void blockCompressEncode(const SSDataBlock* pBlock, char* data, int32_t* dataLen, int32_t numOfCols,
- int8_t needCompress) {
+void blockEncode(const SSDataBlock* pBlock, char* data, int32_t* dataLen, int32_t numOfCols, int8_t needCompress) {
// todo extract method
int32_t* actualLen = (int32_t*)data;
data += sizeof(int32_t);
@@ -1913,6 +1913,7 @@ void blockCompressEncode(const SSDataBlock* pBlock, char* data, int32_t* dataLen
for (int32_t i = 0; i < numOfCols; ++i) {
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i);
+
*((int16_t*)data) = pColInfoData->info.type;
data += sizeof(int16_t);
@@ -1960,7 +1961,7 @@ void blockCompressEncode(const SSDataBlock* pBlock, char* data, int32_t* dataLen
*groupId = pBlock->info.groupId;
}
-const char* blockCompressDecode(SSDataBlock* pBlock, int32_t numOfCols, int32_t numOfRows, const char* pData) {
+const char* blockDecode(SSDataBlock* pBlock, int32_t numOfCols, int32_t numOfRows, const char* pData) {
const char* pStart = pData;
int32_t dataLen = *(int32_t*)pStart;
diff --git a/source/dnode/mgmt/mgmt_dnode/src/dmHandle.c b/source/dnode/mgmt/mgmt_dnode/src/dmHandle.c
index 980749ca70..234a133243 100644
--- a/source/dnode/mgmt/mgmt_dnode/src/dmHandle.c
+++ b/source/dnode/mgmt/mgmt_dnode/src/dmHandle.c
@@ -17,7 +17,6 @@
#include "dmInt.h"
#include "systable.h"
-
extern SConfig *tsCfg;
static void dmUpdateDnodeCfg(SDnodeMgmt *pMgmt, SDnodeCfg *pCfg) {
@@ -83,7 +82,7 @@ void dmSendStatusReq(SDnodeMgmt *pMgmt) {
(*pMgmt->getVnodeLoadsFp)(&vinfo);
req.pVloads = vinfo.pVloads;
- SMonMloadInfo minfo = {0};
+ SMonMloadInfo minfo = {0};
(*pMgmt->getMnodeLoadsFp)(&minfo);
req.mload = minfo.load;
@@ -186,10 +185,10 @@ int32_t dmProcessServerRunStatus(SDnodeMgmt *pMgmt, SRpcMsg *pMsg) {
return 0;
}
-SSDataBlock* dmBuildVariablesBlock(void) {
- SSDataBlock* pBlock = taosMemoryCalloc(1, sizeof(SSDataBlock));
+SSDataBlock *dmBuildVariablesBlock(void) {
+ SSDataBlock *pBlock = taosMemoryCalloc(1, sizeof(SSDataBlock));
size_t size = 0;
- const SSysTableMeta* pMeta = NULL;
+ const SSysTableMeta *pMeta = NULL;
getInfosDbMeta(&pMeta, &size);
int32_t index = 0;
@@ -215,7 +214,7 @@ SSDataBlock* dmBuildVariablesBlock(void) {
return pBlock;
}
-int32_t dmAppendVariablesToBlock(SSDataBlock* pBlock, int32_t dnodeId) {
+int32_t dmAppendVariablesToBlock(SSDataBlock *pBlock, int32_t dnodeId) {
int32_t numOfCfg = taosArrayGetSize(tsCfg->array);
int32_t numOfRows = 0;
blockDataEnsureCapacity(pBlock, numOfCfg);
@@ -230,8 +229,8 @@ int32_t dmAppendVariablesToBlock(SSDataBlock* pBlock, int32_t dnodeId) {
STR_WITH_MAXSIZE_TO_VARSTR(name, pItem->name, TSDB_CONFIG_OPTION_LEN + VARSTR_HEADER_SIZE);
pColInfo = taosArrayGet(pBlock->pDataBlock, c++);
colDataAppend(pColInfo, i, name, false);
-
- char value[TSDB_CONFIG_VALUE_LEN + VARSTR_HEADER_SIZE] = {0};
+
+ char value[TSDB_CONFIG_VALUE_LEN + VARSTR_HEADER_SIZE] = {0};
int32_t valueLen = 0;
cfgDumpItemValue(pItem, &value[VARSTR_HEADER_SIZE], TSDB_CONFIG_VALUE_LEN, &valueLen);
varDataSetLen(value, valueLen);
@@ -241,9 +240,8 @@ int32_t dmAppendVariablesToBlock(SSDataBlock* pBlock, int32_t dnodeId) {
numOfRows++;
}
-
pBlock->info.rows = numOfRows;
-
+
return TSDB_CODE_SUCCESS;
}
@@ -267,7 +265,7 @@ int32_t dmProcessRetrieve(SDnodeMgmt *pMgmt, SRpcMsg *pMsg) {
return -1;
}
- SSDataBlock* pBlock = dmBuildVariablesBlock();
+ SSDataBlock *pBlock = dmBuildVariablesBlock();
dmAppendVariablesToBlock(pBlock, pMgmt->pData->dnodeId);
@@ -283,14 +281,14 @@ int32_t dmProcessRetrieve(SDnodeMgmt *pMgmt, SRpcMsg *pMsg) {
return -1;
}
- char *pStart = pRsp->data;
+ char *pStart = pRsp->data;
*(int32_t *)pStart = htonl(numOfCols);
pStart += sizeof(int32_t); // number of columns
for (int32_t i = 0; i < numOfCols; ++i) {
SSysTableSchema *pSchema = (SSysTableSchema *)pStart;
SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, i);
-
+
pSchema->bytes = htonl(pColInfo->info.bytes);
pSchema->colId = htons(pColInfo->info.colId);
pSchema->type = pColInfo->info.type;
@@ -299,7 +297,7 @@ int32_t dmProcessRetrieve(SDnodeMgmt *pMgmt, SRpcMsg *pMsg) {
}
int32_t len = 0;
- blockCompressEncode(pBlock, pStart, &len, numOfCols, false);
+ blockEncode(pBlock, pStart, &len, numOfCols, false);
pRsp->numOfRows = htonl(pBlock->info.rows);
pRsp->precision = TSDB_TIME_PRECISION_MILLI; // millisecond time precision
diff --git a/source/dnode/mnode/impl/src/mndShow.c b/source/dnode/mnode/impl/src/mndShow.c
index 164bcc7d60..6014adbe95 100644
--- a/source/dnode/mnode/impl/src/mndShow.c
+++ b/source/dnode/mnode/impl/src/mndShow.c
@@ -15,8 +15,8 @@
#define _DEFAULT_SOURCE
#include "mndShow.h"
-#include "systable.h"
#include "mndPrivilege.h"
+#include "systable.h"
#define SHOW_STEP_SIZE 100
@@ -307,7 +307,7 @@ static int32_t mndProcessRetrieveSysTableReq(SRpcMsg *pReq) {
}
int32_t len = 0;
- blockCompressEncode(pBlock, pStart, &len, pShow->pMeta->numOfColumns, false);
+ blockEncode(pBlock, pStart, &len, pShow->pMeta->numOfColumns, false);
}
pRsp->numOfRows = htonl(rowsRead);
diff --git a/source/dnode/vnode/src/tq/tqExec.c b/source/dnode/vnode/src/tq/tqExec.c
index afeeeab500..c7d7c787a2 100644
--- a/source/dnode/vnode/src/tq/tqExec.c
+++ b/source/dnode/vnode/src/tq/tqExec.c
@@ -29,7 +29,7 @@ static int32_t tqAddBlockDataToRsp(const SSDataBlock* pBlock, SMqDataBlkRsp* pRs
// TODO enable compress
int32_t actualLen = 0;
- blockCompressEncode(pBlock, pRetrieve->data, &actualLen, taosArrayGetSize(pBlock->pDataBlock), false);
+ blockEncode(pBlock, pRetrieve->data, &actualLen, taosArrayGetSize(pBlock->pDataBlock), false);
actualLen += sizeof(SRetrieveTableRsp);
ASSERT(actualLen <= dataStrLen);
taosArrayPush(pRsp->blockDataLen, &actualLen);
diff --git a/source/libs/command/src/command.c b/source/libs/command/src/command.c
index cb14b916af..d6b6456515 100644
--- a/source/libs/command/src/command.c
+++ b/source/libs/command/src/command.c
@@ -18,8 +18,8 @@
#include "tdatablock.h"
#include "tglobal.h"
-extern SConfig *tsCfg;
-static int32_t getSchemaBytes(const SSchema* pSchema) {
+extern SConfig* tsCfg;
+static int32_t getSchemaBytes(const SSchema* pSchema) {
switch (pSchema->type) {
case TSDB_DATA_TYPE_BINARY:
return (pSchema->bytes - VARSTR_HEADER_SIZE);
@@ -104,7 +104,7 @@ static int32_t execDescribe(SNode* pStmt, SRetrieveTableRsp** pRsp) {
(*pRsp)->numOfCols = htonl(DESCRIBE_RESULT_COLS);
int32_t len = 0;
- blockCompressEncode(pBlock, (*pRsp)->data, &len, DESCRIBE_RESULT_COLS, false);
+ blockEncode(pBlock, (*pRsp)->data, &len, DESCRIBE_RESULT_COLS, false);
ASSERT(len == rspSize - sizeof(SRetrieveTableRsp));
blockDataDestroy(pBlock);
@@ -113,9 +113,8 @@ static int32_t execDescribe(SNode* pStmt, SRetrieveTableRsp** pRsp) {
static int32_t execResetQueryCache() { return catalogClearCache(); }
-
static SSDataBlock* buildCreateDBResultDataBlock() {
- SSDataBlock* pBlock = createDataBlock();
+ SSDataBlock* pBlock = createDataBlock();
SColumnInfoData infoData = createColumnInfoData(TSDB_DATA_TYPE_VARCHAR, SHOW_CREATE_DB_RESULT_COLS, 1);
blockDataAppendColInfo(pBlock, &infoData);
@@ -149,14 +148,14 @@ int64_t getValOfDiffPrecision(int8_t unit, int64_t val) {
return v;
}
-char *buildRetension(SArray *pRetension) {
+char* buildRetension(SArray* pRetension) {
size_t size = taosArrayGetSize(pRetension);
if (size == 0) {
return NULL;
}
- char *p1 = taosMemoryCalloc(1, 100);
- SRetention *p = taosArrayGet(pRetension, 0);
+ char* p1 = taosMemoryCalloc(1, 100);
+ SRetention* p = taosArrayGet(pRetension, 0);
int32_t len = 0;
@@ -185,8 +184,7 @@ char *buildRetension(SArray *pRetension) {
return p1;
}
-
-static void setCreateDBResultIntoDataBlock(SSDataBlock* pBlock, char *dbFName, SDbCfgInfo* pCfg) {
+static void setCreateDBResultIntoDataBlock(SSDataBlock* pBlock, char* dbFName, SDbCfgInfo* pCfg) {
blockDataEnsureCapacity(pBlock, 1);
pBlock->info.rows = 1;
@@ -198,7 +196,7 @@ static void setCreateDBResultIntoDataBlock(SSDataBlock* pBlock, char *dbFName, S
SColumnInfoData* pCol2 = taosArrayGet(pBlock->pDataBlock, 1);
char buf2[SHOW_CREATE_DB_RESULT_FIELD2_LEN] = {0};
int32_t len = 0;
- char *prec = NULL;
+ char* prec = NULL;
switch (pCfg->precision) {
case TSDB_TIME_PRECISION_MILLI:
prec = TSDB_TIME_PRECISION_MILLI_STR;
@@ -214,15 +212,16 @@ static void setCreateDBResultIntoDataBlock(SSDataBlock* pBlock, char *dbFName, S
break;
}
- char *retentions = buildRetension(pCfg->pRetensions);
-
- len += sprintf(buf2 + VARSTR_HEADER_SIZE, "CREATE DATABASE `%s` BUFFER %d CACHELAST %d COMP %d DURATION %dm "
- "FSYNC %d MAXROWS %d MINROWS %d KEEP %dm,%dm,%dm PAGES %d PAGESIZE %d PRECISION '%s' REPLICA %d "
- "STRICT %d WAL %d VGROUPS %d SINGLE_STABLE %d",
- dbFName, pCfg->buffer, pCfg->cacheLastRow, pCfg->compression, pCfg->daysPerFile,
- pCfg->fsyncPeriod, pCfg->maxRows, pCfg->minRows, pCfg->daysToKeep0, pCfg->daysToKeep1, pCfg->daysToKeep2,
- pCfg->pages, pCfg->pageSize, prec, pCfg->replications, pCfg->strict, pCfg->walLevel, pCfg->numOfVgroups,
- 1 == pCfg->numOfStables);
+ char* retentions = buildRetension(pCfg->pRetensions);
+
+ len += sprintf(buf2 + VARSTR_HEADER_SIZE,
+ "CREATE DATABASE `%s` BUFFER %d CACHELAST %d COMP %d DURATION %dm "
+ "FSYNC %d MAXROWS %d MINROWS %d KEEP %dm,%dm,%dm PAGES %d PAGESIZE %d PRECISION '%s' REPLICA %d "
+ "STRICT %d WAL %d VGROUPS %d SINGLE_STABLE %d",
+ dbFName, pCfg->buffer, pCfg->cacheLastRow, pCfg->compression, pCfg->daysPerFile, pCfg->fsyncPeriod,
+ pCfg->maxRows, pCfg->minRows, pCfg->daysToKeep0, pCfg->daysToKeep1, pCfg->daysToKeep2, pCfg->pages,
+ pCfg->pageSize, prec, pCfg->replications, pCfg->strict, pCfg->walLevel, pCfg->numOfVgroups,
+ 1 == pCfg->numOfStables);
if (retentions) {
len += sprintf(buf2 + VARSTR_HEADER_SIZE + len, " RETENTIONS %s", retentions);
@@ -230,11 +229,10 @@ static void setCreateDBResultIntoDataBlock(SSDataBlock* pBlock, char *dbFName, S
}
(varDataLen(buf2)) = len;
-
+
colDataAppend(pCol2, 0, buf2, false);
}
-
static int32_t execShowCreateDatabase(SShowCreateDatabaseStmt* pStmt, SRetrieveTableRsp** pRsp) {
SSDataBlock* pBlock = buildCreateDBResultDataBlock();
setCreateDBResultIntoDataBlock(pBlock, pStmt->dbName, pStmt->pCfg);
@@ -254,7 +252,7 @@ static int32_t execShowCreateDatabase(SShowCreateDatabaseStmt* pStmt, SRetrieveT
(*pRsp)->numOfCols = htonl(SHOW_CREATE_DB_RESULT_COLS);
int32_t len = 0;
- blockCompressEncode(pBlock, (*pRsp)->data, &len, SHOW_CREATE_DB_RESULT_COLS, false);
+ blockEncode(pBlock, (*pRsp)->data, &len, SHOW_CREATE_DB_RESULT_COLS, false);
ASSERT(len == rspSize - sizeof(SRetrieveTableRsp));
blockDataDestroy(pBlock);
@@ -276,14 +274,14 @@ static SSDataBlock* buildCreateTbResultDataBlock() {
void appendColumnFields(char* buf, int32_t* len, STableCfg* pCfg) {
for (int32_t i = 0; i < pCfg->numOfColumns; ++i) {
SSchema* pSchema = pCfg->pSchemas + i;
- char type[32];
+ char type[32];
sprintf(type, "%s", tDataTypes[pSchema->type].name);
if (TSDB_DATA_TYPE_VARCHAR == pSchema->type) {
sprintf(type + strlen(type), "(%d)", (int32_t)(pSchema->bytes - VARSTR_HEADER_SIZE));
} else if (TSDB_DATA_TYPE_NCHAR == pSchema->type) {
- sprintf(type + strlen(type), "(%d)", (int32_t)((pSchema->bytes - VARSTR_HEADER_SIZE)/TSDB_NCHAR_SIZE));
+ sprintf(type + strlen(type), "(%d)", (int32_t)((pSchema->bytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE));
}
-
+
*len += sprintf(buf + VARSTR_HEADER_SIZE + *len, "%s`%s` %s", ((i > 0) ? ", " : ""), pSchema->name, type);
}
}
@@ -291,19 +289,18 @@ void appendColumnFields(char* buf, int32_t* len, STableCfg* pCfg) {
void appendTagFields(char* buf, int32_t* len, STableCfg* pCfg) {
for (int32_t i = 0; i < pCfg->numOfTags; ++i) {
SSchema* pSchema = pCfg->pSchemas + pCfg->numOfColumns + i;
- char type[32];
+ char type[32];
sprintf(type, "%s", tDataTypes[pSchema->type].name);
if (TSDB_DATA_TYPE_VARCHAR == pSchema->type) {
sprintf(type + strlen(type), "(%d)", (int32_t)(pSchema->bytes - VARSTR_HEADER_SIZE));
} else if (TSDB_DATA_TYPE_NCHAR == pSchema->type) {
- sprintf(type + strlen(type), "(%d)", (int32_t)((pSchema->bytes - VARSTR_HEADER_SIZE)/TSDB_NCHAR_SIZE));
+ sprintf(type + strlen(type), "(%d)", (int32_t)((pSchema->bytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE));
}
*len += sprintf(buf + VARSTR_HEADER_SIZE + *len, "%s`%s` %s", ((i > 0) ? ", " : ""), pSchema->name, type);
}
}
-
void appendTagNameFields(char* buf, int32_t* len, STableCfg* pCfg) {
for (int32_t i = 0; i < pCfg->numOfTags; ++i) {
SSchema* pSchema = pCfg->pSchemas + pCfg->numOfColumns + i;
@@ -311,13 +308,12 @@ void appendTagNameFields(char* buf, int32_t* len, STableCfg* pCfg) {
}
}
-
int32_t appendTagValues(char* buf, int32_t* len, STableCfg* pCfg) {
- SArray *pTagVals = NULL;
- STag *pTag = (STag*)pCfg->pTags;
-
+ SArray* pTagVals = NULL;
+ STag* pTag = (STag*)pCfg->pTags;
+
if (pCfg->pTags && pTag->flags & TD_TAG_JSON) {
- char *pJson = parseTagDatatoJson(pTag);
+ char* pJson = parseTagDatatoJson(pTag);
if (pJson) {
*len += sprintf(buf + VARSTR_HEADER_SIZE + *len, "%s", pJson);
taosMemoryFree(pJson);
@@ -325,8 +321,8 @@ int32_t appendTagValues(char* buf, int32_t* len, STableCfg* pCfg) {
return TSDB_CODE_SUCCESS;
}
-
- int32_t code = tTagToValArray((const STag *)pCfg->pTags, &pTagVals);
+
+ int32_t code = tTagToValArray((const STag*)pCfg->pTags, &pTagVals);
if (code) {
return code;
}
@@ -339,20 +335,20 @@ int32_t appendTagValues(char* buf, int32_t* len, STableCfg* pCfg) {
if (i > 0) {
*len += sprintf(buf + VARSTR_HEADER_SIZE + *len, ", ");
}
-
+
if (j >= valueNum) {
*len += sprintf(buf + VARSTR_HEADER_SIZE + *len, "NULL");
continue;
}
-
- STagVal *pTagVal = (STagVal *)taosArrayGet(pTagVals, j);
+
+ STagVal* pTagVal = (STagVal*)taosArrayGet(pTagVals, j);
if (pSchema->colId > pTagVal->cid) {
qError("tag value and column mismatch, schemaId:%d, valId:%d", pSchema->colId, pTagVal->cid);
taosArrayDestroy(pTagVals);
return TSDB_CODE_APP_ERROR;
} else if (pSchema->colId == pTagVal->cid) {
- char type = pTagVal->type;
- int32_t tlen = 0;
+ char type = pTagVal->type;
+ int32_t tlen = 0;
if (IS_VAR_DATA_TYPE(type)) {
dataConverToStr(buf + VARSTR_HEADER_SIZE + *len, type, pTagVal->pData, pTagVal->nData, &tlen);
@@ -364,7 +360,6 @@ int32_t appendTagValues(char* buf, int32_t* len, STableCfg* pCfg) {
} else {
*len += sprintf(buf + VARSTR_HEADER_SIZE + *len, "NULL");
}
-
/*
if (type == TSDB_DATA_TYPE_BINARY) {
@@ -372,7 +367,7 @@ int32_t appendTagValues(char* buf, int32_t* len, STableCfg* pCfg) {
if (num) {
*len += sprintf(buf + VARSTR_HEADER_SIZE + *len, ", ");
}
-
+
memcpy(buf + VARSTR_HEADER_SIZE + *len, pTagVal->pData, pTagVal->nData);
*len += pTagVal->nData;
}
@@ -397,7 +392,7 @@ int32_t appendTagValues(char* buf, int32_t* len, STableCfg* pCfg) {
taosArrayDestroy(pTagVals);
- return TSDB_CODE_SUCCESS;
+ return TSDB_CODE_SUCCESS;
}
void appendTableOptions(char* buf, int32_t* len, STableCfg* pCfg) {
@@ -426,7 +421,7 @@ void appendTableOptions(char* buf, int32_t* len, STableCfg* pCfg) {
*len += sprintf(buf + VARSTR_HEADER_SIZE + *len, " ROLLUP(");
for (int32_t i = 0; i < funcNum; ++i) {
char* pFunc = taosArrayGet(pCfg->pFuncs, i);
- *len += sprintf(buf + VARSTR_HEADER_SIZE + *len, "%s%s", ((i > 0) ? ", " : ""), pFunc);
+ *len += sprintf(buf + VARSTR_HEADER_SIZE + *len, "%s%s", ((i > 0) ? ", " : ""), pFunc);
}
*len += sprintf(buf + VARSTR_HEADER_SIZE + *len, ")");
}
@@ -436,7 +431,7 @@ void appendTableOptions(char* buf, int32_t* len, STableCfg* pCfg) {
}
}
-static int32_t setCreateTBResultIntoDataBlock(SSDataBlock* pBlock, char *tbName, STableCfg* pCfg) {
+static int32_t setCreateTBResultIntoDataBlock(SSDataBlock* pBlock, char* tbName, STableCfg* pCfg) {
int32_t code = 0;
blockDataEnsureCapacity(pBlock, 1);
pBlock->info.rows = 1;
@@ -454,7 +449,7 @@ static int32_t setCreateTBResultIntoDataBlock(SSDataBlock* pBlock, char *tbName,
len += sprintf(buf2 + VARSTR_HEADER_SIZE, "CREATE STABLE `%s` (", tbName);
appendColumnFields(buf2, &len, pCfg);
len += sprintf(buf2 + VARSTR_HEADER_SIZE + len, ") TAGS (");
- appendTagFields(buf2, &len, pCfg);
+ appendTagFields(buf2, &len, pCfg);
len += sprintf(buf2 + VARSTR_HEADER_SIZE + len, ")");
appendTableOptions(buf2, &len, pCfg);
} else if (TSDB_CHILD_TABLE == pCfg->tableType) {
@@ -474,16 +469,15 @@ static int32_t setCreateTBResultIntoDataBlock(SSDataBlock* pBlock, char *tbName,
}
varDataLen(buf2) = len;
-
+
colDataAppend(pCol2, 0, buf2, false);
return TSDB_CODE_SUCCESS;
}
-
static int32_t execShowCreateTable(SShowCreateTableStmt* pStmt, SRetrieveTableRsp** pRsp) {
SSDataBlock* pBlock = buildCreateTbResultDataBlock();
- int32_t code = setCreateTBResultIntoDataBlock(pBlock, pStmt->tableName, pStmt->pCfg);
+ int32_t code = setCreateTBResultIntoDataBlock(pBlock, pStmt->tableName, pStmt->pCfg);
if (code) {
return code;
}
@@ -503,7 +497,7 @@ static int32_t execShowCreateTable(SShowCreateTableStmt* pStmt, SRetrieveTableRs
(*pRsp)->numOfCols = htonl(SHOW_CREATE_TB_RESULT_COLS);
int32_t len = 0;
- blockCompressEncode(pBlock, (*pRsp)->data, &len, SHOW_CREATE_TB_RESULT_COLS, false);
+ blockEncode(pBlock, (*pRsp)->data, &len, SHOW_CREATE_TB_RESULT_COLS, false);
ASSERT(len == rspSize - sizeof(SRetrieveTableRsp));
blockDataDestroy(pBlock);
@@ -516,17 +510,17 @@ static int32_t execShowCreateSTable(SShowCreateTableStmt* pStmt, SRetrieveTableR
terrno = TSDB_CODE_TSC_NOT_STABLE_ERROR;
return terrno;
}
-
+
return execShowCreateTable(pStmt, pRsp);
}
-static int32_t execAlterLocal(SAlterLocalStmt* pStmt) {
+static int32_t execAlterLocal(SAlterLocalStmt* pStmt) {
if (cfgSetItem(tsCfg, pStmt->config, pStmt->value, CFG_STYPE_ALTER_CMD)) {
- return terrno;
+ return terrno;
}
if (taosSetCfg(tsCfg, pStmt->config)) {
- return terrno;
+ return terrno;
}
return TSDB_CODE_SUCCESS;
@@ -551,21 +545,20 @@ static SSDataBlock* buildLocalVariablesResultDataBlock() {
return pBlock;
}
-
int32_t setLocalVariablesResultIntoDataBlock(SSDataBlock* pBlock) {
int32_t numOfCfg = taosArrayGetSize(tsCfg->array);
int32_t numOfRows = 0;
blockDataEnsureCapacity(pBlock, numOfCfg);
for (int32_t i = 0, c = 0; i < numOfCfg; ++i, c = 0) {
- SConfigItem *pItem = taosArrayGet(tsCfg->array, i);
+ SConfigItem* pItem = taosArrayGet(tsCfg->array, i);
char name[TSDB_CONFIG_OPTION_LEN + VARSTR_HEADER_SIZE] = {0};
STR_WITH_MAXSIZE_TO_VARSTR(name, pItem->name, TSDB_CONFIG_OPTION_LEN + VARSTR_HEADER_SIZE);
- SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, c++);
+ SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, c++);
colDataAppend(pColInfo, i, name, false);
-
- char value[TSDB_CONFIG_VALUE_LEN + VARSTR_HEADER_SIZE] = {0};
+
+ char value[TSDB_CONFIG_VALUE_LEN + VARSTR_HEADER_SIZE] = {0};
int32_t valueLen = 0;
cfgDumpItemValue(pItem, &value[VARSTR_HEADER_SIZE], TSDB_CONFIG_VALUE_LEN, &valueLen);
varDataSetLen(value, valueLen);
@@ -575,16 +568,14 @@ int32_t setLocalVariablesResultIntoDataBlock(SSDataBlock* pBlock) {
numOfRows++;
}
-
pBlock->info.rows = numOfRows;
-
+
return TSDB_CODE_SUCCESS;
}
-
static int32_t execShowLocalVariables(SRetrieveTableRsp** pRsp) {
SSDataBlock* pBlock = buildLocalVariablesResultDataBlock();
- int32_t code = setLocalVariablesResultIntoDataBlock(pBlock);
+ int32_t code = setLocalVariablesResultIntoDataBlock(pBlock);
if (code) {
return code;
}
@@ -604,7 +595,7 @@ static int32_t execShowLocalVariables(SRetrieveTableRsp** pRsp) {
(*pRsp)->numOfCols = htonl(SHOW_LOCAL_VARIABLES_RESULT_COLS);
int32_t len = 0;
- blockCompressEncode(pBlock, (*pRsp)->data, &len, SHOW_LOCAL_VARIABLES_RESULT_COLS, false);
+ blockEncode(pBlock, (*pRsp)->data, &len, SHOW_LOCAL_VARIABLES_RESULT_COLS, false);
ASSERT(len == rspSize - sizeof(SRetrieveTableRsp));
blockDataDestroy(pBlock);
diff --git a/source/libs/command/src/explain.c b/source/libs/command/src/explain.c
index ef4ac0b639..7af36a0842 100644
--- a/source/libs/command/src/explain.c
+++ b/source/libs/command/src/explain.c
@@ -13,11 +13,11 @@
* along with this program. If not, see .
*/
-#include "tdatablock.h"
#include "commandInt.h"
#include "plannodes.h"
#include "query.h"
#include "tcommon.h"
+#include "tdatablock.h"
int32_t qExplainGenerateResNode(SPhysiNode *pNode, SExplainGroup *group, SExplainResNode **pRes);
int32_t qExplainAppendGroupResRows(void *pCtx, int32_t groupId, int32_t level);
@@ -216,7 +216,7 @@ int32_t qExplainGenerateResChildren(SPhysiNode *pNode, SExplainGroup *group, SNo
SExplainResNode *pResNode = NULL;
FOREACH(node, pPhysiChildren) {
QRY_ERR_RET(qExplainGenerateResNode((SPhysiNode *)node, group, &pResNode));
- QRY_ERR_RET(nodesListAppend(*pChildren, (SNode*)pResNode));
+ QRY_ERR_RET(nodesListAppend(*pChildren, (SNode *)pResNode));
}
return TSDB_CODE_SUCCESS;
@@ -232,14 +232,14 @@ int32_t qExplainGenerateResNodeExecInfo(SArray **pExecInfo, SExplainGroup *group
SExplainRsp *rsp = NULL;
for (int32_t i = 0; i < group->nodeNum; ++i) {
rsp = taosArrayGet(group->nodeExecInfo, i);
-/*
- if (group->physiPlanExecIdx >= rsp->numOfPlans) {
- qError("physiPlanIdx %d exceed plan num %d", group->physiPlanExecIdx, rsp->numOfPlans);
- return TSDB_CODE_QRY_APP_ERROR;
- }
+ /*
+ if (group->physiPlanExecIdx >= rsp->numOfPlans) {
+ qError("physiPlanIdx %d exceed plan num %d", group->physiPlanExecIdx, rsp->numOfPlans);
+ return TSDB_CODE_QRY_APP_ERROR;
+ }
- taosArrayPush(*pExecInfo, rsp->subplanInfo + group->physiPlanExecIdx);
-*/
+ taosArrayPush(*pExecInfo, rsp->subplanInfo + group->physiPlanExecIdx);
+ */
taosArrayPush(*pExecInfo, rsp->subplanInfo);
}
@@ -426,23 +426,23 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i
if (EXPLAIN_MODE_ANALYZE == ctx->mode) {
EXPLAIN_ROW_NEW(level + 1, "I/O: ");
- int32_t nodeNum = taosArrayGetSize(pResNode->pExecInfo);
+ int32_t nodeNum = taosArrayGetSize(pResNode->pExecInfo);
struct STableScanAnalyzeInfo info = {0};
int32_t maxIndex = 0;
int32_t totalRows = 0;
- for(int32_t i = 0; i < nodeNum; ++i) {
+ for (int32_t i = 0; i < nodeNum; ++i) {
SExplainExecInfo *execInfo = taosArrayGet(pResNode->pExecInfo, i);
STableScanAnalyzeInfo *pScanInfo = (STableScanAnalyzeInfo *)execInfo->verboseInfo;
- info.totalBlocks += pScanInfo->totalBlocks;
- info.loadBlocks += pScanInfo->loadBlocks;
- info.totalRows += pScanInfo->totalRows;
- info.skipBlocks += pScanInfo->skipBlocks;
- info.filterTime += pScanInfo->filterTime;
- info.loadBlockStatis += pScanInfo->loadBlockStatis;
+ info.totalBlocks += pScanInfo->totalBlocks;
+ info.loadBlocks += pScanInfo->loadBlocks;
+ info.totalRows += pScanInfo->totalRows;
+ info.skipBlocks += pScanInfo->skipBlocks;
+ info.filterTime += pScanInfo->filterTime;
+ info.loadBlockStatis += pScanInfo->loadBlockStatis;
info.totalCheckedRows += pScanInfo->totalCheckedRows;
- info.filterOutBlocks += pScanInfo->filterOutBlocks;
+ info.filterOutBlocks += pScanInfo->filterOutBlocks;
if (pScanInfo->totalRows > totalRows) {
totalRows = pScanInfo->totalRows;
@@ -468,13 +468,14 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i
QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1));
- //Rows out: Avg 4166.7 rows x 24 workers. Max 4187 rows (seg7) with 0.220 ms to first row, 1.738 ms to end, start offset by 1.470 ms.
+ // Rows out: Avg 4166.7 rows x 24 workers. Max 4187 rows (seg7) with 0.220 ms to first row, 1.738 ms to end,
+ // start offset by 1.470 ms.
SExplainExecInfo *execInfo = taosArrayGet(pResNode->pExecInfo, maxIndex);
STableScanAnalyzeInfo *p1 = (STableScanAnalyzeInfo *)execInfo->verboseInfo;
EXPLAIN_ROW_NEW(level + 1, " ");
- EXPLAIN_ROW_APPEND("max_row_task=%d, total_rows:%" PRId64 ", ep:%s (cost=%.3f..%.3f)", maxIndex, p1->totalRows, "tbd",
- execInfo->startupCost, execInfo->totalCost);
+ EXPLAIN_ROW_APPEND("max_row_task=%d, total_rows:%" PRId64 ", ep:%s (cost=%.3f..%.3f)", maxIndex, p1->totalRows,
+ "tbd", execInfo->startupCost, execInfo->totalCost);
EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
EXPLAIN_ROW_END();
@@ -752,7 +753,7 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i
EXPLAIN_ROW_NEW(level + 1, "Sort Key: ");
if (pResNode->pExecInfo) {
for (int32_t i = 0; i < LIST_LENGTH(pSortNode->pSortKeys); ++i) {
- SOrderByExprNode *ptn = (SOrderByExprNode*)nodesListGetNode(pSortNode->pSortKeys, i);
+ SOrderByExprNode *ptn = (SOrderByExprNode *)nodesListGetNode(pSortNode->pSortKeys, i);
EXPLAIN_ROW_APPEND("%s ", nodesGetNameFromColumnNode(ptn->pExpr));
}
}
@@ -907,16 +908,16 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i
EXPLAIN_ROW_END();
QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1));
if (pFillNode->pValues) {
- SNodeListNode *pValues = (SNodeListNode*)pFillNode->pValues;
+ SNodeListNode *pValues = (SNodeListNode *)pFillNode->pValues;
EXPLAIN_ROW_NEW(level + 1, EXPLAIN_FILL_VALUE_FORMAT);
- SNode* tNode = NULL;
+ SNode *tNode = NULL;
int32_t i = 0;
FOREACH(tNode, pValues->pNodeList) {
if (i) {
EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
}
- SValueNode* tValue = (SValueNode*)tNode;
- char *value = nodesGetStrValueFromNode(tValue);
+ SValueNode *tValue = (SValueNode *)tNode;
+ char *value = nodesGetStrValueFromNode(tValue);
EXPLAIN_ROW_APPEND(EXPLAIN_STRING_TYPE_FORMAT, value);
taosMemoryFree(value);
++i;
@@ -926,8 +927,7 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i
QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1));
}
- EXPLAIN_ROW_NEW(level + 1, EXPLAIN_TIMERANGE_FORMAT, pFillNode->timeRange.skey,
- pFillNode->timeRange.ekey);
+ EXPLAIN_ROW_NEW(level + 1, EXPLAIN_TIMERANGE_FORMAT, pFillNode->timeRange.skey, pFillNode->timeRange.ekey);
EXPLAIN_ROW_END();
QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1));
@@ -1070,13 +1070,13 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i
EXPLAIN_ROW_APPEND(EXPLAIN_RIGHT_PARENTHESIS_FORMAT);
EXPLAIN_ROW_END();
QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level));
-
+
if (EXPLAIN_MODE_ANALYZE == ctx->mode) {
// sort key
EXPLAIN_ROW_NEW(level + 1, "Merge Key: ");
if (pResNode->pExecInfo) {
for (int32_t i = 0; i < LIST_LENGTH(pMergeNode->pMergeKeys); ++i) {
- SOrderByExprNode *ptn = (SOrderByExprNode*)nodesListGetNode(pMergeNode->pMergeKeys, i);
+ SOrderByExprNode *ptn = (SOrderByExprNode *)nodesListGetNode(pMergeNode->pMergeKeys, i);
EXPLAIN_ROW_APPEND("%s ", nodesGetNameFromColumnNode(ptn->pExpr));
}
}
@@ -1115,7 +1115,7 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i
EXPLAIN_ROW_NEW(level + 1, EXPLAIN_MERGE_KEYS_FORMAT);
for (int32_t i = 0; i < LIST_LENGTH(pMergeNode->pMergeKeys); ++i) {
- SOrderByExprNode *ptn = (SOrderByExprNode*)nodesListGetNode(pMergeNode->pMergeKeys, i);
+ SOrderByExprNode *ptn = (SOrderByExprNode *)nodesListGetNode(pMergeNode->pMergeKeys, i);
EXPLAIN_ROW_APPEND("%s ", nodesGetNameFromColumnNode(ptn->pExpr));
}
EXPLAIN_ROW_END();
@@ -1130,7 +1130,7 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i
}
}
break;
- }
+ }
default:
qError("not supported physical node type %d", pNode->type);
return TSDB_CODE_QRY_APP_ERROR;
@@ -1190,12 +1190,12 @@ int32_t qExplainGetRspFromCtx(void *ctx, SRetrieveTableRsp **pRsp) {
QRY_ERR_RET(TSDB_CODE_QRY_APP_ERROR);
}
- SSDataBlock *pBlock = createDataBlock();
+ SSDataBlock *pBlock = createDataBlock();
SColumnInfoData infoData = createColumnInfoData(TSDB_DATA_TYPE_VARCHAR, TSDB_EXPLAIN_RESULT_ROW_SIZE, 1);
blockDataAppendColInfo(pBlock, &infoData);
blockDataEnsureCapacity(pBlock, rowNum);
- SColumnInfoData* pInfoData = taosArrayGet(pBlock->pDataBlock, 0);
+ SColumnInfoData *pInfoData = taosArrayGet(pBlock->pDataBlock, 0);
char buf[1024] = {0};
for (int32_t i = 0; i < rowNum; ++i) {
@@ -1219,7 +1219,7 @@ int32_t qExplainGetRspFromCtx(void *ctx, SRetrieveTableRsp **pRsp) {
rsp->numOfRows = htonl(rowNum);
int32_t len = 0;
- blockCompressEncode(pBlock, rsp->data, &len, taosArrayGetSize(pBlock->pDataBlock), 0);
+ blockEncode(pBlock, rsp->data, &len, taosArrayGetSize(pBlock->pDataBlock), 0);
ASSERT(len == rspSize - sizeof(SRetrieveTableRsp));
rsp->compLen = htonl(len);
diff --git a/source/libs/executor/inc/executil.h b/source/libs/executor/inc/executil.h
index f3e1eb47e8..df961c00fa 100644
--- a/source/libs/executor/inc/executil.h
+++ b/source/libs/executor/inc/executil.h
@@ -25,37 +25,36 @@
#define SET_RES_WINDOW_KEY(_k, _ori, _len, _uid) \
do { \
assert(sizeof(_uid) == sizeof(uint64_t)); \
- *(uint64_t *)(_k) = (_uid); \
+ *(uint64_t*)(_k) = (_uid); \
memcpy((_k) + sizeof(uint64_t), (_ori), (_len)); \
} while (0)
-#define SET_RES_EXT_WINDOW_KEY(_k, _ori, _len, _uid, _buf) \
- do { \
- assert(sizeof(_uid) == sizeof(uint64_t)); \
- *(void **)(_k) = (_buf); \
- *(uint64_t *)((_k) + POINTER_BYTES) = (_uid); \
- memcpy((_k) + POINTER_BYTES + sizeof(uint64_t), (_ori), (_len)); \
+#define SET_RES_EXT_WINDOW_KEY(_k, _ori, _len, _uid, _buf) \
+ do { \
+ assert(sizeof(_uid) == sizeof(uint64_t)); \
+ *(void**)(_k) = (_buf); \
+ *(uint64_t*)((_k) + POINTER_BYTES) = (_uid); \
+ memcpy((_k) + POINTER_BYTES + sizeof(uint64_t), (_ori), (_len)); \
} while (0)
-
-#define GET_RES_WINDOW_KEY_LEN(_l) ((_l) + sizeof(uint64_t))
+#define GET_RES_WINDOW_KEY_LEN(_l) ((_l) + sizeof(uint64_t))
#define GET_RES_EXT_WINDOW_KEY_LEN(_l) ((_l) + sizeof(uint64_t) + POINTER_BYTES)
-#define GET_TASKID(_t) (((SExecTaskInfo*)(_t))->id.str)
+#define GET_TASKID(_t) (((SExecTaskInfo*)(_t))->id.str)
typedef struct SGroupResInfo {
int32_t index;
- SArray* pRows; // SArray
+ SArray* pRows; // SArray
} SGroupResInfo;
typedef struct SResultRow {
- int32_t pageId; // pageId & rowId is the position of current result in disk-based output buffer
- int32_t offset:29; // row index in buffer page
- bool startInterp; // the time window start timestamp has done the interpolation already.
- bool endInterp; // the time window end timestamp has done the interpolation already.
- bool closed; // this result status: closed or opened
- uint32_t numOfRows; // number of rows of current time window
- STimeWindow win;
+ int32_t pageId; // pageId & rowId is the position of current result in disk-based output buffer
+ int32_t offset : 29; // row index in buffer page
+ bool startInterp; // the time window start timestamp has done the interpolation already.
+ bool endInterp; // the time window end timestamp has done the interpolation already.
+ bool closed; // this result status: closed or opened
+ uint32_t numOfRows; // number of rows of current time window
+ STimeWindow win;
struct SResultRowEntryInfo pEntryInfo[]; // For each result column, there is a resultInfo
} SResultRow;
@@ -66,57 +65,58 @@ typedef struct SResultRowPosition {
typedef struct SResKeyPos {
SResultRowPosition pos;
- uint64_t groupId;
- char key[];
+ uint64_t groupId;
+ char key[];
} SResKeyPos;
typedef struct SResultRowInfo {
- int32_t size; // number of result set
+ int32_t size; // number of result set
SResultRowPosition cur;
- SList* openWindow;
+ SList* openWindow;
} SResultRowInfo;
struct SqlFunctionCtx;
-size_t getResultRowSize(struct SqlFunctionCtx* pCtx, int32_t numOfOutput);
-void initResultRowInfo(SResultRowInfo* pResultRowInfo);
-void cleanupResultRowInfo(SResultRowInfo* pResultRowInfo);
+size_t getResultRowSize(struct SqlFunctionCtx* pCtx, int32_t numOfOutput);
+void initResultRowInfo(SResultRowInfo* pResultRowInfo);
+void cleanupResultRowInfo(SResultRowInfo* pResultRowInfo);
-void closeAllResultRows(SResultRowInfo* pResultRowInfo);
+void closeAllResultRows(SResultRowInfo* pResultRowInfo);
-void initResultRow(SResultRow *pResultRow);
-void closeResultRow(SResultRow* pResultRow);
-bool isResultRowClosed(SResultRow* pResultRow);
+void initResultRow(SResultRow* pResultRow);
+void closeResultRow(SResultRow* pResultRow);
+bool isResultRowClosed(SResultRow* pResultRow);
struct SResultRowEntryInfo* getResultEntryInfo(const SResultRow* pRow, int32_t index, const int32_t* offset);
-static FORCE_INLINE SResultRow *getResultRowByPos(SDiskbasedBuf* pBuf, SResultRowPosition* pos) {
- SFilePage* bufPage = (SFilePage*) getBufPage(pBuf, pos->pageId);
+static FORCE_INLINE SResultRow* getResultRowByPos(SDiskbasedBuf* pBuf, SResultRowPosition* pos) {
+ SFilePage* bufPage = (SFilePage*)getBufPage(pBuf, pos->pageId);
SResultRow* pRow = (SResultRow*)((char*)bufPage + pos->offset);
return pRow;
}
-void initGroupedResultInfo(SGroupResInfo* pGroupResInfo, SHashObj* pHashmap, int32_t order);
-void initMultiResInfoFromArrayList(SGroupResInfo* pGroupResInfo, SArray* pArrayList);
+void initGroupedResultInfo(SGroupResInfo* pGroupResInfo, SHashObj* pHashmap, int32_t order);
+void initMultiResInfoFromArrayList(SGroupResInfo* pGroupResInfo, SArray* pArrayList);
-void cleanupGroupResInfo(SGroupResInfo* pGroupResInfo);
-bool hasDataInGroupInfo(SGroupResInfo* pGroupResInfo);
+void cleanupGroupResInfo(SGroupResInfo* pGroupResInfo);
+bool hasDataInGroupInfo(SGroupResInfo* pGroupResInfo);
int32_t getNumOfTotalRes(SGroupResInfo* pGroupResInfo);
SSDataBlock* createResDataBlock(SDataBlockDescNode* pNode);
EDealRes doTranslateTagExpr(SNode** pNode, void* pContext);
-int32_t getTableList(void* metaHandle, SScanPhysiNode* pScanNode, STableListInfo* pListInfo);
-SArray* createSortInfo(SNodeList* pNodeList);
-SArray* extractPartitionColInfo(SNodeList* pNodeList);
-SArray* extractColMatchInfo(SNodeList* pNodeList, SDataBlockDescNode* pOutputNodeList, int32_t* numOfOutputCols, int32_t type);
+int32_t getTableList(void* metaHandle, SScanPhysiNode* pScanNode, STableListInfo* pListInfo);
+SArray* createSortInfo(SNodeList* pNodeList);
+SArray* extractPartitionColInfo(SNodeList* pNodeList);
+SArray* extractColMatchInfo(SNodeList* pNodeList, SDataBlockDescNode* pOutputNodeList, int32_t* numOfOutputCols,
+ int32_t type);
SExprInfo* createExprInfo(SNodeList* pNodeList, SNodeList* pGroupKeys, int32_t* numOfExprs);
SqlFunctionCtx* createSqlFunctionCtx(SExprInfo* pExprInfo, int32_t numOfOutput, int32_t** rowEntryInfoOffset);
-void relocateColumnData(SSDataBlock* pBlock, const SArray* pColMatchInfo, SArray* pCols, bool outputEveryColumn);
-void initExecTimeWindowInfo(SColumnInfoData* pColData, STimeWindow* pQueryWindow);
+void relocateColumnData(SSDataBlock* pBlock, const SArray* pColMatchInfo, SArray* pCols, bool outputEveryColumn);
+void initExecTimeWindowInfo(SColumnInfoData* pColData, STimeWindow* pQueryWindow);
SInterval extractIntervalInfo(const STableScanPhysiNode* pTableScanNode);
SColumn extractColumnFromColumnNode(SColumnNode* pColNode);
diff --git a/source/libs/executor/src/dataDispatcher.c b/source/libs/executor/src/dataDispatcher.c
index 802f9ea5b5..5ee222efb7 100644
--- a/source/libs/executor/src/dataDispatcher.c
+++ b/source/libs/executor/src/dataDispatcher.c
@@ -69,14 +69,16 @@ static bool needCompress(const SSDataBlock* pData, int32_t numOfCols) {
// data format:
// +----------------+--------------+----------+--------------------------------------------+--------------------------------------+-------------+-----------+-------------+-----------+
-// |SDataCacheEntry | total length | group id | col1_schema | col2_schema | col3_schema ...| column#1 length, column#2 length ... | col1 bitmap | col1 data | col2 bitmap | col2 data | ....
-// | | (4 bytes) |(8 bytes) |(sizeof(int16_t)+sizeof(int32_t))*numOfCols | sizeof(int32_t) * numOfCols | actual size | | actual size | |
+// |SDataCacheEntry | total length | group id | col1_schema | col2_schema | col3_schema ...| column#1 length, column#2
+// length ... | col1 bitmap | col1 data | col2 bitmap | col2 data | .... | | (4 bytes) |(8 bytes)
+// |(sizeof(int16_t)+sizeof(int32_t))*numOfCols | sizeof(int32_t) * numOfCols | actual size | |
+// actual size | |
// +----------------+--------------+----------+--------------------------------------------+--------------------------------------+-------------+-----------+-------------+-----------+
// The length of bitmap is decided by number of rows of this data block, and the length of each column data is
// recorded in the first segment, next to the struct header
static void toDataCacheEntry(SDataDispatchHandle* pHandle, const SInputData* pInput, SDataDispatchBuf* pBuf) {
int32_t numOfCols = 0;
- SNode* pNode;
+ SNode* pNode;
FOREACH(pNode, pHandle->pSchema->pSlots) {
SSlotDescNode* pSlotDesc = (SSlotDescNode*)pNode;
if (pSlotDesc->output) {
@@ -90,12 +92,12 @@ static void toDataCacheEntry(SDataDispatchHandle* pHandle, const SInputData* pIn
pEntry->dataLen = 0;
pBuf->useSize = sizeof(SDataCacheEntry);
- blockCompressEncode(pInput->pData, pEntry->data, &pEntry->dataLen, numOfCols, pEntry->compressed);
+ blockEncode(pInput->pData, pEntry->data, &pEntry->dataLen, numOfCols, pEntry->compressed);
pBuf->useSize += pEntry->dataLen;
-
- atomic_add_fetch_64(&pHandle->cachedSize, pEntry->dataLen);
- atomic_add_fetch_64(&gDataSinkStat.cachedSize, pEntry->dataLen);
+
+ atomic_add_fetch_64(&pHandle->cachedSize, pEntry->dataLen);
+ atomic_add_fetch_64(&gDataSinkStat.cachedSize, pEntry->dataLen);
}
static bool allocBuf(SDataDispatchHandle* pDispatcher, const SInputData* pInput, SDataDispatchBuf* pBuf) {
@@ -187,8 +189,8 @@ static int32_t getDataBlock(SDataSinkHandle* pHandle, SOutputData* pOutput) {
pOutput->numOfCols = pEntry->numOfCols;
pOutput->compressed = pEntry->compressed;
- atomic_sub_fetch_64(&pDispatcher->cachedSize, pEntry->dataLen);
- atomic_sub_fetch_64(&gDataSinkStat.cachedSize, pEntry->dataLen);
+ atomic_sub_fetch_64(&pDispatcher->cachedSize, pEntry->dataLen);
+ atomic_sub_fetch_64(&gDataSinkStat.cachedSize, pEntry->dataLen);
taosMemoryFreeClear(pDispatcher->nextOutput.pData); // todo persistent
pOutput->bufStatus = updateStatus(pDispatcher);
@@ -198,7 +200,6 @@ static int32_t getDataBlock(SDataSinkHandle* pHandle, SOutputData* pOutput) {
pOutput->precision = pDispatcher->pSchema->precision;
taosThreadMutexUnlock(&pDispatcher->mutex);
-
return TSDB_CODE_SUCCESS;
}
diff --git a/source/libs/executor/src/executil.c b/source/libs/executor/src/executil.c
index 5c2f32126b..2561516f6a 100644
--- a/source/libs/executor/src/executil.c
+++ b/source/libs/executor/src/executil.c
@@ -189,9 +189,9 @@ SSDataBlock* createResDataBlock(SDataBlockDescNode* pNode) {
for (int32_t i = 0; i < numOfCols; ++i) {
SSlotDescNode* pDescNode = (SSlotDescNode*)nodesListGetNode(pNode->pSlots, i);
- if (!pDescNode->output) { // todo disable it temporarily
- continue;
- }
+ /*if (!pDescNode->output) { // todo disable it temporarily*/
+ /*continue;*/
+ /*}*/
SColumnInfoData idata =
createColumnInfoData(pDescNode->dataType.type, pDescNode->dataType.bytes, pDescNode->slotId);
diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c
index ad0d5c1447..706fc87520 100644
--- a/source/libs/executor/src/executorimpl.c
+++ b/source/libs/executor/src/executorimpl.c
@@ -2048,7 +2048,7 @@ int32_t extractDataBlockFromFetchRsp(SSDataBlock* pRes, SLoadRemoteDataInfo* pLo
if (pColList == NULL) { // data from other sources
blockDataCleanup(pRes);
// blockDataEnsureCapacity(pRes, numOfRows);
- blockCompressDecode(pRes, numOfOutput, numOfRows, pData);
+ blockDecode(pRes, numOfOutput, numOfRows, pData);
} else { // extract data according to pColList
ASSERT(numOfOutput == taosArrayGetSize(pColList));
char* pStart = pData;
@@ -2072,7 +2072,7 @@ int32_t extractDataBlockFromFetchRsp(SSDataBlock* pRes, SLoadRemoteDataInfo* pLo
blockDataAppendColInfo(pBlock, &idata);
}
- blockCompressDecode(pBlock, numOfCols, numOfRows, pStart);
+ blockDecode(pBlock, numOfCols, numOfRows, pStart);
blockDataEnsureCapacity(pRes, numOfRows);
// data from mnode
@@ -2934,7 +2934,7 @@ int32_t aggEncodeResultRow(SOperatorInfo* pOperator, char** result, int32_t* len
*length = 0;
return TSDB_CODE_SUCCESS;
}
-
+
*result = (char*)taosMemoryCalloc(1, totalSize);
if (*result == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
@@ -3898,60 +3898,59 @@ int32_t extractTableSchemaVersion(SReadHandle* pHandle, uint64_t uid, SExecTaskI
return TSDB_CODE_SUCCESS;
}
-static int32_t sortTableGroup(STableListInfo* pTableListInfo, int32_t groupNum){
+static int32_t sortTableGroup(STableListInfo* pTableListInfo, int32_t groupNum) {
taosArrayClear(pTableListInfo->pGroupList);
- SArray *sortSupport = taosArrayInit(groupNum, sizeof(uint64_t));
- if(sortSupport == NULL) return TSDB_CODE_OUT_OF_MEMORY;
+ SArray* sortSupport = taosArrayInit(groupNum, sizeof(uint64_t));
+ if (sortSupport == NULL) return TSDB_CODE_OUT_OF_MEMORY;
for (int32_t i = 0; i < taosArrayGetSize(pTableListInfo->pTableList); i++) {
STableKeyInfo* info = taosArrayGet(pTableListInfo->pTableList, i);
- uint64_t* groupId = taosHashGet(pTableListInfo->map, &info->uid, sizeof(uint64_t));
+ uint64_t* groupId = taosHashGet(pTableListInfo->map, &info->uid, sizeof(uint64_t));
int32_t index = taosArraySearchIdx(sortSupport, groupId, compareUint64Val, TD_EQ);
- if (index == -1){
- void *p = taosArraySearch(sortSupport, groupId, compareUint64Val, TD_GT);
- SArray *tGroup = taosArrayInit(8, sizeof(STableKeyInfo));
- if(tGroup == NULL) {
+ if (index == -1) {
+ void* p = taosArraySearch(sortSupport, groupId, compareUint64Val, TD_GT);
+ SArray* tGroup = taosArrayInit(8, sizeof(STableKeyInfo));
+ if (tGroup == NULL) {
taosArrayDestroy(sortSupport);
return TSDB_CODE_OUT_OF_MEMORY;
}
- if(taosArrayPush(tGroup, info) == NULL){
+ if (taosArrayPush(tGroup, info) == NULL) {
qError("taos push info array error");
taosArrayDestroy(sortSupport);
return TSDB_CODE_QRY_APP_ERROR;
}
- if(p == NULL){
- if(taosArrayPush(sortSupport, groupId) != NULL){
+ if (p == NULL) {
+ if (taosArrayPush(sortSupport, groupId) != NULL) {
qError("taos push support array error");
taosArrayDestroy(sortSupport);
return TSDB_CODE_QRY_APP_ERROR;
}
- if(taosArrayPush(pTableListInfo->pGroupList, &tGroup) != NULL){
+ if (taosArrayPush(pTableListInfo->pGroupList, &tGroup) != NULL) {
qError("taos push group array error");
taosArrayDestroy(sortSupport);
return TSDB_CODE_QRY_APP_ERROR;
}
- }else{
+ } else {
int32_t pos = TARRAY_ELEM_IDX(sortSupport, p);
- if(taosArrayInsert(sortSupport, pos, groupId) == NULL){
+ if (taosArrayInsert(sortSupport, pos, groupId) == NULL) {
qError("taos insert support array error");
taosArrayDestroy(sortSupport);
return TSDB_CODE_QRY_APP_ERROR;
}
- if(taosArrayInsert(pTableListInfo->pGroupList, pos, &tGroup) == NULL){
+ if (taosArrayInsert(pTableListInfo->pGroupList, pos, &tGroup) == NULL) {
qError("taos insert group array error");
taosArrayDestroy(sortSupport);
return TSDB_CODE_QRY_APP_ERROR;
}
}
- }else{
+ } else {
SArray* tGroup = (SArray*)taosArrayGetP(pTableListInfo->pGroupList, index);
- if(taosArrayPush(tGroup, info) == NULL){
+ if (taosArrayPush(tGroup, info) == NULL) {
qError("taos push uid array error");
taosArrayDestroy(sortSupport);
return TSDB_CODE_QRY_APP_ERROR;
}
}
-
}
taosArrayDestroy(sortSupport);
return TDB_CODE_SUCCESS;
@@ -3969,9 +3968,9 @@ int32_t generateGroupIdMap(STableListInfo* pTableListInfo, SReadHandle* pHandle,
int32_t keyLen = 0;
void* keyBuf = NULL;
- SNode* node;
+ SNode* node;
FOREACH(node, group) {
- SExprNode *pExpr = (SExprNode *)node;
+ SExprNode* pExpr = (SExprNode*)node;
keyLen += pExpr->resType.bytes;
}
@@ -3990,15 +3989,15 @@ int32_t generateGroupIdMap(STableListInfo* pTableListInfo, SReadHandle* pHandle,
metaReaderInit(&mr, pHandle->meta, 0);
metaGetTableEntryByUid(&mr, info->uid);
- SNodeList *groupNew = nodesCloneList(group);
+ SNodeList* groupNew = nodesCloneList(group);
nodesRewriteExprsPostOrder(groupNew, doTranslateTagExpr, &mr);
char* isNull = (char*)keyBuf;
char* pStart = (char*)keyBuf + nullFlagSize;
- SNode* pNode;
+ SNode* pNode;
int32_t index = 0;
- FOREACH(pNode, groupNew){
+ FOREACH(pNode, groupNew) {
SNode* pNew = NULL;
int32_t code = scalarCalculateConstants(pNode, &pNew);
if (TSDB_CODE_SUCCESS == code) {
@@ -4010,15 +4009,15 @@ int32_t generateGroupIdMap(STableListInfo* pTableListInfo, SReadHandle* pHandle,
}
ASSERT(nodeType(pNew) == QUERY_NODE_VALUE);
- SValueNode *pValue = (SValueNode *)pNew;
+ SValueNode* pValue = (SValueNode*)pNew;
if (pValue->node.resType.type == TSDB_DATA_TYPE_NULL) {
isNull[index++] = 1;
continue;
} else {
isNull[index++] = 0;
- char* data = nodesGetValueFromNode(pValue);
- if (pValue->node.resType.type == TSDB_DATA_TYPE_JSON){
+ char* data = nodesGetValueFromNode(pValue);
+ if (pValue->node.resType.type == TSDB_DATA_TYPE_JSON) {
int32_t len = getJsonValueLen(data);
memcpy(pStart, data, len);
pStart += len;
@@ -4031,7 +4030,7 @@ int32_t generateGroupIdMap(STableListInfo* pTableListInfo, SReadHandle* pHandle,
}
}
}
- int32_t len = (int32_t)(pStart - (char*)keyBuf);
+ int32_t len = (int32_t)(pStart - (char*)keyBuf);
uint64_t groupId = calcGroupId(keyBuf, len);
taosHashPut(pTableListInfo->map, &(info->uid), sizeof(uint64_t), &groupId, sizeof(uint64_t));
info->groupId = groupId;
@@ -4042,7 +4041,7 @@ int32_t generateGroupIdMap(STableListInfo* pTableListInfo, SReadHandle* pHandle,
}
taosMemoryFree(keyBuf);
- if(pTableListInfo->needSortTableByGroupId){
+ if (pTableListInfo->needSortTableByGroupId) {
return sortTableGroup(pTableListInfo, groupNum);
}
@@ -4050,7 +4049,8 @@ int32_t generateGroupIdMap(STableListInfo* pTableListInfo, SReadHandle* pHandle,
}
SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHandle* pHandle,
- uint64_t queryId, uint64_t taskId, STableListInfo* pTableListInfo, const char* pUser) {
+ uint64_t queryId, uint64_t taskId, STableListInfo* pTableListInfo,
+ const char* pUser) {
int32_t type = nodeType(pPhyNode);
if (pPhyNode->pChildren == NULL || LIST_LENGTH(pPhyNode->pChildren) == 0) {
@@ -4058,7 +4058,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
STableScanPhysiNode* pTableScanNode = (STableScanPhysiNode*)pPhyNode;
int32_t code = createScanTableListInfo(pTableScanNode, pHandle, pTableListInfo, queryId, taskId);
- if(code){
+ if (code) {
pTaskInfo->code = code;
return NULL;
}
@@ -4076,7 +4076,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
} else if (QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN == type) {
STableMergeScanPhysiNode* pTableScanNode = (STableMergeScanPhysiNode*)pPhyNode;
int32_t code = createScanTableListInfo(pTableScanNode, pHandle, pTableListInfo, queryId, taskId);
- if(code){
+ if (code) {
return NULL;
}
code = extractTableSchemaVersion(pHandle, pTableScanNode->scan.uid, pTaskInfo);
@@ -4085,7 +4085,8 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
return NULL;
}
- SOperatorInfo* pOperator = createTableMergeScanOperatorInfo(pTableScanNode, pTableListInfo, pHandle, pTaskInfo, queryId, taskId);
+ SOperatorInfo* pOperator =
+ createTableMergeScanOperatorInfo(pTableScanNode, pTableListInfo, pHandle, pTaskInfo, queryId, taskId);
STableScanInfo* pScanInfo = pOperator->info;
pTaskInfo->cost.pRecoder = &pScanInfo->readRecorder;
@@ -4105,7 +4106,8 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
createScanTableListInfo(pTableScanNode, pHandle, pTableListInfo, queryId, taskId);
}
- SOperatorInfo* pOperator = createStreamScanOperatorInfo(pHandle, pTableScanNode, pTaskInfo, &twSup, queryId, taskId);
+ SOperatorInfo* pOperator =
+ createStreamScanOperatorInfo(pHandle, pTableScanNode, pTaskInfo, &twSup, queryId, taskId);
return pOperator;
} else if (QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN == type) {
@@ -4603,8 +4605,8 @@ int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SRead
(*pTaskInfo)->sql = sql;
(*pTaskInfo)->tableqinfoList.pTagCond = pPlan->pTagCond;
(*pTaskInfo)->tableqinfoList.pTagIndexCond = pPlan->pTagIndexCond;
- (*pTaskInfo)->pRoot =
- createOperatorTree(pPlan->pNode, *pTaskInfo, pHandle, queryId, taskId, &(*pTaskInfo)->tableqinfoList, pPlan->user);
+ (*pTaskInfo)->pRoot = createOperatorTree(pPlan->pNode, *pTaskInfo, pHandle, queryId, taskId,
+ &(*pTaskInfo)->tableqinfoList, pPlan->user);
if (NULL == (*pTaskInfo)->pRoot) {
code = (*pTaskInfo)->code;
@@ -4622,8 +4624,8 @@ _complete:
static void doDestroyTableList(STableListInfo* pTableqinfoList) {
taosArrayDestroy(pTableqinfoList->pTableList);
taosHashCleanup(pTableqinfoList->map);
- if(pTableqinfoList->needSortTableByGroupId){
- for(int32_t i = 0; i < taosArrayGetSize(pTableqinfoList->pGroupList); i++){
+ if (pTableqinfoList->needSortTableByGroupId) {
+ for (int32_t i = 0; i < taosArrayGetSize(pTableqinfoList->pGroupList); i++) {
SArray* tmp = taosArrayGetP(pTableqinfoList->pGroupList, i);
taosArrayDestroy(tmp);
}
diff --git a/source/libs/stream/src/streamData.c b/source/libs/stream/src/streamData.c
index 3efe3e6c9c..e9b311db9a 100644
--- a/source/libs/stream/src/streamData.c
+++ b/source/libs/stream/src/streamData.c
@@ -30,7 +30,7 @@ int32_t streamDispatchReqToData(const SStreamDispatchReq* pReq, SStreamDataBlock
/*int32_t len = *(int32_t*)taosArrayGet(pReq->dataLen, i);*/
SRetrieveTableRsp* pRetrieve = taosArrayGetP(pReq->data, i);
SSDataBlock* pDataBlock = taosArrayGet(pArray, i);
- blockCompressDecode(pDataBlock, htonl(pRetrieve->numOfCols), htonl(pRetrieve->numOfRows), pRetrieve->data);
+ blockDecode(pDataBlock, htonl(pRetrieve->numOfCols), htonl(pRetrieve->numOfRows), pRetrieve->data);
// TODO: refactor
pDataBlock->info.window.skey = be64toh(pRetrieve->skey);
pDataBlock->info.window.ekey = be64toh(pRetrieve->ekey);
@@ -50,7 +50,7 @@ int32_t streamRetrieveReqToData(const SStreamRetrieveReq* pReq, SStreamDataBlock
taosArraySetSize(pArray, 1);
SRetrieveTableRsp* pRetrieve = pReq->pRetrieve;
SSDataBlock* pDataBlock = taosArrayGet(pArray, 0);
- blockCompressDecode(pDataBlock, htonl(pRetrieve->numOfCols), htonl(pRetrieve->numOfRows), pRetrieve->data);
+ blockDecode(pDataBlock, htonl(pRetrieve->numOfCols), htonl(pRetrieve->numOfRows), pRetrieve->data);
// TODO: refactor
pDataBlock->info.window.skey = be64toh(pRetrieve->skey);
pDataBlock->info.window.ekey = be64toh(pRetrieve->ekey);
diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c
index a5e9b8edd9..8034840fce 100644
--- a/source/libs/stream/src/streamDispatch.c
+++ b/source/libs/stream/src/streamDispatch.c
@@ -108,7 +108,7 @@ int32_t streamBroadcastToChildren(SStreamTask* pTask, const SSDataBlock* pBlock)
pRetrieve->ekey = htobe64(pBlock->info.window.ekey);
int32_t actualLen = 0;
- blockCompressEncode(pBlock, pRetrieve->data, &actualLen, numOfCols, false);
+ blockEncode(pBlock, pRetrieve->data, &actualLen, numOfCols, false);
SStreamRetrieveReq req = {
.streamId = pTask->streamId,
@@ -181,7 +181,7 @@ static int32_t streamAddBlockToDispatchMsg(const SSDataBlock* pBlock, SStreamDis
pRetrieve->numOfCols = htonl(numOfCols);
int32_t actualLen = 0;
- blockCompressEncode(pBlock, pRetrieve->data, &actualLen, numOfCols, false);
+ blockEncode(pBlock, pRetrieve->data, &actualLen, numOfCols, false);
actualLen += sizeof(SRetrieveTableRsp);
ASSERT(actualLen <= dataStrLen);
taosArrayPush(pReq->dataLen, &actualLen);