Merge pull request #892 from taosdata/feature/liaohj

Feature/liaohj
This commit is contained in:
slguan 2019-12-10 13:35:02 +08:00 committed by GitHub
commit 68072018b8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
29 changed files with 169 additions and 134 deletions

View File

@ -67,7 +67,7 @@ typedef struct SJoinSubquerySupporter {
} SJoinSubquerySupporter; } SJoinSubquerySupporter;
void tscDestroyDataBlock(STableDataBlocks* pDataBlock); void tscDestroyDataBlock(STableDataBlocks* pDataBlock);
STableDataBlocks* tscCreateDataBlock(int32_t size); STableDataBlocks* tscCreateDataBlock(size_t initialBufSize, int32_t rowSize, int32_t startOffset, const char* name);
void tscAppendDataBlock(SDataBlockList* pList, STableDataBlocks* pBlocks); void tscAppendDataBlock(SDataBlockList* pList, STableDataBlocks* pBlocks);
SParamInfo* tscAddParamToDataBlock(STableDataBlocks* pDataBlock, char type, uint8_t timePrec, short bytes, SParamInfo* tscAddParamToDataBlock(STableDataBlocks* pDataBlock, char type, uint8_t timePrec, short bytes,
uint32_t offset); uint32_t offset);
@ -78,9 +78,7 @@ int32_t tscCopyDataBlockToPayload(SSqlObj* pSql, STableDataBlocks* pDa
void tscFreeUnusedDataBlocks(SDataBlockList* pList); void tscFreeUnusedDataBlocks(SDataBlockList* pList);
int32_t tscMergeTableDataBlocks(SSqlObj* pSql, SDataBlockList* pDataList); int32_t tscMergeTableDataBlocks(SSqlObj* pSql, SDataBlockList* pDataList);
STableDataBlocks* tscGetDataBlockFromList(void* pHashList, SDataBlockList* pDataBlockList, int64_t id, int32_t size, STableDataBlocks* tscGetDataBlockFromList(void* pHashList, SDataBlockList* pDataBlockList, int64_t id, int32_t size,
int32_t startOffset, int32_t rowSize, char* tableId); int32_t startOffset, int32_t rowSize, const char* tableId);
STableDataBlocks* tscCreateDataBlockEx(size_t size, int32_t rowSize, int32_t startOffset, char* name);
SVnodeSidList* tscGetVnodeSidList(SMetricMeta* pMetricmeta, int32_t vnodeIdx); SVnodeSidList* tscGetVnodeSidList(SMetricMeta* pMetricmeta, int32_t vnodeIdx);
SMeterSidExtInfo* tscGetMeterSidInfo(SVnodeSidList* pSidList, int32_t idx); SMeterSidExtInfo* tscGetMeterSidInfo(SVnodeSidList* pSidList, int32_t idx);

View File

@ -231,17 +231,22 @@ typedef struct SParamInfo {
typedef struct STableDataBlocks { typedef struct STableDataBlocks {
char meterId[TSDB_METER_ID_LEN]; char meterId[TSDB_METER_ID_LEN];
int8_t tsSource; int8_t tsSource; // where does the UNIX timestamp come from, server or client
bool ordered; bool ordered; // if current rows are ordered or not
int64_t vgid; // virtual group id
int64_t prevTS; // previous timestamp, recorded to decide if the records array is ts ascending
int32_t numOfMeters; // number of tables in current submit block
int64_t vgid; int32_t rowSize; // row size for current table
int64_t prevTS;
int32_t numOfMeters;
int32_t rowSize;
uint32_t nAllocSize; uint32_t nAllocSize;
uint32_t size; uint32_t size;
/*
* the metermeta for current table, the metermeta will be used during submit stage, keep a ref
* to avoid it to be removed from cache
*/
SMeterMeta* pMeterMeta;
union { union {
char *filename; char *filename;
char *pData; char *pData;
@ -255,8 +260,8 @@ typedef struct STableDataBlocks {
typedef struct SDataBlockList { typedef struct SDataBlockList {
int32_t idx; int32_t idx;
int32_t nSize; uint32_t nSize;
int32_t nAlloc; uint32_t nAlloc;
char * userParam; /* user assigned parameters for async query */ char * userParam; /* user assigned parameters for async query */
void * udfp; /* user defined function pointer, used in async model */ void * udfp; /* user defined function pointer, used in async model */
STableDataBlocks **pData; STableDataBlocks **pData;
@ -274,7 +279,7 @@ typedef struct {
int8_t isInsertFromFile; // load data from file or not int8_t isInsertFromFile; // load data from file or not
bool import; // import/insert type bool import; // import/insert type
char msgType; uint8_t msgType;
uint16_t type; // query type uint16_t type; // query type
char intervalTimeUnit; char intervalTimeUnit;
int64_t etime, stime; int64_t etime, stime;
@ -378,14 +383,14 @@ typedef struct _sql_obj {
char * sqlstr; char * sqlstr;
char retry; char retry;
char maxRetry; char maxRetry;
char index; uint8_t index;
char freed : 4; char freed : 4;
char listed : 4; char listed : 4;
tsem_t rspSem; tsem_t rspSem;
tsem_t emptyRspSem; tsem_t emptyRspSem;
SSqlCmd cmd; SSqlCmd cmd;
SSqlRes res; SSqlRes res;
char numOfSubs; uint8_t numOfSubs;
struct _sql_obj **pSubs; struct _sql_obj **pSubs;
struct _sql_obj * prev, *next; struct _sql_obj * prev, *next;
} SSqlObj; } SSqlObj;

View File

@ -643,13 +643,12 @@ int32_t intersect(tQueryResultset *pLeft, tQueryResultset *pRight, tQueryResults
} }
/* /*
* * traverse the result and apply the function to each item to check if the item is qualified or not
*/ */
void tSQLListTraverseOnResult(struct tSQLBinaryExpr *pExpr, bool (*fp)(tSkipListNode *, void *), static void tSQLListTraverseOnResult(struct tSQLBinaryExpr *pExpr, __result_filter_fn_t fp, tQueryResultset *pResult) {
tQueryResultset * pResult) {
assert(pExpr->pLeft->nodeType == TSQL_NODE_COL && pExpr->pRight->nodeType == TSQL_NODE_VALUE); assert(pExpr->pLeft->nodeType == TSQL_NODE_COL && pExpr->pRight->nodeType == TSQL_NODE_VALUE);
// brutal force search // brutal force scan the result list and check for each item in the list
int64_t num = pResult->num; int64_t num = pResult->num;
for (int32_t i = 0, j = 0; i < pResult->num; ++i) { for (int32_t i = 0, j = 0; i < pResult->num; ++i) {
if (fp == NULL || (fp(pResult->pRes[i], pExpr->info) == true)) { if (fp == NULL || (fp(pResult->pRes[i], pExpr->info) == true)) {

View File

@ -1666,7 +1666,7 @@ static void last_data_assign_impl(SQLFunctionCtx *pCtx, char *pData, int32_t ind
if (pInfo->hasResult != DATA_SET_FLAG || pInfo->ts < timestamp[index]) { if (pInfo->hasResult != DATA_SET_FLAG || pInfo->ts < timestamp[index]) {
#if defined(_DEBUG_VIEW) #if defined(_DEBUG_VIEW)
pTrace("assign index:%d, ts:%lld, val:%d, ", index, timestamp[index], *(int32_t *)pData); pTrace("assign index:%d, ts:%" PRId64 ", val:%d, ", index, timestamp[index], *(int32_t *)pData);
#endif #endif
memcpy(pCtx->aOutputBuf, pData, pCtx->inputBytes); memcpy(pCtx->aOutputBuf, pData, pCtx->inputBytes);

View File

@ -13,8 +13,8 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#include "tscJoinProcess.h"
#include "os.h" #include "os.h"
#include "tscJoinProcess.h"
#include "tcache.h" #include "tcache.h"
#include "tscUtil.h" #include "tscUtil.h"
#include "tsclient.h" #include "tsclient.h"
@ -88,7 +88,7 @@ static int64_t doTSBlockIntersect(SSqlObj* pSql, SJoinSubquerySupporter* pSuppor
#ifdef _DEBUG_VIEW #ifdef _DEBUG_VIEW
// for debug purpose // for debug purpose
tscPrint("%lld, tags:%d \t %lld, tags:%d", elem1.ts, elem1.tag, elem2.ts, elem2.tag); tscPrint("%" PRId64 ", tags:%d \t %" PRId64 ", tags:%d", elem1.ts, elem1.tag, elem2.ts, elem2.tag);
#endif #endif
if (elem1.tag < elem2.tag || (elem1.tag == elem2.tag && doCompare(order, elem1.ts, elem2.ts))) { if (elem1.tag < elem2.tag || (elem1.tag == elem2.tag && doCompare(order, elem1.ts, elem2.ts))) {
@ -150,7 +150,7 @@ static int64_t doTSBlockIntersect(SSqlObj* pSql, SJoinSubquerySupporter* pSuppor
tsBufDestory(pSupporter1->pTSBuf); tsBufDestory(pSupporter1->pTSBuf);
tsBufDestory(pSupporter2->pTSBuf); tsBufDestory(pSupporter2->pTSBuf);
tscTrace("%p input1:%lld, input2:%lld, final:%lld for secondary query after ts blocks intersecting", pSql, tscTrace("%p input1:%" PRId64 ", input2:%" PRId64 ", final:%" PRId64 " for secondary query after ts blocks intersecting", pSql,
numOfInput1, numOfInput2, output1->numOfTotal); numOfInput1, numOfInput2, output1->numOfTotal);
return output1->numOfTotal; return output1->numOfTotal;
@ -528,8 +528,8 @@ void tscFetchDatablockFromSubquery(SSqlObj* pSql) {
numOfFetch++; numOfFetch++;
} }
} else { } else {
if ((pRes->row >= pRes->numOfRows && (!tscHasReachLimitation(pSql->pSubs[i])) && tscProjectionQueryOnTable(pSql)) if ((pRes->row >= pRes->numOfRows && (!tscHasReachLimitation(pSql->pSubs[i])) &&
|| (pRes->numOfRows == 0)) { tscProjectionQueryOnTable(&pSql->cmd)) || (pRes->numOfRows == 0)) {
numOfFetch++; numOfFetch++;
} }
} }
@ -1619,7 +1619,7 @@ void tsBufDisplay(STSBuf* pTSBuf) {
while (tsBufNextPos(pTSBuf)) { while (tsBufNextPos(pTSBuf)) {
STSElem elem = tsBufGetElem(pTSBuf); STSElem elem = tsBufGetElem(pTSBuf);
printf("%d-%lld-%lld\n", elem.vnode, elem.tag, elem.ts); printf("%d-%" PRId64 "-%" PRId64 "\n", elem.vnode, *(int64_t*) elem.tag, elem.ts);
} }
pTSBuf->cur.order = old; pTSBuf->cur.order = old;

View File

@ -64,7 +64,7 @@ static int32_t getToStringLength(const char *pData, int32_t length, int32_t type
} break; } break;
case TSDB_DATA_TYPE_TIMESTAMP: case TSDB_DATA_TYPE_TIMESTAMP:
case TSDB_DATA_TYPE_BIGINT: case TSDB_DATA_TYPE_BIGINT:
len = sprintf(buf, "%lld", *(int64_t *)pData); len = sprintf(buf, "%" PRId64 "", *(int64_t *)pData);
break; break;
case TSDB_DATA_TYPE_BOOL: case TSDB_DATA_TYPE_BOOL:
len = MAX_BOOL_TYPE_LENGTH; len = MAX_BOOL_TYPE_LENGTH;
@ -228,7 +228,7 @@ static int32_t tscSetValueToResObj(SSqlObj *pSql, int32_t rowLen) {
sprintf(target, "%d", *(int32_t *)pTagValue); sprintf(target, "%d", *(int32_t *)pTagValue);
break; break;
case TSDB_DATA_TYPE_BIGINT: case TSDB_DATA_TYPE_BIGINT:
sprintf(target, "%lld", *(int64_t *)pTagValue); sprintf(target, "%" PRId64 "", *(int64_t *)pTagValue);
break; break;
case TSDB_DATA_TYPE_BOOL: { case TSDB_DATA_TYPE_BOOL: {
char *val = (*((int8_t *)pTagValue) == 0) ? "false" : "true"; char *val = (*((int8_t *)pTagValue) == 0) ? "false" : "true";

View File

@ -982,7 +982,7 @@ int doParserInsertSql(SSqlObj *pSql, char *str) {
strcpy(fname, full_path.we_wordv[0]); strcpy(fname, full_path.we_wordv[0]);
wordfree(&full_path); wordfree(&full_path);
STableDataBlocks *pDataBlock = tscCreateDataBlockEx(PATH_MAX, pMeterMetaInfo->pMeterMeta->rowSize, STableDataBlocks *pDataBlock = tscCreateDataBlock(PATH_MAX, pMeterMetaInfo->pMeterMeta->rowSize,
sizeof(SShellSubmitBlock), pMeterMetaInfo->name); sizeof(SShellSubmitBlock), pMeterMetaInfo->name);
tscAppendDataBlock(pCmd->pDataBlocks, pDataBlock); tscAppendDataBlock(pCmd->pDataBlocks, pDataBlock);
@ -1219,8 +1219,8 @@ static int tscInsertDataFromFile(SSqlObj *pSql, FILE *fp, char *tmpTokenBuf) {
int32_t rowSize = pMeterMeta->rowSize; int32_t rowSize = pMeterMeta->rowSize;
pCmd->pDataBlocks = tscCreateBlockArrayList(); pCmd->pDataBlocks = tscCreateBlockArrayList();
STableDataBlocks *pTableDataBlock = STableDataBlocks *pTableDataBlock = tscCreateDataBlock(TSDB_PAYLOAD_SIZE, pMeterMeta->rowSize,
tscCreateDataBlockEx(TSDB_PAYLOAD_SIZE, pMeterMeta->rowSize, sizeof(SShellSubmitBlock), pMeterMetaInfo->name); sizeof(SShellSubmitBlock), pMeterMetaInfo->name);
tscAppendDataBlock(pCmd->pDataBlocks, pTableDataBlock); tscAppendDataBlock(pCmd->pDataBlocks, pTableDataBlock);

View File

@ -93,10 +93,10 @@ void tscSaveSlowQuery(SSqlObj *pSql) {
const static int64_t SLOW_QUERY_INTERVAL = 3000000L; const static int64_t SLOW_QUERY_INTERVAL = 3000000L;
if (pSql->res.useconds < SLOW_QUERY_INTERVAL) return; if (pSql->res.useconds < SLOW_QUERY_INTERVAL) return;
tscTrace("%p query time:%lld sql:%s", pSql, pSql->res.useconds, pSql->sqlstr); tscTrace("%p query time:%" PRId64 " sql:%s", pSql, pSql->res.useconds, pSql->sqlstr);
char *sql = malloc(200); char *sql = malloc(200);
int len = snprintf(sql, 200, "insert into %s.slowquery values(now, '%s', %lld, %lld, '", tsMonitorDbName, int len = snprintf(sql, 200, "insert into %s.slowquery values(now, '%s', %" PRId64 ", %" PRId64 ", '", tsMonitorDbName,
pSql->pTscObj->user, pSql->stime, pSql->res.useconds); pSql->pTscObj->user, pSql->stime, pSql->res.useconds);
int sqlLen = snprintf(sql + len, TSDB_SHOW_SQL_LEN, "%s", pSql->sqlstr); int sqlLen = snprintf(sql + len, TSDB_SHOW_SQL_LEN, "%s", pSql->sqlstr);
if (sqlLen > TSDB_SHOW_SQL_LEN - 1) { if (sqlLen > TSDB_SHOW_SQL_LEN - 1) {

View File

@ -3351,7 +3351,7 @@ int tscProcessShowRsp(SSqlObj *pSql) {
} }
int tscProcessConnectRsp(SSqlObj *pSql) { int tscProcessConnectRsp(SSqlObj *pSql) {
char temp[TSDB_METER_ID_LEN]; char temp[TSDB_METER_ID_LEN*2];
SConnectRsp *pConnect; SConnectRsp *pConnect;
STscObj *pObj = pSql->pTscObj; STscObj *pObj = pSql->pTscObj;
@ -3359,8 +3359,11 @@ int tscProcessConnectRsp(SSqlObj *pSql) {
pConnect = (SConnectRsp *)pRes->pRsp; pConnect = (SConnectRsp *)pRes->pRsp;
strcpy(pObj->acctId, pConnect->acctId); // copy acctId from response strcpy(pObj->acctId, pConnect->acctId); // copy acctId from response
sprintf(temp, "%s%s%s", pObj->acctId, TS_PATH_DELIMITER, pObj->db); int32_t len =sprintf(temp, "%s%s%s", pObj->acctId, TS_PATH_DELIMITER, pObj->db);
strcpy(pObj->db, temp);
assert(len <= tListLen(pObj->db));
strncpy(pObj->db, temp, tListLen(pObj->db));
#ifdef CLUSTER #ifdef CLUSTER
SIpList * pIpList; SIpList * pIpList;
char *rsp = pRes->pRsp + sizeof(SConnectRsp); char *rsp = pRes->pRsp + sizeof(SConnectRsp);
@ -3635,7 +3638,7 @@ int tscRenewMeterMeta(SSqlObj *pSql, char *meterId) {
*/ */
if (pMeterMetaInfo->pMeterMeta == NULL || !tscQueryOnMetric(pCmd)) { if (pMeterMetaInfo->pMeterMeta == NULL || !tscQueryOnMetric(pCmd)) {
if (pMeterMetaInfo->pMeterMeta) { if (pMeterMetaInfo->pMeterMeta) {
tscTrace("%p update meter meta, old: numOfTags:%d, numOfCols:%d, uid:%lld, addr:%p", pSql, tscTrace("%p update meter meta, old: numOfTags:%d, numOfCols:%d, uid:%" PRId64 ", addr:%p", pSql,
pMeterMetaInfo->numOfTags, pCmd->numOfCols, pMeterMetaInfo->pMeterMeta->uid, pMeterMetaInfo->pMeterMeta); pMeterMetaInfo->numOfTags, pCmd->numOfCols, pMeterMetaInfo->pMeterMeta->uid, pMeterMetaInfo->pMeterMeta);
} }
tscWaitingForCreateTable(&pSql->cmd); tscWaitingForCreateTable(&pSql->cmd);
@ -3643,7 +3646,7 @@ int tscRenewMeterMeta(SSqlObj *pSql, char *meterId) {
code = tscDoGetMeterMeta(pSql, meterId, 0); // todo ?? code = tscDoGetMeterMeta(pSql, meterId, 0); // todo ??
} else { } else {
tscTrace("%p metric query not update metric meta, numOfTags:%d, numOfCols:%d, uid:%lld, addr:%p", pSql, tscTrace("%p metric query not update metric meta, numOfTags:%d, numOfCols:%d, uid:%" PRId64 ", addr:%p", pSql,
pMeterMetaInfo->pMeterMeta->numOfTags, pCmd->numOfCols, pMeterMetaInfo->pMeterMeta->uid, pMeterMetaInfo->pMeterMeta->numOfTags, pCmd->numOfCols, pMeterMetaInfo->pMeterMeta->uid,
pMeterMetaInfo->pMeterMeta); pMeterMetaInfo->pMeterMeta);
} }

View File

@ -532,7 +532,7 @@ static void **tscJoinResultsetFromBuf(SSqlObj *pSql) {
doSetResultRowData(pSql->pSubs[1]); doSetResultRowData(pSql->pSubs[1]);
// TSKEY key1 = *(TSKEY *)pRes1->tsrow[0]; // TSKEY key1 = *(TSKEY *)pRes1->tsrow[0];
// TSKEY key2 = *(TSKEY *)pRes2->tsrow[0]; // TSKEY key2 = *(TSKEY *)pRes2->tsrow[0];
// printf("first:%lld, second:%lld\n", key1, key2); // printf("first:%" PRId64 ", second:%" PRId64 "\n", key1, key2);
success = true; success = true;
pRes1->row++; pRes1->row++;
pRes2->row++; pRes2->row++;
@ -903,7 +903,7 @@ int taos_print_row(char *str, TAOS_ROW row, TAOS_FIELD *fields, int num_fields)
break; break;
case TSDB_DATA_TYPE_BIGINT: case TSDB_DATA_TYPE_BIGINT:
len += sprintf(str + len, "%lld ", *((int64_t *)row[i])); len += sprintf(str + len, "%" PRId64 " ", *((int64_t *)row[i]));
break; break;
case TSDB_DATA_TYPE_FLOAT: case TSDB_DATA_TYPE_FLOAT:
@ -928,7 +928,7 @@ int taos_print_row(char *str, TAOS_ROW row, TAOS_FIELD *fields, int num_fields)
} break; } break;
case TSDB_DATA_TYPE_TIMESTAMP: case TSDB_DATA_TYPE_TIMESTAMP:
len += sprintf(str + len, "%lld ", *((int64_t *)row[i])); len += sprintf(str + len, "%" PRId64 " ", *((int64_t *)row[i]));
break; break;
case TSDB_DATA_TYPE_BOOL: case TSDB_DATA_TYPE_BOOL:

View File

@ -85,7 +85,7 @@ static void tscProcessStreamLaunchQuery(SSchedMsg *pMsg) {
// failed to get meter/metric meta, retry in 10sec. // failed to get meter/metric meta, retry in 10sec.
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
int64_t retryDelayTime = tscGetRetryDelayTime(pStream->slidingTime, pStream->precision); int64_t retryDelayTime = tscGetRetryDelayTime(pStream->slidingTime, pStream->precision);
tscError("%p stream:%p,get metermeta failed, retry in %lldms", pStream->pSql, pStream, retryDelayTime); tscError("%p stream:%p,get metermeta failed, retry in %" PRId64 "ms", pStream->pSql, pStream, retryDelayTime);
tscSetRetryTimer(pStream, pSql, retryDelayTime); tscSetRetryTimer(pStream, pSql, retryDelayTime);
return; return;
@ -136,7 +136,7 @@ static void tscProcessStreamQueryCallback(void *param, TAOS_RES *tres, int numOf
SSqlStream *pStream = (SSqlStream *)param; SSqlStream *pStream = (SSqlStream *)param;
if (tres == NULL || numOfRows < 0) { if (tres == NULL || numOfRows < 0) {
int64_t retryDelay = tscGetRetryDelayTime(pStream->slidingTime, pStream->precision); int64_t retryDelay = tscGetRetryDelayTime(pStream->slidingTime, pStream->precision);
tscError("%p stream:%p, query data failed, code:%d, retry in %lldms", pStream->pSql, pStream, numOfRows, tscError("%p stream:%p, query data failed, code:%d, retry in %" PRId64 "ms", pStream->pSql, pStream, numOfRows,
retryDelay); retryDelay);
SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfo(&pStream->pSql->cmd, 0); SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfo(&pStream->pSql->cmd, 0);
@ -158,7 +158,7 @@ static void tscSetTimestampForRes(SSqlStream *pStream, SSqlObj *pSql) {
if (timestamp != actualTimestamp) { if (timestamp != actualTimestamp) {
// reset the timestamp of each agg point by using start time of each interval // reset the timestamp of each agg point by using start time of each interval
*((int64_t *)pRes->data) = actualTimestamp; *((int64_t *)pRes->data) = actualTimestamp;
tscWarn("%p stream:%p, timestamp of points is:%lld, reset to %lld", pSql, pStream, timestamp, actualTimestamp); tscWarn("%p stream:%p, timestamp of points is:%" PRId64 ", reset to %" PRId64 "", pSql, pStream, timestamp, actualTimestamp);
} }
} }
@ -169,7 +169,7 @@ static void tscProcessStreamRetrieveResult(void *param, TAOS_RES *res, int numOf
if (pSql == NULL || numOfRows < 0) { if (pSql == NULL || numOfRows < 0) {
int64_t retryDelayTime = tscGetRetryDelayTime(pStream->slidingTime, pStream->precision); int64_t retryDelayTime = tscGetRetryDelayTime(pStream->slidingTime, pStream->precision);
tscError("%p stream:%p, retrieve data failed, code:%d, retry in %lldms", pSql, pStream, numOfRows, retryDelayTime); tscError("%p stream:%p, retrieve data failed, code:%d, retry in %" PRId64 "ms", pSql, pStream, numOfRows, retryDelayTime);
tscClearMeterMetaInfo(pMeterMetaInfo, true); tscClearMeterMetaInfo(pMeterMetaInfo, true);
tscSetRetryTimer(pStream, pStream->pSql, retryDelayTime); tscSetRetryTimer(pStream, pStream->pSql, retryDelayTime);
@ -235,7 +235,7 @@ static void tscProcessStreamRetrieveResult(void *param, TAOS_RES *res, int numOf
/* no resuls in the query range, retry */ /* no resuls in the query range, retry */
// todo set retry dynamic time // todo set retry dynamic time
int32_t retry = tsProjectExecInterval; int32_t retry = tsProjectExecInterval;
tscError("%p stream:%p, retrieve no data, code:%d, retry in %lldms", pSql, pStream, numOfRows, retry); tscError("%p stream:%p, retrieve no data, code:%d, retry in %" PRId64 "ms", pSql, pStream, numOfRows, retry);
tscClearSqlMetaInfoForce(&(pStream->pSql->cmd)); tscClearSqlMetaInfoForce(&(pStream->pSql->cmd));
tscSetRetryTimer(pStream, pStream->pSql, retry); tscSetRetryTimer(pStream, pStream->pSql, retry);
@ -265,7 +265,7 @@ static void tscSetRetryTimer(SSqlStream *pStream, SSqlObj *pSql, int64_t timer)
/* /*
* current time window will be closed, since it too early to exceed the maxRetentWindow value * current time window will be closed, since it too early to exceed the maxRetentWindow value
*/ */
tscTrace("%p stream:%p, etime:%lld is too old, exceeds the max retention time window:%lld, stop the stream", tscTrace("%p stream:%p, etime:%" PRId64 " is too old, exceeds the max retention time window:%" PRId64 ", stop the stream",
pStream->pSql, pStream, pStream->stime, pStream->etime); pStream->pSql, pStream, pStream->stime, pStream->etime);
// TODO : How to terminate stream here // TODO : How to terminate stream here
taos_close_stream(pStream); taos_close_stream(pStream);
@ -276,10 +276,10 @@ static void tscSetRetryTimer(SSqlStream *pStream, SSqlObj *pSql, int64_t timer)
return; return;
} }
tscTrace("%p stream:%p, next query start at %lld, in %lldms. query range %lld-%lld", pStream->pSql, pStream, tscTrace("%p stream:%p, next query start at %" PRId64 ", in %" PRId64 "ms. query range %" PRId64 "-%" PRId64 "", pStream->pSql, pStream,
now + timer, timer, pStream->stime, etime); now + timer, timer, pStream->stime, etime);
} else { } else {
tscTrace("%p stream:%p, next query start at %lld, in %lldms. query range %lld-%lld", pStream->pSql, pStream, tscTrace("%p stream:%p, next query start at %" PRId64 ", in %" PRId64 "ms. query range %" PRId64 "-%" PRId64 "", pStream->pSql, pStream,
pStream->stime, timer, pStream->stime - pStream->interval, pStream->stime - 1); pStream->stime, timer, pStream->stime - pStream->interval, pStream->stime - 1);
} }
@ -299,7 +299,7 @@ static void tscSetNextLaunchTimer(SSqlStream *pStream, SSqlObj *pSql) {
*/ */
timer = pStream->slidingTime; timer = pStream->slidingTime;
if (pStream->stime > pStream->etime) { if (pStream->stime > pStream->etime) {
tscTrace("%p stream:%p, stime:%lld is larger than end time: %lld, stop the stream", pStream->pSql, pStream, tscTrace("%p stream:%p, stime:%" PRId64 " is larger than end time: %" PRId64 ", stop the stream", pStream->pSql, pStream,
pStream->stime, pStream->etime); pStream->stime, pStream->etime);
// TODO : How to terminate stream here // TODO : How to terminate stream here
taos_close_stream(pStream); taos_close_stream(pStream);
@ -353,7 +353,7 @@ static void tscSetSlidingWindowInfo(SSqlObj *pSql, SSqlStream *pStream) {
int64_t minIntervalTime = int64_t minIntervalTime =
(pStream->precision == TSDB_TIME_PRECISION_MICRO) ? tsMinIntervalTime * 1000L : tsMinIntervalTime; (pStream->precision == TSDB_TIME_PRECISION_MICRO) ? tsMinIntervalTime * 1000L : tsMinIntervalTime;
if (pCmd->nAggTimeInterval < minIntervalTime) { if (pCmd->nAggTimeInterval < minIntervalTime) {
tscWarn("%p stream:%p, original sample interval:%ld too small, reset to:%lld", pSql, pStream, tscWarn("%p stream:%p, original sample interval:%ld too small, reset to:%" PRId64 "", pSql, pStream,
pCmd->nAggTimeInterval, minIntervalTime); pCmd->nAggTimeInterval, minIntervalTime);
pCmd->nAggTimeInterval = minIntervalTime; pCmd->nAggTimeInterval = minIntervalTime;
} }
@ -368,14 +368,14 @@ static void tscSetSlidingWindowInfo(SSqlObj *pSql, SSqlStream *pStream) {
(pStream->precision == TSDB_TIME_PRECISION_MICRO) ? tsMinSlidingTime * 1000L : tsMinSlidingTime; (pStream->precision == TSDB_TIME_PRECISION_MICRO) ? tsMinSlidingTime * 1000L : tsMinSlidingTime;
if (pCmd->nSlidingTime < minSlidingTime) { if (pCmd->nSlidingTime < minSlidingTime) {
tscWarn("%p stream:%p, original sliding value:%lld too small, reset to:%lld", pSql, pStream, pCmd->nSlidingTime, tscWarn("%p stream:%p, original sliding value:%" PRId64 " too small, reset to:%" PRId64 "", pSql, pStream, pCmd->nSlidingTime,
minSlidingTime); minSlidingTime);
pCmd->nSlidingTime = minSlidingTime; pCmd->nSlidingTime = minSlidingTime;
} }
if (pCmd->nSlidingTime > pCmd->nAggTimeInterval) { if (pCmd->nSlidingTime > pCmd->nAggTimeInterval) {
tscWarn("%p stream:%p, sliding value:%lld can not be larger than interval range, reset to:%lld", pSql, pStream, tscWarn("%p stream:%p, sliding value:%" PRId64 " can not be larger than interval range, reset to:%" PRId64 "", pSql, pStream,
pCmd->nSlidingTime, pCmd->nAggTimeInterval); pCmd->nSlidingTime, pCmd->nAggTimeInterval);
pCmd->nSlidingTime = pCmd->nAggTimeInterval; pCmd->nSlidingTime = pCmd->nAggTimeInterval;
@ -401,11 +401,11 @@ static int64_t tscGetStreamStartTimestamp(SSqlObj *pSql, SSqlStream *pStream, in
} else { // timewindow based aggregation stream } else { // timewindow based aggregation stream
if (stime == 0) { // no data in meter till now if (stime == 0) { // no data in meter till now
stime = ((int64_t)taosGetTimestamp(pStream->precision) / pStream->interval) * pStream->interval; stime = ((int64_t)taosGetTimestamp(pStream->precision) / pStream->interval) * pStream->interval;
tscWarn("%p stream:%p, last timestamp:0, reset to:%lld", pSql, pStream, stime); tscWarn("%p stream:%p, last timestamp:0, reset to:%" PRId64 "", pSql, pStream, stime);
} else { } else {
int64_t newStime = (stime / pStream->interval) * pStream->interval; int64_t newStime = (stime / pStream->interval) * pStream->interval;
if (newStime != stime) { if (newStime != stime) {
tscWarn("%p stream:%p, last timestamp:%lld, reset to:%lld", pSql, pStream, stime, newStime); tscWarn("%p stream:%p, last timestamp:%" PRId64 ", reset to:%" PRId64 "", pSql, pStream, stime, newStime);
stime = newStime; stime = newStime;
} }
} }
@ -447,7 +447,10 @@ static void setErrorInfo(STscObj* pObj, int32_t code, char* info) {
SSqlCmd* pCmd = &pObj->pSql->cmd; SSqlCmd* pCmd = &pObj->pSql->cmd;
pObj->pSql->res.code = code; pObj->pSql->res.code = code;
if (info != NULL) {
strncpy(pCmd->payload, info, pCmd->payloadLen); strncpy(pCmd->payload, info, pCmd->payloadLen);
}
} }
TAOS_STREAM *taos_open_stream(TAOS *taos, const char *sqlstr, void (*fp)(void *param, TAOS_RES *, TAOS_ROW row), TAOS_STREAM *taos_open_stream(TAOS *taos, const char *sqlstr, void (*fp)(void *param, TAOS_RES *, TAOS_ROW row),
@ -537,7 +540,7 @@ TAOS_STREAM *taos_open_stream(TAOS *taos, const char *sqlstr, void (*fp)(void *p
int64_t starttime = tscGetLaunchTimestamp(pStream); int64_t starttime = tscGetLaunchTimestamp(pStream);
taosTmrReset(tscProcessStreamTimer, starttime, pStream, tscTmr, &pStream->pTimer); taosTmrReset(tscProcessStreamTimer, starttime, pStream, tscTmr, &pStream->pTimer);
tscTrace("%p stream:%p is opened, query on:%s, interval:%lld, sliding:%lld, first launched in:%lld, sql:%s", pSql, tscTrace("%p stream:%p is opened, query on:%s, interval:%" PRId64 ", sliding:%" PRId64 ", first launched in:%" PRId64 ", sql:%s", pSql,
pStream, pMeterMetaInfo->name, pStream->interval, pStream->slidingTime, starttime, sqlstr); pStream, pMeterMetaInfo->name, pStream->interval, pStream->slidingTime, starttime, sqlstr);
return pStream; return pStream;

View File

@ -56,7 +56,7 @@ TAOS_SUB *taos_subscribe(const char *host, const char *user, const char *pass, c
if (pSub->taos == NULL) { if (pSub->taos == NULL) {
tfree(pSub); tfree(pSub);
} else { } else {
char qstr[128]; char qstr[256] = {0};
sprintf(qstr, "use %s", db); sprintf(qstr, "use %s", db);
int res = taos_query(pSub->taos, qstr); int res = taos_query(pSub->taos, qstr);
if (res != 0) { if (res != 0) {
@ -64,7 +64,7 @@ TAOS_SUB *taos_subscribe(const char *host, const char *user, const char *pass, c
taos_close(pSub->taos); taos_close(pSub->taos);
tfree(pSub); tfree(pSub);
} else { } else {
sprintf(qstr, "select * from %s where _c0 > now+1000d", pSub->name); snprintf(qstr, tListLen(qstr), "select * from %s where _c0 > now+1000d", pSub->name);
if (taos_query(pSub->taos, qstr)) { if (taos_query(pSub->taos, qstr)) {
tscTrace("failed to select, reason:%s", taos_errstr(pSub->taos)); tscTrace("failed to select, reason:%s", taos_errstr(pSub->taos));
taos_close(pSub->taos); taos_close(pSub->taos);
@ -106,7 +106,7 @@ TAOS_ROW taos_consume(TAOS_SUB *tsub) {
pSub->stime = taosGetTimestampMs(); pSub->stime = taosGetTimestampMs();
sprintf(qstr, "select * from %s where _c0 > %lld order by _c0 asc", pSub->name, pSub->lastKey); sprintf(qstr, "select * from %s where _c0 > %" PRId64 " order by _c0 asc", pSub->name, pSub->lastKey);
if (taos_query(pSub->taos, qstr)) { if (taos_query(pSub->taos, qstr)) {
tscTrace("failed to select, reason:%s", taos_errstr(pSub->taos)); tscTrace("failed to select, reason:%s", taos_errstr(pSub->taos));
return NULL; return NULL;

View File

@ -198,7 +198,9 @@ static int taos_options_imp(TSDB_OPTION option, const char *pStr) {
switch (option) { switch (option) {
case TSDB_OPTION_CONFIGDIR: case TSDB_OPTION_CONFIGDIR:
cfg = tsGetConfigOption("configDir"); cfg = tsGetConfigOption("configDir");
if (cfg && cfg->cfgStatus <= TSDB_CFG_CSTATUS_OPTION) { assert(cfg != NULL);
if (cfg->cfgStatus <= TSDB_CFG_CSTATUS_OPTION) {
strncpy(configDir, pStr, TSDB_FILENAME_LEN); strncpy(configDir, pStr, TSDB_FILENAME_LEN);
cfg->cfgStatus = TSDB_CFG_CSTATUS_OPTION; cfg->cfgStatus = TSDB_CFG_CSTATUS_OPTION;
tscPrint("set config file directory:%s", pStr); tscPrint("set config file directory:%s", pStr);
@ -210,7 +212,9 @@ static int taos_options_imp(TSDB_OPTION option, const char *pStr) {
case TSDB_OPTION_SHELL_ACTIVITY_TIMER: case TSDB_OPTION_SHELL_ACTIVITY_TIMER:
cfg = tsGetConfigOption("shellActivityTimer"); cfg = tsGetConfigOption("shellActivityTimer");
if (cfg && cfg->cfgStatus <= TSDB_CFG_CSTATUS_OPTION) { assert(cfg != NULL);
if (cfg->cfgStatus <= TSDB_CFG_CSTATUS_OPTION) {
tsShellActivityTimer = atoi(pStr); tsShellActivityTimer = atoi(pStr);
if (tsShellActivityTimer < 1) tsShellActivityTimer = 1; if (tsShellActivityTimer < 1) tsShellActivityTimer = 1;
if (tsShellActivityTimer > 3600) tsShellActivityTimer = 3600; if (tsShellActivityTimer > 3600) tsShellActivityTimer = 3600;
@ -224,13 +228,15 @@ static int taos_options_imp(TSDB_OPTION option, const char *pStr) {
case TSDB_OPTION_LOCALE: { // set locale case TSDB_OPTION_LOCALE: { // set locale
cfg = tsGetConfigOption("locale"); cfg = tsGetConfigOption("locale");
assert(cfg != NULL);
size_t len = strlen(pStr); size_t len = strlen(pStr);
if (len == 0 || len > TSDB_LOCALE_LEN) { if (len == 0 || len > TSDB_LOCALE_LEN) {
tscPrint("Invalid locale:%s, use default", pStr); tscPrint("Invalid locale:%s, use default", pStr);
return -1; return -1;
} }
if (cfg && cfg->cfgStatus <= TSDB_CFG_CSTATUS_OPTION) { if (cfg->cfgStatus <= TSDB_CFG_CSTATUS_OPTION) {
char sep = '.'; char sep = '.';
if (strlen(tsLocale) == 0) { // locale does not set yet if (strlen(tsLocale) == 0) { // locale does not set yet
@ -285,13 +291,15 @@ static int taos_options_imp(TSDB_OPTION option, const char *pStr) {
case TSDB_OPTION_CHARSET: { case TSDB_OPTION_CHARSET: {
/* set charset will override the value of charset, assigned during system locale changed */ /* set charset will override the value of charset, assigned during system locale changed */
cfg = tsGetConfigOption("charset"); cfg = tsGetConfigOption("charset");
assert(cfg != NULL);
size_t len = strlen(pStr); size_t len = strlen(pStr);
if (len == 0 || len > TSDB_LOCALE_LEN) { if (len == 0 || len > TSDB_LOCALE_LEN) {
tscPrint("failed to set charset:%s", pStr); tscPrint("failed to set charset:%s", pStr);
return -1; return -1;
} }
if (cfg && cfg->cfgStatus <= TSDB_CFG_CSTATUS_OPTION) { if (cfg->cfgStatus <= TSDB_CFG_CSTATUS_OPTION) {
if (taosValidateEncodec(pStr)) { if (taosValidateEncodec(pStr)) {
if (strlen(tsCharset) == 0) { if (strlen(tsCharset) == 0) {
tscPrint("charset is set:%s", pStr); tscPrint("charset is set:%s", pStr);
@ -314,7 +322,9 @@ static int taos_options_imp(TSDB_OPTION option, const char *pStr) {
case TSDB_OPTION_TIMEZONE: case TSDB_OPTION_TIMEZONE:
cfg = tsGetConfigOption("timezone"); cfg = tsGetConfigOption("timezone");
if (cfg && cfg->cfgStatus <= TSDB_CFG_CSTATUS_OPTION) { assert(cfg != NULL);
if (cfg->cfgStatus <= TSDB_CFG_CSTATUS_OPTION) {
strcpy(tsTimezone, pStr); strcpy(tsTimezone, pStr);
tsSetTimeZone(); tsSetTimeZone();
cfg->cfgStatus = TSDB_CFG_CSTATUS_OPTION; cfg->cfgStatus = TSDB_CFG_CSTATUS_OPTION;
@ -327,7 +337,9 @@ static int taos_options_imp(TSDB_OPTION option, const char *pStr) {
case TSDB_OPTION_SOCKET_TYPE: case TSDB_OPTION_SOCKET_TYPE:
cfg = tsGetConfigOption("sockettype"); cfg = tsGetConfigOption("sockettype");
if (cfg && cfg->cfgStatus <= TSDB_CFG_CSTATUS_OPTION) { assert(cfg != NULL);
if (cfg->cfgStatus <= TSDB_CFG_CSTATUS_OPTION) {
if (strcasecmp(pStr, TAOS_SOCKET_TYPE_NAME_UDP) != 0 && strcasecmp(pStr, TAOS_SOCKET_TYPE_NAME_TCP) != 0) { if (strcasecmp(pStr, TAOS_SOCKET_TYPE_NAME_UDP) != 0 && strcasecmp(pStr, TAOS_SOCKET_TYPE_NAME_TCP) != 0) {
tscError("only 'tcp' or 'udp' allowed for configuring the socket type"); tscError("only 'tcp' or 'udp' allowed for configuring the socket type");
return -1; return -1;
@ -340,6 +352,7 @@ static int taos_options_imp(TSDB_OPTION option, const char *pStr) {
break; break;
default: default:
// TODO return the correct error code to client in the format for taos_errstr()
tscError("Invalid option %d", option); tscError("Invalid option %d", option);
return -1; return -1;
} }

View File

@ -451,15 +451,6 @@ void tscFreeSqlObj(SSqlObj* pSql) {
free(pSql); free(pSql);
} }
STableDataBlocks* tscCreateDataBlock(int32_t size) {
STableDataBlocks* dataBuf = (STableDataBlocks*)calloc(1, sizeof(STableDataBlocks));
dataBuf->nAllocSize = (uint32_t)size;
dataBuf->pData = calloc(1, dataBuf->nAllocSize);
dataBuf->ordered = true;
dataBuf->prevTS = INT64_MIN;
return dataBuf;
}
void tscDestroyDataBlock(STableDataBlocks* pDataBlock) { void tscDestroyDataBlock(STableDataBlocks* pDataBlock) {
if (pDataBlock == NULL) { if (pDataBlock == NULL) {
return; return;
@ -467,6 +458,9 @@ void tscDestroyDataBlock(STableDataBlocks* pDataBlock) {
tfree(pDataBlock->pData); tfree(pDataBlock->pData);
tfree(pDataBlock->params); tfree(pDataBlock->params);
// free the refcount for metermeta
taosRemoveDataFromCache(tscCacheHandle, (void**) &(pDataBlock->pMeterMeta), false);
tfree(pDataBlock); tfree(pDataBlock);
} }
@ -513,11 +507,11 @@ SDataBlockList* tscCreateBlockArrayList() {
void tscAppendDataBlock(SDataBlockList* pList, STableDataBlocks* pBlocks) { void tscAppendDataBlock(SDataBlockList* pList, STableDataBlocks* pBlocks) {
if (pList->nSize >= pList->nAlloc) { if (pList->nSize >= pList->nAlloc) {
pList->nAlloc = pList->nAlloc << 1; pList->nAlloc = (pList->nAlloc) << 1U;
pList->pData = realloc(pList->pData, sizeof(void*) * (size_t)pList->nAlloc); pList->pData = realloc(pList->pData, POINTER_BYTES * (size_t)pList->nAlloc);
// reset allocated memory // reset allocated memory
memset(pList->pData + pList->nSize, 0, sizeof(void*) * (pList->nAlloc - pList->nSize)); memset(pList->pData + pList->nSize, 0, POINTER_BYTES * (pList->nAlloc - pList->nSize));
} }
pList->pData[pList->nSize++] = pBlocks; pList->pData[pList->nSize++] = pBlocks;
@ -539,11 +533,22 @@ void* tscDestroyBlockArrayList(SDataBlockList* pList) {
} }
int32_t tscCopyDataBlockToPayload(SSqlObj* pSql, STableDataBlocks* pDataBlock) { int32_t tscCopyDataBlockToPayload(SSqlObj* pSql, STableDataBlocks* pDataBlock) {
SSqlCmd* pCmd = &pSql->cmd; SSqlCmd *pCmd = &pSql->cmd;
assert(pDataBlock->pMeterMeta != NULL);
pCmd->count = pDataBlock->numOfMeters; pCmd->count = pDataBlock->numOfMeters;
SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, 0); SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, 0);
//set the correct metermeta object, the metermeta has been locked in pDataBlocks, so it must be in the cache
if (pMeterMetaInfo->pMeterMeta != pDataBlock->pMeterMeta) {
strcpy(pMeterMetaInfo->name, pDataBlock->meterId); strcpy(pMeterMetaInfo->name, pDataBlock->meterId);
taosRemoveDataFromCache(tscCacheHandle, (void**) &(pMeterMetaInfo->pMeterMeta), false);
pMeterMetaInfo->pMeterMeta = pDataBlock->pMeterMeta;
pDataBlock->pMeterMeta = NULL; // delegate the ownership of metermeta to pMeterMetaInfo
} else {
assert(strncmp(pMeterMetaInfo->name, pDataBlock->meterId, tListLen(pDataBlock->meterId)) == 0);
}
/* /*
* the submit message consists of : [RPC header|message body|digest] * the submit message consists of : [RPC header|message body|digest]
@ -551,7 +556,10 @@ int32_t tscCopyDataBlockToPayload(SSqlObj* pSql, STableDataBlocks* pDataBlock) {
* additional space. * additional space.
*/ */
int ret = tscAllocPayload(pCmd, pDataBlock->nAllocSize + sizeof(STaosDigest)); int ret = tscAllocPayload(pCmd, pDataBlock->nAllocSize + sizeof(STaosDigest));
if (TSDB_CODE_SUCCESS != ret) return ret; if (TSDB_CODE_SUCCESS != ret) {
return ret;
}
memcpy(pCmd->payload, pDataBlock->pData, pDataBlock->nAllocSize); memcpy(pCmd->payload, pDataBlock->pData, pDataBlock->nAllocSize);
/* /*
@ -561,7 +569,7 @@ int32_t tscCopyDataBlockToPayload(SSqlObj* pSql, STableDataBlocks* pDataBlock) {
pCmd->payloadLen = pDataBlock->nAllocSize - tsRpcHeadSize; pCmd->payloadLen = pDataBlock->nAllocSize - tsRpcHeadSize;
assert(pCmd->allocSize >= pCmd->payloadLen + tsRpcHeadSize + sizeof(STaosDigest)); assert(pCmd->allocSize >= pCmd->payloadLen + tsRpcHeadSize + sizeof(STaosDigest));
return tscGetMeterMeta(pSql, pMeterMetaInfo->name, 0); return TSDB_CODE_SUCCESS;
} }
void tscFreeUnusedDataBlocks(SDataBlockList* pList) { void tscFreeUnusedDataBlocks(SDataBlockList* pList) {
@ -573,19 +581,38 @@ void tscFreeUnusedDataBlocks(SDataBlockList* pList) {
} }
} }
STableDataBlocks* tscCreateDataBlockEx(size_t size, int32_t rowSize, int32_t startOffset, char* name) { /**
STableDataBlocks* dataBuf = tscCreateDataBlock(size); * create the in-memory buffer for each table to keep the submitted data block
* @param initialSize
* @param rowSize
* @param startOffset
* @param name
* @param pMeterMeta the ownership of pMeterMeta should be transfer to STableDataBlocks
* @return
*/
STableDataBlocks* tscCreateDataBlock(size_t initialSize, int32_t rowSize, int32_t startOffset, const char* name) {
STableDataBlocks* dataBuf = (STableDataBlocks*)calloc(1, sizeof(STableDataBlocks));
dataBuf->nAllocSize = (uint32_t) initialSize;
dataBuf->pData = calloc(1, dataBuf->nAllocSize);
dataBuf->ordered = true;
dataBuf->prevTS = INT64_MIN;
dataBuf->rowSize = rowSize; dataBuf->rowSize = rowSize;
dataBuf->size = startOffset; dataBuf->size = startOffset;
dataBuf->tsSource = -1; dataBuf->tsSource = -1;
strncpy(dataBuf->meterId, name, TSDB_METER_ID_LEN); strncpy(dataBuf->meterId, name, TSDB_METER_ID_LEN);
// sure that the metermeta must be in the local client cache
dataBuf->pMeterMeta = taosGetDataFromCache(tscCacheHandle, dataBuf->meterId);
assert(dataBuf->pMeterMeta != NULL && initialSize > 0);
return dataBuf; return dataBuf;
} }
STableDataBlocks* tscGetDataBlockFromList(void* pHashList, SDataBlockList* pDataBlockList, int64_t id, int32_t size, STableDataBlocks* tscGetDataBlockFromList(void* pHashList, SDataBlockList* pDataBlockList, int64_t id, int32_t size,
int32_t startOffset, int32_t rowSize, char* tableId) { int32_t startOffset, int32_t rowSize, const char* tableId) {
STableDataBlocks* dataBuf = NULL; STableDataBlocks* dataBuf = NULL;
STableDataBlocks** t1 = (STableDataBlocks**)taosGetIntHashData(pHashList, id); STableDataBlocks** t1 = (STableDataBlocks**)taosGetIntHashData(pHashList, id);
@ -594,7 +621,7 @@ STableDataBlocks* tscGetDataBlockFromList(void* pHashList, SDataBlockList* pData
} }
if (dataBuf == NULL) { if (dataBuf == NULL) {
dataBuf = tscCreateDataBlockEx((size_t)size, rowSize, startOffset, tableId); dataBuf = tscCreateDataBlock((size_t)size, rowSize, startOffset, tableId);
dataBuf = *(STableDataBlocks**)taosAddIntHash(pHashList, id, (char*)&dataBuf); dataBuf = *(STableDataBlocks**)taosAddIntHash(pHashList, id, (char*)&dataBuf);
tscAppendDataBlock(pDataBlockList, dataBuf); tscAppendDataBlock(pDataBlockList, dataBuf);
} }
@ -1138,7 +1165,8 @@ void tscColumnFilterInfoCopy(SColumnFilterInfo* dst, const SColumnFilterInfo* sr
*dst = *src; *dst = *src;
if (dst->filterOnBinary) { if (dst->filterOnBinary) {
size_t len = (size_t) dst->len + 1; size_t len = (size_t) dst->len + 1;
dst->pz = calloc(1, len); char* pTmp = calloc(1, len);
dst->pz = (int64_t) pTmp;
memcpy((char*) dst->pz, (char*) src->pz, (size_t) len); memcpy((char*) dst->pz, (char*) src->pz, (size_t) len);
} }
} }
@ -1202,7 +1230,8 @@ void tscColumnBaseInfoDestroy(SColumnBaseInfo* pColumnBaseInfo) {
assert(pColBase->filterInfo[j].filterOnBinary == 0 || pColBase->filterInfo[j].filterOnBinary == 1); assert(pColBase->filterInfo[j].filterOnBinary == 0 || pColBase->filterInfo[j].filterOnBinary == 1);
if (pColBase->filterInfo[j].filterOnBinary) { if (pColBase->filterInfo[j].filterOnBinary) {
tfree(pColBase->filterInfo[j].pz); free((char*) pColBase->filterInfo[j].pz);
pColBase->filterInfo[j].pz = 0;
} }
} }
} }

View File

@ -279,7 +279,7 @@ typedef struct {
} SShellSubmitMsg; } SShellSubmitMsg;
typedef struct SSchema { typedef struct SSchema {
char type; uint8_t type;
char name[TSDB_COL_NAME_LEN]; char name[TSDB_COL_NAME_LEN];
short colId; short colId;
short bytes; short bytes;
@ -622,7 +622,7 @@ typedef struct {
char repStrategy; char repStrategy;
char loadLatest; // load into mem or not char loadLatest; // load into mem or not
char precision; // time resoluation uint8_t precision; // time resolution
char reserved[16]; char reserved[16];
} SVnodeCfg, SCreateDbMsg, SDbCfg, SAlterDbMsg; } SVnodeCfg, SCreateDbMsg, SDbCfg, SAlterDbMsg;

View File

@ -175,7 +175,7 @@ bool taosMbsToUcs4(char *mbs, int32_t mbs_len, char *ucs4, int32_t ucs4_max_len)
bool taosUcs4ToMbs(void *ucs4, int32_t ucs4_max_len, char *mbs); bool taosUcs4ToMbs(void *ucs4, int32_t ucs4_max_len, char *mbs);
bool taosValidateEncodec(char *encodec); bool taosValidateEncodec(const char *encodec);
bool taosGetVersionNumber(char *versionStr, int *versionNubmer); bool taosGetVersionNumber(char *versionStr, int *versionNubmer);

View File

@ -71,6 +71,7 @@ extern "C" {
#include <wchar.h> #include <wchar.h>
#include <wordexp.h> #include <wordexp.h>
#include <wctype.h> #include <wctype.h>
#include <inttypes.h>
#define taosCloseSocket(x) \ #define taosCloseSocket(x) \

View File

@ -37,6 +37,6 @@ int32_t mgmtRetrieveMetersFromMetric(SMetricMetaMsg* pInfo, int32_t tableIndex,
int32_t mgmtDoJoin(SMetricMetaMsg* pMetricMetaMsg, tQueryResultset* pRes); int32_t mgmtDoJoin(SMetricMetaMsg* pMetricMetaMsg, tQueryResultset* pRes);
void mgmtReorganizeMetersInMetricMeta(SMetricMetaMsg* pInfo, int32_t index, tQueryResultset* pRes); void mgmtReorganizeMetersInMetricMeta(SMetricMetaMsg* pInfo, int32_t index, tQueryResultset* pRes);
bool tSkipListNodeFilterCallback(struct tSkipListNode *pNode, void *param); bool tSkipListNodeFilterCallback(const void *pNode, void *param);
#endif //TBASE_MGMTUTIL_H #endif //TBASE_MGMTUTIL_H

View File

@ -203,7 +203,7 @@ static bool mgmtTablenameFilterCallback(tSkipListNode* pNode, void* param) {
static void mgmtRetrieveFromLikeOptr(tQueryResultset* pRes, const char* str, STabObj* pMetric) { static void mgmtRetrieveFromLikeOptr(tQueryResultset* pRes, const char* str, STabObj* pMetric) {
SPatternCompareInfo info = PATTERN_COMPARE_INFO_INITIALIZER; SPatternCompareInfo info = PATTERN_COMPARE_INFO_INITIALIZER;
SMeterNameFilterSupporter supporter = {info, (char*)str}; SMeterNameFilterSupporter supporter = {info, (char*) str};
pRes->num = pRes->num =
tSkipListIterateList(pMetric->pSkipList, (tSkipListNode***)&pRes->pRes, mgmtTablenameFilterCallback, &supporter); tSkipListIterateList(pMetric->pSkipList, (tSkipListNode***)&pRes->pRes, mgmtTablenameFilterCallback, &supporter);
@ -230,13 +230,7 @@ static void mgmtFilterByTableNameCond(tQueryResultset* pRes, char* condStr, int3
free(str); free(str);
} }
UNUSED_FUNC static bool mgmtJoinFilterCallback(tSkipListNode* pNode, void* param) {
/*
*-Wunused-function"
*/
#if 0
static bool mgmtJoinFilterCallback(tSkipListNode* pNode, void* param) {
SJoinSupporter* pSupporter = (SJoinSupporter*)param; SJoinSupporter* pSupporter = (SJoinSupporter*)param;
SSchema s = {0}; SSchema s = {0};
@ -265,7 +259,6 @@ static bool mgmtJoinFilterCallback(tSkipListNode* pNode, void* param) {
return false; return false;
} }
#endif
static void orderResult(SMetricMetaMsg* pMetricMetaMsg, tQueryResultset* pRes, int16_t colIndex, int32_t tableIndex) { static void orderResult(SMetricMetaMsg* pMetricMetaMsg, tQueryResultset* pRes, int16_t colIndex, int32_t tableIndex) {
SMetricMetaElemMsg* pElem = (SMetricMetaElemMsg*)((char*)pMetricMetaMsg + pMetricMetaMsg->metaElem[tableIndex]); SMetricMetaElemMsg* pElem = (SMetricMetaElemMsg*)((char*)pMetricMetaMsg + pMetricMetaMsg->metaElem[tableIndex]);
@ -646,7 +639,8 @@ static void getTagColumnInfo(SSyntaxTreeFilterSupporter* pSupporter, SSchema* pS
} }
} }
void filterPrepare(tSQLBinaryExpr* pExpr, void* param) { void filterPrepare(void* expr, void* param) {
tSQLBinaryExpr *pExpr = (tSQLBinaryExpr*) expr;
if (pExpr->info != NULL) { if (pExpr->info != NULL) {
return; return;
} }
@ -800,9 +794,10 @@ static char* getTagValueFromMeter(STabObj* pMeter, int32_t offset, void* param)
} }
} }
bool tSkipListNodeFilterCallback(tSkipListNode* pNode, void* param) { bool tSkipListNodeFilterCallback(const void* pNode, void* param) {
tQueryInfo* pInfo = (tQueryInfo*)param; tQueryInfo* pInfo = (tQueryInfo*)param;
STabObj* pMeter = (STabObj*)pNode->pData; STabObj* pMeter = (STabObj*)(((tSkipListNode*)pNode)->pData);
char name[TSDB_METER_NAME_LEN + 1] = {0}; char name[TSDB_METER_NAME_LEN + 1] = {0};
char* val = getTagValueFromMeter(pMeter, pInfo->offset, name); char* val = getTagValueFromMeter(pMeter, pInfo->offset, name);

View File

@ -1213,8 +1213,8 @@ void vnodeSingleMeterQuery(SSchedMsg *pMsg) {
dTrace("QInfo:%p reset signature", pQInfo); dTrace("QInfo:%p reset signature", pQInfo);
TSDB_QINFO_RESET_SIG(pQInfo);
sem_post(&pQInfo->dataReady); sem_post(&pQInfo->dataReady);
TSDB_QINFO_RESET_SIG(pQInfo);
return; return;
} }
@ -1235,8 +1235,8 @@ void vnodeSingleMeterQuery(SSchedMsg *pMsg) {
dTrace("QInfo:%p reset signature", pQInfo); dTrace("QInfo:%p reset signature", pQInfo);
TSDB_QINFO_RESET_SIG(pQInfo);
sem_post(&pQInfo->dataReady); sem_post(&pQInfo->dataReady);
TSDB_QINFO_RESET_SIG(pQInfo);
return; return;
} }
} }
@ -1247,8 +1247,8 @@ void vnodeSingleMeterQuery(SSchedMsg *pMsg) {
pMeterObj->vnode, pMeterObj->sid, pMeterObj->meterId, pQInfo->pointsRead); pMeterObj->vnode, pMeterObj->sid, pMeterObj->meterId, pQInfo->pointsRead);
vnodePrintQueryStatistics(pQInfo->pMeterQuerySupporter); vnodePrintQueryStatistics(pQInfo->pMeterQuerySupporter);
TSDB_QINFO_RESET_SIG(pQInfo);
sem_post(&pQInfo->dataReady); sem_post(&pQInfo->dataReady);
TSDB_QINFO_RESET_SIG(pQInfo);
return; return;
} }
@ -1284,8 +1284,8 @@ void vnodeSingleMeterQuery(SSchedMsg *pMsg) {
pQInfo, pMeterObj->vnode, pMeterObj->sid, pMeterObj->meterId, pQuery->pointsRead); pQInfo, pMeterObj->vnode, pMeterObj->sid, pMeterObj->meterId, pQuery->pointsRead);
} }
TSDB_QINFO_RESET_SIG(pQInfo);
sem_post(&pQInfo->dataReady); sem_post(&pQInfo->dataReady);
TSDB_QINFO_RESET_SIG(pQInfo);
} }
void vnodeMultiMeterQuery(SSchedMsg *pMsg) { void vnodeMultiMeterQuery(SSchedMsg *pMsg) {
@ -1335,6 +1335,6 @@ void vnodeMultiMeterQuery(SSchedMsg *pMsg) {
vnodePrintQueryStatistics(pSupporter); vnodePrintQueryStatistics(pSupporter);
} }
TSDB_QINFO_RESET_SIG(pQInfo);
sem_post(&pQInfo->dataReady); sem_post(&pQInfo->dataReady);
TSDB_QINFO_RESET_SIG(pQInfo);
} }

View File

@ -472,7 +472,7 @@ void vnodeExecuteRetrieveReq(SSchedMsg *pSched) {
if (numOfRows == 0 && (pRetrieve->qhandle == (uint64_t)pObj->qhandle) && (code != TSDB_CODE_ACTION_IN_PROGRESS)) { if (numOfRows == 0 && (pRetrieve->qhandle == (uint64_t)pObj->qhandle) && (code != TSDB_CODE_ACTION_IN_PROGRESS)) {
dTrace("QInfo:%p %s free qhandle code:%d", pObj->qhandle, __FUNCTION__, code); dTrace("QInfo:%p %s free qhandle code:%d", pObj->qhandle, __FUNCTION__, code);
vnodeFreeQInfoInQueue(pObj->qhandle); vnodeFreeQInfo(pObj->qhandle, true);
pObj->qhandle = NULL; pObj->qhandle = NULL;
} }
@ -480,8 +480,6 @@ void vnodeExecuteRetrieveReq(SSchedMsg *pSched) {
_exit: _exit:
free(pSched->msg); free(pSched->msg);
return;
} }
int vnodeProcessRetrieveRequest(char *pMsg, int msgLen, SShellObj *pObj) { int vnodeProcessRetrieveRequest(char *pMsg, int msgLen, SShellObj *pObj) {

View File

@ -12,7 +12,6 @@
* You should have received a copy of the GNU Affero General Public License * You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#include <inttypes.h>
#include "os.h" #include "os.h"
#include "taos.h" #include "taos.h"
#include "taosmsg.h" #include "taosmsg.h"

View File

@ -12,7 +12,6 @@
* You should have received a copy of the GNU Affero General Public License * You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#include <inttypes.h>
#include "os.h" #include "os.h"
#include "taosmsg.h" #include "taosmsg.h"

View File

@ -12,11 +12,7 @@
* You should have received a copy of the GNU Affero General Public License * You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#include <inttypes.h> #include "os.h"
#include <float.h>
#include <math.h>
#include <stdbool.h>
#include <stdlib.h>
#include "tlog.h" #include "tlog.h"
#include "tsdb.h" #include "tsdb.h"

View File

@ -12,7 +12,6 @@
* You should have received a copy of the GNU Affero General Public License * You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#include <inttypes.h>
#include "os.h" #include "os.h"
#include "tstrbuild.h" #include "tstrbuild.h"

View File

@ -14,7 +14,6 @@
*/ */
#include "os.h" #include "os.h"
#include <inttypes.h>
#include "tlog.h" #include "tlog.h"
#include "tsched.h" #include "tsched.h"
#include "ttime.h" #include "ttime.h"

View File

@ -510,7 +510,7 @@ uint32_t tSQLGetToken(char* z, uint32_t* tokenType) {
if ((z[i] == 'a' || z[i] == 's' || z[i] == 'm' || z[i] == 'h' || z[i] == 'd' || z[i] == 'n' || z[i] == 'y' || if ((z[i] == 'a' || z[i] == 's' || z[i] == 'm' || z[i] == 'h' || z[i] == 'd' || z[i] == 'n' || z[i] == 'y' ||
z[i] == 'w' || z[i] == 'A' || z[i] == 'S' || z[i] == 'M' || z[i] == 'H' || z[i] == 'D' || z[i] == 'N' || z[i] == 'w' || z[i] == 'A' || z[i] == 'S' || z[i] == 'M' || z[i] == 'H' || z[i] == 'D' || z[i] == 'N' ||
z[i] == 'Y' || z[i] == 'W') && z[i] == 'Y' || z[i] == 'W') &&
(isIdChar[(int)(z[i + 1])] == 0)) { (isIdChar[(uint8_t)z[i + 1]] == 0)) {
*tokenType = TK_VARIABLE; *tokenType = TK_VARIABLE;
i += 1; i += 1;
return i; return i;
@ -551,7 +551,7 @@ uint32_t tSQLGetToken(char* z, uint32_t* tokenType) {
case 't': case 't':
case 'F': case 'F':
case 'f': { case 'f': {
for (i = 1; ((z[i] & 0x80) == 0) && isIdChar[(int)(z[i])]; i++) { for (i = 1; ((z[i] & 0x80) == 0) && isIdChar[(uint8_t) z[i]]; i++) {
} }
if ((i == 4 && strncasecmp(z, "true", 4) == 0) || (i == 5 && strncasecmp(z, "false", 5) == 0)) { if ((i == 4 && strncasecmp(z, "true", 4) == 0) || (i == 5 && strncasecmp(z, "false", 5) == 0)) {
@ -560,10 +560,10 @@ uint32_t tSQLGetToken(char* z, uint32_t* tokenType) {
} }
} }
default: { default: {
if (((*z & 0x80) != 0) || !isIdChar[(int)(*z)]) { if (((*z & 0x80) != 0) || !isIdChar[(uint8_t) *z]) {
break; break;
} }
for (i = 1; ((z[i] & 0x80) == 0) && isIdChar[(int)(z[i])]; i++) { for (i = 1; ((z[i] & 0x80) == 0) && isIdChar[(uint8_t) z[i]]; i++) {
} }
*tokenType = tSQLKeywordCode(z, i); *tokenType = tSQLKeywordCode(z, i);
return i; return i;

View File

@ -12,7 +12,6 @@
* You should have received a copy of the GNU Affero General Public License * You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#include <inttypes.h>
#include "os.h" #include "os.h"
#include "taos.h" #include "taos.h"
#include "tsdb.h" #include "tsdb.h"

View File

@ -443,7 +443,7 @@ bool taosMbsToUcs4(char *mbs, int32_t mbs_len, char *ucs4, int32_t ucs4_max_len)
#endif #endif
} }
bool taosValidateEncodec(char *encodec) { bool taosValidateEncodec(const char *encodec) {
#ifdef USE_LIBICONV #ifdef USE_LIBICONV
iconv_t cd = iconv_open(encodec, DEFAULT_UNICODE_ENCODEC); iconv_t cd = iconv_open(encodec, DEFAULT_UNICODE_ENCODEC);
if (cd == (iconv_t)(-1)) { if (cd == (iconv_t)(-1)) {