diff --git a/src/client/inc/tscUtil.h b/src/client/inc/tscUtil.h
index 48afd1802c..c5c634a9c5 100644
--- a/src/client/inc/tscUtil.h
+++ b/src/client/inc/tscUtil.h
@@ -55,7 +55,7 @@ SDataBlockList* tscCreateBlockArrayList();
void* tscDestroyBlockArrayList(SDataBlockList* pList);
int32_t tscCopyDataBlockToPayload(SSqlObj* pSql, STableDataBlocks* pDataBlock);
void tscFreeUnusedDataBlocks(SDataBlockList* pList);
-void tscMergeTableDataBlocks(SSqlCmd* pCmd, SDataBlockList* pDataList);
+void tscMergeTableDataBlocks(SSqlObj* pSql, SDataBlockList* pDataList);
STableDataBlocks* tscGetDataBlockFromList(void* pHashList, SDataBlockList* pDataBlockList, int64_t id, int32_t size,
int32_t startOffset, int32_t rowSize, char* tableId);
STableDataBlocks* tscCreateDataBlockEx(size_t size, int32_t rowSize, int32_t startOffset, char* name);
@@ -140,7 +140,7 @@ void tscCleanSqlCmd(SSqlCmd* pCmd);
bool tscShouldFreeAsyncSqlObj(SSqlObj* pSql);
void tscDoQuery(SSqlObj* pSql);
-int32_t sortRemoveDuplicates(STableDataBlocks* dataBuf, int32_t numOfRows);
+void sortRemoveDuplicates(STableDataBlocks* dataBuf);
#ifdef __cplusplus
}
#endif
diff --git a/src/client/src/tscParseInsert.c b/src/client/src/tscParseInsert.c
index 44a254f1bd..7dec7280f8 100644
--- a/src/client/src/tscParseInsert.c
+++ b/src/client/src/tscParseInsert.c
@@ -518,18 +518,20 @@ int32_t tscAllocateMemIfNeed(STableDataBlocks *pDataBlock, int32_t rowSize) {
const int factor = 5;
// expand the allocated size
- while (remain < rowSize * factor) {
- pDataBlock->nAllocSize = (uint32_t) (pDataBlock->nAllocSize * 1.5);
- remain = pDataBlock->nAllocSize - pDataBlock->size;
- }
+ if (remain < rowSize * factor) {
+ while (remain < rowSize * factor) {
+ pDataBlock->nAllocSize = (uint32_t) (pDataBlock->nAllocSize * 1.5);
+ remain = pDataBlock->nAllocSize - pDataBlock->size;
+ }
- char *tmp = realloc(pDataBlock->pData, (size_t)pDataBlock->nAllocSize);
- if (tmp != NULL) {
- pDataBlock->pData = tmp;
- memset(pDataBlock->pData + pDataBlock->size, 0, pDataBlock->nAllocSize - pDataBlock->size);
- } else {
- assert(false);
- // do nothing
+ char *tmp = realloc(pDataBlock->pData, (size_t)pDataBlock->nAllocSize);
+ if (tmp != NULL) {
+ pDataBlock->pData = tmp;
+ memset(pDataBlock->pData + pDataBlock->size, 0, pDataBlock->nAllocSize - pDataBlock->size);
+ } else {
+ assert(false);
+ // do nothing
+ }
}
return (int32_t)(pDataBlock->nAllocSize - pDataBlock->size) / rowSize;
@@ -542,16 +544,21 @@ static void tsSetBlockInfo(SShellSubmitBlock *pBlocks, const SMeterMeta *pMeterM
pBlocks->numOfRows += numOfRows;
}
-int32_t sortRemoveDuplicates(STableDataBlocks *dataBuf, int32_t numOfRows) {
- // data block is disordered, sort it in ascending order
+// data block is disordered, sort it in ascending order
+void sortRemoveDuplicates(STableDataBlocks *dataBuf) {
+ SShellSubmitBlock* pBlocks = (SShellSubmitBlock*)dataBuf->pData;
+
+ // size is less than the total size, since duplicated rows may be removed yet.
+ assert(pBlocks->numOfRows * dataBuf->rowSize + sizeof(SShellSubmitBlock) == dataBuf->size);
+
if (!dataBuf->ordered) {
- char *pBlockData = dataBuf->pData + sizeof(SShellSubmitBlock);
- qsort(pBlockData, numOfRows, dataBuf->rowSize, rowDataCompar);
+ char *pBlockData = pBlocks->payLoad;
+ qsort(pBlockData, pBlocks->numOfRows, dataBuf->rowSize, rowDataCompar);
int32_t i = 0;
int32_t j = 1;
- while (j < numOfRows) {
+ while (j < pBlocks->numOfRows) {
TSKEY ti = *(TSKEY *)(pBlockData + dataBuf->rowSize * i);
TSKEY tj = *(TSKEY *)(pBlockData + dataBuf->rowSize * j);
@@ -568,11 +575,11 @@ int32_t sortRemoveDuplicates(STableDataBlocks *dataBuf, int32_t numOfRows) {
++j;
}
- numOfRows = i + 1;
dataBuf->ordered = true;
- }
- return numOfRows;
+ pBlocks->numOfRows = i + 1;
+ dataBuf->size = sizeof(SShellSubmitBlock) + dataBuf->rowSize*pBlocks->numOfRows;
+ }
}
static int32_t doParseInsertStatement(SSqlObj *pSql, void *pTableHashList, char **str, SParsedDataColInfo *spd,
@@ -998,7 +1005,7 @@ int tsParseInsertStatement(SSqlObj *pSql, char *str, char *acct, char *db) {
// submit to more than one vnode
if (pCmd->pDataBlocks->nSize > 0) {
// merge according to vgid
- tscMergeTableDataBlocks(pCmd, pCmd->pDataBlocks);
+ tscMergeTableDataBlocks(pSql, pCmd->pDataBlocks);
STableDataBlocks *pDataBlock = pCmd->pDataBlocks->pData[0];
if ((code = tscCopyDataBlockToPayload(pSql, pDataBlock)) != TSDB_CODE_SUCCESS) {
@@ -1033,8 +1040,6 @@ int tsParseInsertSql(SSqlObj *pSql, char *sql, char *acct, char *db) {
int code = TSDB_CODE_INVALID_SQL;
SSqlCmd *pCmd = &pSql->cmd;
- tscCleanSqlCmd(pCmd);
-
sql = tscGetToken(sql, &verb, &verblen);
if (verblen) {
@@ -1055,6 +1060,7 @@ int tsParseInsertSql(SSqlObj *pSql, char *sql, char *acct, char *db) {
int tsParseSql(SSqlObj *pSql, char *acct, char *db, bool multiVnodeInsertion) {
int32_t ret = TSDB_CODE_SUCCESS;
+ tscCleanSqlCmd(&pSql->cmd);
if (tscIsInsertOrImportData(pSql->sqlstr)) {
/*
@@ -1074,6 +1080,9 @@ int tsParseSql(SSqlObj *pSql, char *acct, char *db, bool multiVnodeInsertion) {
} else {
SSqlInfo SQLInfo = {0};
tSQLParse(&SQLInfo, pSql->sqlstr);
+
+ tscAllocPayloadWithSize(&pSql->cmd, TSDB_DEFAULT_PAYLOAD_SIZE);
+
ret = tscToSQLCmd(pSql, &SQLInfo);
SQLInfoDestroy(&SQLInfo);
}
@@ -1098,7 +1107,7 @@ static int doPackSendDataBlock(SSqlObj* pSql, int32_t numOfRows, STableDataBlock
SShellSubmitBlock *pBlocks = (SShellSubmitBlock *)(pTableDataBlocks->pData);
tsSetBlockInfo(pBlocks, pMeterMeta, numOfRows);
- tscMergeTableDataBlocks(pCmd, pCmd->pDataBlocks);
+ tscMergeTableDataBlocks(pSql, pCmd->pDataBlocks);
// the pDataBlock is different from the pTableDataBlocks
STableDataBlocks *pDataBlock = pCmd->pDataBlocks->pData[0];
diff --git a/src/client/src/tscSQLParser.c b/src/client/src/tscSQLParser.c
index 33f699660d..cf43aad94b 100644
--- a/src/client/src/tscSQLParser.c
+++ b/src/client/src/tscSQLParser.c
@@ -143,9 +143,6 @@ int32_t tscToSQLCmd(SSqlObj* pSql, struct SSqlInfo* pInfo) {
return TSDB_CODE_INVALID_SQL;
}
- tscCleanSqlCmd(pCmd);
- tscAllocPayloadWithSize(pCmd, TSDB_DEFAULT_PAYLOAD_SIZE);
-
// transfer pInfo into select operation
switch (pInfo->sqlType) {
case DROP_TABLE:
@@ -785,7 +782,8 @@ int32_t tscToSQLCmd(SSqlObj* pSql, struct SSqlInfo* pInfo) {
// set sliding value
SSQLToken* pSliding = &pQuerySql->sliding;
if (pSliding->n != 0) {
- if (!tscEmbedded) {
+ // pCmd->count == 1 means sql in stream function
+ if (!tscEmbedded && pCmd->count == 0) {
const char* msg = "not support sliding in query";
setErrMsg(pCmd, msg);
return TSDB_CODE_INVALID_SQL;
diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c
index f661fd853c..9f58fe1390 100644
--- a/src/client/src/tscServer.c
+++ b/src/client/src/tscServer.c
@@ -287,14 +287,20 @@ void *tscProcessMsgFromServer(char *msg, void *ahandle, void *thandle) {
pSql->thandle = NULL;
taosAddConnIntoCache(tscConnCache, thandle, pSql->ip, pSql->vnode, pObj->user);
- if (UTIL_METER_IS_METRIC(pCmd) &&
- (pMsg->content[0] == TSDB_CODE_INVALID_SESSION_ID || pMsg->content[0] == TSDB_CODE_NOT_ACTIVE_SESSION)) {
+ if (UTIL_METER_IS_METRIC(pCmd) && pMsg->content[0] == TSDB_CODE_NOT_ACTIVE_SESSION) {
/*
* for metric query, in case of any meter missing during query, sub-query of metric query will failed,
* causing metric query failed, and return TSDB_CODE_METRICMETA_EXPIRED code to app
*/
tscTrace("%p invalid meters id cause metric query failed, code:%d", pSql, pMsg->content[0]);
code = TSDB_CODE_METRICMETA_EXPIRED;
+ } else if ((pCmd->command == TSDB_SQL_INSERT || pCmd->command == TSDB_SQL_SELECT) &&
+ pMsg->content[0] == TSDB_CODE_INVALID_SESSION_ID) {
+ /*
+ * session id is invalid(e.g., less than 0 or larger than maximum session per
+ * vnode) in submit/query msg, no retry
+ */
+ code = TSDB_CODE_INVALID_QUERY_MSG;
} else if (pCmd->command == TSDB_SQL_CONNECT) {
code = TSDB_CODE_NETWORK_UNAVAIL;
} else if (pCmd->command == TSDB_SQL_HB) {
@@ -1027,8 +1033,6 @@ int tscBuildSubmitMsg(SSqlObj *pSql) {
pShellMsg->vnode = htons(pMeterMeta->vpeerDesc[pMeterMeta->index].vnode);
pShellMsg->numOfSid = htonl(pSql->cmd.count); /* number of meters to be inserted */
- pMsg += sizeof(SShellSubmitMsg);
-
/*
* pSql->cmd.payloadLen is set during parse sql routine, so we do not use it here
*/
diff --git a/src/client/src/tscSql.c b/src/client/src/tscSql.c
index a58238624f..d55a765d3b 100644
--- a/src/client/src/tscSql.c
+++ b/src/client/src/tscSql.c
@@ -72,7 +72,7 @@ TAOS *taos_connect_imp(char *ip, char *user, char *pass, char *db, int port, voi
pObj->signature = pObj;
strncpy(pObj->user, user, TSDB_USER_LEN);
- taosEncryptPass(pass, strlen(pass), pObj->pass);
+ taosEncryptPass((uint8_t *)pass, strlen(pass), pObj->pass);
pObj->mgmtPort = port ? port : tsMgmtShellPort;
if (db) {
diff --git a/src/client/src/tscStream.c b/src/client/src/tscStream.c
index 84741f098e..ea3d1e86e5 100644
--- a/src/client/src/tscStream.c
+++ b/src/client/src/tscStream.c
@@ -145,13 +145,14 @@ static void tscProcessStreamQueryCallback(void *param, TAOS_RES *tres, int numOf
static void tscSetTimestampForRes(SSqlStream *pStream, SSqlObj *pSql, int32_t numOfRows) {
SSqlRes *pRes = &pSql->res;
- int64_t timestamp = *(int64_t *)pRes->data;
- if (timestamp != pStream->stime) {
+ int64_t timestamp = *(int64_t *)pRes->data;
+ int64_t actualTimestamp = pStream->stime - pStream->interval;
+
+ if (timestamp != actualTimestamp) {
// reset the timestamp of each agg point by using start time of each interval
- *((int64_t *)pRes->data) = pStream->stime - pStream->interval;
- tscWarn("%p stream:%p, timestamp of points is:%lld, reset to %lld", pSql, pStream, timestamp,
- pStream->stime - pStream->interval);
+ *((int64_t *)pRes->data) = actualTimestamp;
+ tscWarn("%p stream:%p, timestamp of points is:%lld, reset to %lld", pSql, pStream, timestamp, actualTimestamp);
}
}
@@ -397,7 +398,7 @@ static int64_t tscGetStreamStartTimestamp(SSqlObj *pSql, SSqlStream *pStream, in
} else { // timewindow based aggregation stream
if (stime == 0) { // no data in meter till now
stime = ((int64_t)taosGetTimestamp(pStream->precision) / pStream->interval) * pStream->interval;
- tscWarn("%p stream:%p, last timestamp:0, reset to:%lld", pSql, pStream, stime, stime);
+ tscWarn("%p stream:%p, last timestamp:0, reset to:%lld", pSql, pStream, stime);
} else {
int64_t newStime = (stime / pStream->interval) * pStream->interval;
if (newStime != stime) {
@@ -435,13 +436,25 @@ static int64_t tscGetLaunchTimestamp(const SSqlStream *pStream) {
return (pStream->precision == TSDB_TIME_PRECISION_MICRO) ? timer / 1000L : timer;
}
+static void setErrorInfo(STscObj* pObj, int32_t code, char* info) {
+ if (pObj == NULL) {
+ return;
+ }
+
+ SSqlCmd* pCmd = &pObj->pSql->cmd;
+
+ pObj->pSql->res.code = code;
+ strncpy(pCmd->payload, info, pCmd->payloadLen);
+}
+
TAOS_STREAM *taos_open_stream(TAOS *taos, char *sqlstr, void (*fp)(void *param, TAOS_RES *, TAOS_ROW row),
int64_t stime, void *param, void (*callback)(void *)) {
STscObj *pObj = (STscObj *)taos;
if (pObj == NULL || pObj->signature != pObj) return NULL;
SSqlObj *pSql = (SSqlObj *)calloc(1, sizeof(SSqlObj));
- if (pSql == NULL) { // todo set corect error msg
+ if (pSql == NULL) {
+ setErrorInfo(pObj, TSDB_CODE_CLI_OUT_OF_MEMORY, NULL);
return NULL;
}
@@ -451,22 +464,31 @@ TAOS_STREAM *taos_open_stream(TAOS *taos, char *sqlstr, void (*fp)(void *param,
SSqlRes *pRes = &pSql->res;
tscAllocPayloadWithSize(pCmd, TSDB_DEFAULT_PAYLOAD_SIZE);
- pSql->sqlstr = calloc(1, strlen(sqlstr) + 1);
- if (pSql->sqlstr == NULL) { // todo set corect error msg
+ pSql->sqlstr = strdup(sqlstr);
+ if (pSql->sqlstr == NULL) {
+ setErrorInfo(pObj, TSDB_CODE_CLI_OUT_OF_MEMORY, NULL);
+
tfree(pSql);
return NULL;
}
- strcpy(pSql->sqlstr, sqlstr);
sem_init(&pSql->rspSem, 0, 0);
sem_init(&pSql->emptyRspSem, 0, 1);
SSqlInfo SQLInfo = {0};
tSQLParse(&SQLInfo, pSql->sqlstr);
+
+ tscCleanSqlCmd(&pSql->cmd);
+ tscAllocPayloadWithSize(&pSql->cmd, TSDB_DEFAULT_PAYLOAD_SIZE);
+
+ //todo refactor later
+ pSql->cmd.count = 1;
pRes->code = tscToSQLCmd(pSql, &SQLInfo);
SQLInfoDestroy(&SQLInfo);
if (pRes->code != TSDB_CODE_SUCCESS) {
+ setErrorInfo(pObj, pRes->code, pCmd->payload);
+
tscError("%p open stream failed, sql:%s, reason:%s, code:%d", pSql, sqlstr, pCmd->payload, pRes->code);
tscFreeSqlObj(pSql);
return NULL;
@@ -474,6 +496,8 @@ TAOS_STREAM *taos_open_stream(TAOS *taos, char *sqlstr, void (*fp)(void *param,
SSqlStream *pStream = (SSqlStream *)calloc(1, sizeof(SSqlStream));
if (pStream == NULL) {
+ setErrorInfo(pObj, TSDB_CODE_CLI_OUT_OF_MEMORY, NULL);
+
tscError("%p open stream failed, sql:%s, reason:%s, code:%d", pSql, sqlstr, pCmd->payload, pRes->code);
tscFreeSqlObj(pSql);
return NULL;
diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c
index e71fa0792b..4c9c180f6a 100644
--- a/src/client/src/tscUtil.c
+++ b/src/client/src/tscUtil.c
@@ -401,7 +401,7 @@ void tscFreeUnusedDataBlocks(SDataBlockList* pList) {
}
STableDataBlocks* tscCreateDataBlockEx(size_t size, int32_t rowSize, int32_t startOffset, char* name) {
- STableDataBlocks *dataBuf = tscCreateDataBlock(size);
+ STableDataBlocks* dataBuf = tscCreateDataBlock(size);
dataBuf->rowSize = rowSize;
dataBuf->size = startOffset;
@@ -419,7 +419,7 @@ STableDataBlocks* tscGetDataBlockFromList(void* pHashList, SDataBlockList* pData
}
if (dataBuf == NULL) {
- dataBuf = tscCreateDataBlockEx((size_t) size, rowSize, startOffset, tableId);
+ dataBuf = tscCreateDataBlockEx((size_t)size, rowSize, startOffset, tableId);
dataBuf = *(STableDataBlocks**)taosAddIntHash(pHashList, id, (char*)&dataBuf);
tscAppendDataBlock(pDataBlockList, dataBuf);
}
@@ -427,7 +427,8 @@ STableDataBlocks* tscGetDataBlockFromList(void* pHashList, SDataBlockList* pData
return dataBuf;
}
-void tscMergeTableDataBlocks(SSqlCmd* pCmd, SDataBlockList* pTableDataBlockList) {
+void tscMergeTableDataBlocks(SSqlObj* pSql, SDataBlockList* pTableDataBlockList) {
+ SSqlCmd* pCmd = &pSql->cmd;
void* pVnodeDataBlockHashList = taosInitIntHash(8, sizeof(void*), taosHashInt);
SDataBlockList* pVnodeDataBlockList = tscCreateBlockArrayList();
@@ -453,9 +454,10 @@ void tscMergeTableDataBlocks(SSqlCmd* pCmd, SDataBlockList* pTableDataBlockList)
}
SShellSubmitBlock* pBlocks = (SShellSubmitBlock*)pOneTableBlock->pData;
- assert(pBlocks->numOfRows * pOneTableBlock->rowSize + sizeof(SShellSubmitBlock) == pOneTableBlock->size);
+ sortRemoveDuplicates(pOneTableBlock);
- pBlocks->numOfRows = (int16_t)sortRemoveDuplicates(pOneTableBlock, pBlocks->numOfRows);
+ tscTrace("%p meterId:%s, sid:%d, rows:%d, sversion:%d", pSql, pOneTableBlock->meterId, pBlocks->sid,
+ pBlocks->numOfRows, pBlocks->sversion);
pBlocks->sid = htonl(pBlocks->sid);
pBlocks->uid = htobe64(pBlocks->uid);
@@ -883,7 +885,7 @@ static int32_t validateQuoteToken(SSQLToken* pToken) {
if (pToken->type == TK_STRING) {
return tscValidateName(pToken);
- }
+ }
if (k != pToken->n || pToken->type != TK_ID) {
return TSDB_CODE_INVALID_SQL;
@@ -906,16 +908,16 @@ int32_t tscValidateName(SSQLToken* pToken) {
int len = tSQLGetToken(pToken->z, &pToken->type);
// single token, validate it
- if (len == pToken->n){
+ if (len == pToken->n) {
return validateQuoteToken(pToken);
} else {
- sep = strnchrNoquote(pToken->z, TS_PATH_DELIMITER[0], pToken->n);
- if (sep == NULL) {
- return TSDB_CODE_INVALID_SQL;
- }
+ sep = strnchrNoquote(pToken->z, TS_PATH_DELIMITER[0], pToken->n);
+ if (sep == NULL) {
+ return TSDB_CODE_INVALID_SQL;
+ }
return tscValidateName(pToken);
- }
+ }
} else {
if (isNumber(pToken)) {
return TSDB_CODE_INVALID_SQL;
@@ -927,7 +929,7 @@ int32_t tscValidateName(SSQLToken* pToken) {
if (pToken->type == TK_SPACE) {
strtrim(pToken->z);
- pToken->n = (uint32_t)strlen(pToken->z);
+ pToken->n = (uint32_t)strlen(pToken->z);
}
pToken->n = tSQLGetToken(pToken->z, &pToken->type);
diff --git a/src/system/src/vnodeMeter.c b/src/system/src/vnodeMeter.c
index d6283cfa4f..b8961e98ec 100644
--- a/src/system/src/vnodeMeter.c
+++ b/src/system/src/vnodeMeter.c
@@ -512,6 +512,7 @@ int vnodeInsertPoints(SMeterObj *pObj, char *cont, int contLen, char source, voi
SSubmitMsg *pSubmit = (SSubmitMsg *)cont;
char * pData;
TSKEY tsKey;
+ int cfile;
int points = 0;
int code = TSDB_CODE_SUCCESS;
SVnodeObj * pVnode = vnodeList + pObj->vnode;
@@ -528,6 +529,7 @@ int vnodeInsertPoints(SMeterObj *pObj, char *cont, int contLen, char source, voi
// to guarantee time stamp is the same for all vnodes
pData = pSubmit->payLoad;
tsKey = taosGetTimestamp(pVnode->cfg.precision);
+ cfile = tsKey/pVnode->cfg.daysPerFile/tsMsPerDay[pVnode->cfg.precision];
if (*((TSKEY *)pData) == 0) {
for (i = 0; i < numOfPoints; ++i) {
*((TSKEY *)pData) = tsKey++;
@@ -570,9 +572,11 @@ int vnodeInsertPoints(SMeterObj *pObj, char *cont, int contLen, char source, voi
code = 0;
TSKEY firstKey = *((TSKEY *)pData);
- if (pVnode->lastKeyOnFile > pVnode->cfg.daysToKeep * tsMsPerDay[pVnode->cfg.precision] + firstKey) {
- dError("vid:%d sid:%d id:%s, vnode lastKeyOnFile:%lld, data is too old to insert, key:%lld", pObj->vnode, pObj->sid,
- pObj->meterId, pVnode->lastKeyOnFile, firstKey);
+ int firstId = firstKey/pVnode->cfg.daysPerFile/tsMsPerDay[pVnode->cfg.precision];
+ int lastId = (*(TSKEY *)(pData + pObj->bytesPerPoint * (numOfPoints - 1)))/pVnode->cfg.daysPerFile/tsMsPerDay[pVnode->cfg.precision];
+ if ((firstId <= cfile - pVnode->maxFiles) || (firstId > cfile + 1) || (lastId <= cfile - pVnode->maxFiles) || (lastId > cfile + 1)) {
+ dError("vid:%d sid:%d id:%s, invalid timestamp to insert, firstKey: %ld lastKey: %ld ", pObj->vnode, pObj->sid,
+ pObj->meterId, firstKey, (*(TSKEY *)(pData + pObj->bytesPerPoint * (numOfPoints - 1))));
return TSDB_CODE_TIMESTAMP_OUT_OF_RANGE;
}
@@ -582,7 +586,7 @@ int vnodeInsertPoints(SMeterObj *pObj, char *cont, int contLen, char source, voi
dWarn("vid:%d sid:%d id:%s, meter is dropped, abort insert, state:%d", pObj->vnode, pObj->sid, pObj->meterId,
pObj->state);
- code = TSDB_CODE_INVALID_SESSION_ID;
+ code = TSDB_CODE_NOT_ACTIVE_SESSION;
break;
}
diff --git a/src/system/src/vnodeShell.c b/src/system/src/vnodeShell.c
index de3cca4fdf..03ddf522ce 100644
--- a/src/system/src/vnodeShell.c
+++ b/src/system/src/vnodeShell.c
@@ -248,7 +248,7 @@ int vnodeProcessQueryRequest(char *pMsg, int msgLen, SShellObj *pObj) {
}
if (pQueryMsg->numOfSids <= 0) {
- code = TSDB_CODE_APP_ERROR;
+ code = TSDB_CODE_INVALID_QUERY_MSG;
goto _query_over;
}
@@ -263,7 +263,7 @@ int vnodeProcessQueryRequest(char *pMsg, int msgLen, SShellObj *pObj) {
if (pVnode->cfg.maxSessions == 0) {
dError("qmsg:%p,vid:%d is not activated yet", pQueryMsg, pQueryMsg->vnode);
vnodeSendVpeerCfgMsg(pQueryMsg->vnode);
- code = TSDB_CODE_INVALID_SESSION_ID;
+ code = TSDB_CODE_NOT_ACTIVE_SESSION;
goto _query_over;
}
@@ -274,13 +274,13 @@ int vnodeProcessQueryRequest(char *pMsg, int msgLen, SShellObj *pObj) {
if (pQueryMsg->pSidExtInfo == 0) {
dTrace("qmsg:%p,SQueryMeterMsg wrong format", pQueryMsg);
- code = TSDB_CODE_APP_ERROR;
+ code = TSDB_CODE_INVALID_QUERY_MSG;
goto _query_over;
}
if (pVnode->meterList == NULL) {
dError("qmsg:%p,vid:%d has been closed", pQueryMsg, pQueryMsg->vnode);
- code = TSDB_CODE_INVALID_SESSION_ID;
+ code = TSDB_CODE_NOT_ACTIVE_SESSION;
goto _query_over;
}
@@ -448,7 +448,7 @@ int vnodeProcessShellSubmitRequest(char *pMsg, int msgLen, SShellObj *pObj) {
if (pSubmit->numOfSid <= 0) {
dError("invalid num of meters:%d", pSubmit->numOfSid);
- code = TSDB_CODE_APP_ERROR;
+ code = TSDB_CODE_INVALID_QUERY_MSG;
goto _submit_over;
}
@@ -462,7 +462,7 @@ int vnodeProcessShellSubmitRequest(char *pMsg, int msgLen, SShellObj *pObj) {
if (pVnode->cfg.maxSessions == 0 || pVnode->meterList == NULL) {
dError("vid:%d is not activated for submit", pSubmit->vnode);
vnodeSendVpeerCfgMsg(pSubmit->vnode);
- code = TSDB_CODE_INVALID_SESSION_ID;
+ code = TSDB_CODE_NOT_ACTIVE_SESSION;
goto _submit_over;
}
diff --git a/src/system/src/vnodeStream.c b/src/system/src/vnodeStream.c
index 75bb972e74..d7b7f62bae 100644
--- a/src/system/src/vnodeStream.c
+++ b/src/system/src/vnodeStream.c
@@ -13,8 +13,9 @@
* along with this program. If not, see .
*/
+#include "taosmsg.h"
#include "vnode.h"
-#include
+#include "vnodeUtil.h"
/* static TAOS *dbConn = NULL; */
void vnodeCloseStreamCallback(void *param);