From 0c30d77ab39d3c119a32512b2a28884702f757f2 Mon Sep 17 00:00:00 2001 From: hjxilinx Date: Mon, 9 Dec 2019 01:40:21 +0800 Subject: [PATCH 1/7] fix bugs for high throughput of query request --- src/system/detail/src/vnodeQueryProcess.c | 10 +++++----- src/system/detail/src/vnodeShell.c | 4 +--- 2 files changed, 6 insertions(+), 8 deletions(-) diff --git a/src/system/detail/src/vnodeQueryProcess.c b/src/system/detail/src/vnodeQueryProcess.c index cec76d1cba..6fa0891baa 100644 --- a/src/system/detail/src/vnodeQueryProcess.c +++ b/src/system/detail/src/vnodeQueryProcess.c @@ -1213,8 +1213,8 @@ void vnodeSingleMeterQuery(SSchedMsg *pMsg) { dTrace("QInfo:%p reset signature", pQInfo); - TSDB_QINFO_RESET_SIG(pQInfo); sem_post(&pQInfo->dataReady); + TSDB_QINFO_RESET_SIG(pQInfo); return; } @@ -1235,8 +1235,8 @@ void vnodeSingleMeterQuery(SSchedMsg *pMsg) { dTrace("QInfo:%p reset signature", pQInfo); - TSDB_QINFO_RESET_SIG(pQInfo); sem_post(&pQInfo->dataReady); + TSDB_QINFO_RESET_SIG(pQInfo); return; } } @@ -1247,8 +1247,8 @@ void vnodeSingleMeterQuery(SSchedMsg *pMsg) { pMeterObj->vnode, pMeterObj->sid, pMeterObj->meterId, pQInfo->pointsRead); vnodePrintQueryStatistics(pQInfo->pMeterQuerySupporter); - TSDB_QINFO_RESET_SIG(pQInfo); sem_post(&pQInfo->dataReady); + TSDB_QINFO_RESET_SIG(pQInfo); return; } @@ -1284,8 +1284,8 @@ void vnodeSingleMeterQuery(SSchedMsg *pMsg) { pQInfo, pMeterObj->vnode, pMeterObj->sid, pMeterObj->meterId, pQuery->pointsRead); } - TSDB_QINFO_RESET_SIG(pQInfo); sem_post(&pQInfo->dataReady); + TSDB_QINFO_RESET_SIG(pQInfo); } void vnodeMultiMeterQuery(SSchedMsg *pMsg) { @@ -1335,6 +1335,6 @@ void vnodeMultiMeterQuery(SSchedMsg *pMsg) { vnodePrintQueryStatistics(pSupporter); } - TSDB_QINFO_RESET_SIG(pQInfo); sem_post(&pQInfo->dataReady); + TSDB_QINFO_RESET_SIG(pQInfo); } diff --git a/src/system/detail/src/vnodeShell.c b/src/system/detail/src/vnodeShell.c index 512f675da4..5eea6632af 100644 --- a/src/system/detail/src/vnodeShell.c +++ b/src/system/detail/src/vnodeShell.c @@ -473,7 +473,7 @@ void vnodeExecuteRetrieveReq(SSchedMsg *pSched) { if (numOfRows == 0 && (pRetrieve->qhandle == (uint64_t)pObj->qhandle) && (code != TSDB_CODE_ACTION_IN_PROGRESS)) { dTrace("QInfo:%p %s free qhandle code:%d", pObj->qhandle, __FUNCTION__, code); - vnodeFreeQInfoInQueue(pObj->qhandle); + vnodeFreeQInfo(pObj->qhandle); pObj->qhandle = NULL; } @@ -481,8 +481,6 @@ void vnodeExecuteRetrieveReq(SSchedMsg *pSched) { _exit: free(pSched->msg); - - return; } int vnodeProcessRetrieveRequest(char *pMsg, int msgLen, SShellObj *pObj) { From c12fb22e0a9b202305faff72c935dd3c7ab8a8c0 Mon Sep 17 00:00:00 2001 From: hjxilinx Date: Mon, 9 Dec 2019 02:09:17 +0800 Subject: [PATCH 2/7] suppress compile warnings --- src/client/inc/tsclient.h | 6 +++--- src/client/src/tscFunctionImpl.c | 2 +- src/client/src/tscJoinProcess.c | 12 ++++++------ src/client/src/tscLocal.c | 4 ++-- src/client/src/tscProfile.c | 4 ++-- src/client/src/tscServer.c | 4 ++-- src/client/src/tscSql.c | 6 +++--- src/client/src/tscStream.c | 30 +++++++++++++++--------------- src/client/src/tscSub.c | 2 +- src/client/src/tscUtil.c | 3 ++- src/inc/taosmsg.h | 2 +- src/inc/tutil.h | 2 +- src/os/linux/inc/os.h | 1 + src/util/src/textbuffer.c | 1 - src/util/src/thistogram.c | 1 - src/util/src/tskiplist.c | 6 +----- src/util/src/tstrbuild.c | 1 - src/util/src/ttimer.c | 1 - src/util/src/ttokenizer.c | 8 ++++---- src/util/src/ttypes.c | 1 - src/util/src/tutil.c | 2 +- 21 files changed, 46 insertions(+), 53 deletions(-) diff --git a/src/client/inc/tsclient.h b/src/client/inc/tsclient.h index b36d2362da..4de33f5ac4 100644 --- a/src/client/inc/tsclient.h +++ b/src/client/inc/tsclient.h @@ -274,7 +274,7 @@ typedef struct { int8_t isInsertFromFile; // load data from file or not bool import; // import/insert type - char msgType; + uint8_t msgType; uint16_t type; // query type char intervalTimeUnit; int64_t etime, stime; @@ -378,14 +378,14 @@ typedef struct _sql_obj { char * sqlstr; char retry; char maxRetry; - char index; + uint8_t index; char freed : 4; char listed : 4; tsem_t rspSem; tsem_t emptyRspSem; SSqlCmd cmd; SSqlRes res; - char numOfSubs; + uint8_t numOfSubs; struct _sql_obj **pSubs; struct _sql_obj * prev, *next; } SSqlObj; diff --git a/src/client/src/tscFunctionImpl.c b/src/client/src/tscFunctionImpl.c index 2d913b2f20..3160c9c1c3 100644 --- a/src/client/src/tscFunctionImpl.c +++ b/src/client/src/tscFunctionImpl.c @@ -1668,7 +1668,7 @@ static void last_data_assign_impl(SQLFunctionCtx *pCtx, char *pData, int32_t ind if (pInfo->hasResult != DATA_SET_FLAG || pInfo->ts < timestamp[index]) { #if defined(_DEBUG_VIEW) - pTrace("assign index:%d, ts:%lld, val:%d, ", index, timestamp[index], *(int32_t *)pData); + pTrace("assign index:%d, ts:%" PRId64 ", val:%d, ", index, timestamp[index], *(int32_t *)pData); #endif memcpy(pCtx->aOutputBuf, pData, pCtx->inputBytes); diff --git a/src/client/src/tscJoinProcess.c b/src/client/src/tscJoinProcess.c index dd130ed1ef..375e0066b1 100644 --- a/src/client/src/tscJoinProcess.c +++ b/src/client/src/tscJoinProcess.c @@ -13,8 +13,8 @@ * along with this program. If not, see . */ -#include "tscJoinProcess.h" #include "os.h" +#include "tscJoinProcess.h" #include "tcache.h" #include "tscUtil.h" #include "tsclient.h" @@ -88,7 +88,7 @@ static int64_t doTSBlockIntersect(SSqlObj* pSql, SJoinSubquerySupporter* pSuppor #ifdef _DEBUG_VIEW // for debug purpose - tscPrint("%lld, tags:%d \t %lld, tags:%d", elem1.ts, elem1.tag, elem2.ts, elem2.tag); + tscPrint("%" PRId64 ", tags:%d \t %" PRId64 ", tags:%d", elem1.ts, elem1.tag, elem2.ts, elem2.tag); #endif if (elem1.tag < elem2.tag || (elem1.tag == elem2.tag && doCompare(order, elem1.ts, elem2.ts))) { @@ -150,7 +150,7 @@ static int64_t doTSBlockIntersect(SSqlObj* pSql, SJoinSubquerySupporter* pSuppor tsBufDestory(pSupporter1->pTSBuf); tsBufDestory(pSupporter2->pTSBuf); - tscTrace("%p input1:%lld, input2:%lld, final:%lld for secondary query after ts blocks intersecting", pSql, + tscTrace("%p input1:%" PRId64 ", input2:%" PRId64 ", final:%" PRId64 " for secondary query after ts blocks intersecting", pSql, numOfInput1, numOfInput2, output1->numOfTotal); return output1->numOfTotal; @@ -528,8 +528,8 @@ void tscFetchDatablockFromSubquery(SSqlObj* pSql) { numOfFetch++; } } else { - if ((pRes->row >= pRes->numOfRows && (!tscHasReachLimitation(pSql->pSubs[i])) && tscProjectionQueryOnTable(pSql)) - || (pRes->numOfRows == 0)) { + if ((pRes->row >= pRes->numOfRows && (!tscHasReachLimitation(pSql->pSubs[i])) && + tscProjectionQueryOnTable(&pSql->cmd)) || (pRes->numOfRows == 0)) { numOfFetch++; } } @@ -1619,7 +1619,7 @@ void tsBufDisplay(STSBuf* pTSBuf) { while (tsBufNextPos(pTSBuf)) { STSElem elem = tsBufGetElem(pTSBuf); - printf("%d-%lld-%lld\n", elem.vnode, elem.tag, elem.ts); + printf("%d-%" PRId64 "-%" PRId64 "\n", elem.vnode, *(int64_t*) elem.tag, elem.ts); } pTSBuf->cur.order = old; diff --git a/src/client/src/tscLocal.c b/src/client/src/tscLocal.c index 5ae72acd57..ecd2f97e39 100644 --- a/src/client/src/tscLocal.c +++ b/src/client/src/tscLocal.c @@ -64,7 +64,7 @@ static int32_t getToStringLength(const char *pData, int32_t length, int32_t type } break; case TSDB_DATA_TYPE_TIMESTAMP: case TSDB_DATA_TYPE_BIGINT: - len = sprintf(buf, "%lld", *(int64_t *)pData); + len = sprintf(buf, "%" PRId64 "", *(int64_t *)pData); break; case TSDB_DATA_TYPE_BOOL: len = MAX_BOOL_TYPE_LENGTH; @@ -228,7 +228,7 @@ static int32_t tscSetValueToResObj(SSqlObj *pSql, int32_t rowLen) { sprintf(target, "%d", *(int32_t *)pTagValue); break; case TSDB_DATA_TYPE_BIGINT: - sprintf(target, "%lld", *(int64_t *)pTagValue); + sprintf(target, "%" PRId64 "", *(int64_t *)pTagValue); break; case TSDB_DATA_TYPE_BOOL: { char *val = (*((int8_t *)pTagValue) == 0) ? "false" : "true"; diff --git a/src/client/src/tscProfile.c b/src/client/src/tscProfile.c index 61bc9dd99e..9dc716bc12 100644 --- a/src/client/src/tscProfile.c +++ b/src/client/src/tscProfile.c @@ -93,10 +93,10 @@ void tscSaveSlowQuery(SSqlObj *pSql) { const static int64_t SLOW_QUERY_INTERVAL = 3000000L; if (pSql->res.useconds < SLOW_QUERY_INTERVAL) return; - tscTrace("%p query time:%lld sql:%s", pSql, pSql->res.useconds, pSql->sqlstr); + tscTrace("%p query time:%" PRId64 " sql:%s", pSql, pSql->res.useconds, pSql->sqlstr); char *sql = malloc(200); - int len = snprintf(sql, 200, "insert into %s.slowquery values(now, '%s', %lld, %lld, '", tsMonitorDbName, + int len = snprintf(sql, 200, "insert into %s.slowquery values(now, '%s', %" PRId64 ", %" PRId64 ", '", tsMonitorDbName, pSql->pTscObj->user, pSql->stime, pSql->res.useconds); int sqlLen = snprintf(sql + len, TSDB_SHOW_SQL_LEN, "%s", pSql->sqlstr); if (sqlLen > TSDB_SHOW_SQL_LEN - 1) { diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index 1805eac38d..28cefeea73 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -3634,7 +3634,7 @@ int tscRenewMeterMeta(SSqlObj *pSql, char *meterId) { */ if (pMeterMetaInfo->pMeterMeta == NULL || !tscQueryOnMetric(pCmd)) { if (pMeterMetaInfo->pMeterMeta) { - tscTrace("%p update meter meta, old: numOfTags:%d, numOfCols:%d, uid:%lld, addr:%p", pSql, + tscTrace("%p update meter meta, old: numOfTags:%d, numOfCols:%d, uid:%" PRId64 ", addr:%p", pSql, pMeterMetaInfo->numOfTags, pCmd->numOfCols, pMeterMetaInfo->pMeterMeta->uid, pMeterMetaInfo->pMeterMeta); } tscWaitingForCreateTable(&pSql->cmd); @@ -3642,7 +3642,7 @@ int tscRenewMeterMeta(SSqlObj *pSql, char *meterId) { code = tscDoGetMeterMeta(pSql, meterId, 0); // todo ?? } else { - tscTrace("%p metric query not update metric meta, numOfTags:%d, numOfCols:%d, uid:%lld, addr:%p", pSql, + tscTrace("%p metric query not update metric meta, numOfTags:%d, numOfCols:%d, uid:%" PRId64 ", addr:%p", pSql, pMeterMetaInfo->pMeterMeta->numOfTags, pCmd->numOfCols, pMeterMetaInfo->pMeterMeta->uid, pMeterMetaInfo->pMeterMeta); } diff --git a/src/client/src/tscSql.c b/src/client/src/tscSql.c index 04f9fc0aa6..a5aa304852 100644 --- a/src/client/src/tscSql.c +++ b/src/client/src/tscSql.c @@ -532,7 +532,7 @@ static void **tscJoinResultsetFromBuf(SSqlObj *pSql) { doSetResultRowData(pSql->pSubs[1]); // TSKEY key1 = *(TSKEY *)pRes1->tsrow[0]; // TSKEY key2 = *(TSKEY *)pRes2->tsrow[0]; - // printf("first:%lld, second:%lld\n", key1, key2); + // printf("first:%" PRId64 ", second:%" PRId64 "\n", key1, key2); success = true; pRes1->row++; pRes2->row++; @@ -903,7 +903,7 @@ int taos_print_row(char *str, TAOS_ROW row, TAOS_FIELD *fields, int num_fields) break; case TSDB_DATA_TYPE_BIGINT: - len += sprintf(str + len, "%lld ", *((int64_t *)row[i])); + len += sprintf(str + len, "%" PRId64 " ", *((int64_t *)row[i])); break; case TSDB_DATA_TYPE_FLOAT: @@ -928,7 +928,7 @@ int taos_print_row(char *str, TAOS_ROW row, TAOS_FIELD *fields, int num_fields) } break; case TSDB_DATA_TYPE_TIMESTAMP: - len += sprintf(str + len, "%lld ", *((int64_t *)row[i])); + len += sprintf(str + len, "%" PRId64 " ", *((int64_t *)row[i])); break; case TSDB_DATA_TYPE_BOOL: diff --git a/src/client/src/tscStream.c b/src/client/src/tscStream.c index 288f906594..933960b893 100644 --- a/src/client/src/tscStream.c +++ b/src/client/src/tscStream.c @@ -85,7 +85,7 @@ static void tscProcessStreamLaunchQuery(SSchedMsg *pMsg) { // failed to get meter/metric meta, retry in 10sec. if (code != TSDB_CODE_SUCCESS) { int64_t retryDelayTime = tscGetRetryDelayTime(pStream->slidingTime, pStream->precision); - tscError("%p stream:%p,get metermeta failed, retry in %lldms", pStream->pSql, pStream, retryDelayTime); + tscError("%p stream:%p,get metermeta failed, retry in %" PRId64 "ms", pStream->pSql, pStream, retryDelayTime); tscSetRetryTimer(pStream, pSql, retryDelayTime); return; @@ -136,7 +136,7 @@ static void tscProcessStreamQueryCallback(void *param, TAOS_RES *tres, int numOf SSqlStream *pStream = (SSqlStream *)param; if (tres == NULL || numOfRows < 0) { int64_t retryDelay = tscGetRetryDelayTime(pStream->slidingTime, pStream->precision); - tscError("%p stream:%p, query data failed, code:%d, retry in %lldms", pStream->pSql, pStream, numOfRows, + tscError("%p stream:%p, query data failed, code:%d, retry in %" PRId64 "ms", pStream->pSql, pStream, numOfRows, retryDelay); SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfo(&pStream->pSql->cmd, 0); @@ -158,7 +158,7 @@ static void tscSetTimestampForRes(SSqlStream *pStream, SSqlObj *pSql) { if (timestamp != actualTimestamp) { // reset the timestamp of each agg point by using start time of each interval *((int64_t *)pRes->data) = actualTimestamp; - tscWarn("%p stream:%p, timestamp of points is:%lld, reset to %lld", pSql, pStream, timestamp, actualTimestamp); + tscWarn("%p stream:%p, timestamp of points is:%" PRId64 ", reset to %" PRId64 "", pSql, pStream, timestamp, actualTimestamp); } } @@ -169,7 +169,7 @@ static void tscProcessStreamRetrieveResult(void *param, TAOS_RES *res, int numOf if (pSql == NULL || numOfRows < 0) { int64_t retryDelayTime = tscGetRetryDelayTime(pStream->slidingTime, pStream->precision); - tscError("%p stream:%p, retrieve data failed, code:%d, retry in %lldms", pSql, pStream, numOfRows, retryDelayTime); + tscError("%p stream:%p, retrieve data failed, code:%d, retry in %" PRId64 "ms", pSql, pStream, numOfRows, retryDelayTime); tscClearMeterMetaInfo(pMeterMetaInfo, true); tscSetRetryTimer(pStream, pStream->pSql, retryDelayTime); @@ -235,7 +235,7 @@ static void tscProcessStreamRetrieveResult(void *param, TAOS_RES *res, int numOf /* no resuls in the query range, retry */ // todo set retry dynamic time int32_t retry = tsProjectExecInterval; - tscError("%p stream:%p, retrieve no data, code:%d, retry in %lldms", pSql, pStream, numOfRows, retry); + tscError("%p stream:%p, retrieve no data, code:%d, retry in %" PRId64 "ms", pSql, pStream, numOfRows, retry); tscClearSqlMetaInfoForce(&(pStream->pSql->cmd)); tscSetRetryTimer(pStream, pStream->pSql, retry); @@ -265,7 +265,7 @@ static void tscSetRetryTimer(SSqlStream *pStream, SSqlObj *pSql, int64_t timer) /* * current time window will be closed, since it too early to exceed the maxRetentWindow value */ - tscTrace("%p stream:%p, etime:%lld is too old, exceeds the max retention time window:%lld, stop the stream", + tscTrace("%p stream:%p, etime:%" PRId64 " is too old, exceeds the max retention time window:%" PRId64 ", stop the stream", pStream->pSql, pStream, pStream->stime, pStream->etime); // TODO : How to terminate stream here taos_close_stream(pStream); @@ -276,10 +276,10 @@ static void tscSetRetryTimer(SSqlStream *pStream, SSqlObj *pSql, int64_t timer) return; } - tscTrace("%p stream:%p, next query start at %lld, in %lldms. query range %lld-%lld", pStream->pSql, pStream, + tscTrace("%p stream:%p, next query start at %" PRId64 ", in %" PRId64 "ms. query range %" PRId64 "-%" PRId64 "", pStream->pSql, pStream, now + timer, timer, pStream->stime, etime); } else { - tscTrace("%p stream:%p, next query start at %lld, in %lldms. query range %lld-%lld", pStream->pSql, pStream, + tscTrace("%p stream:%p, next query start at %" PRId64 ", in %" PRId64 "ms. query range %" PRId64 "-%" PRId64 "", pStream->pSql, pStream, pStream->stime, timer, pStream->stime - pStream->interval, pStream->stime - 1); } @@ -299,7 +299,7 @@ static void tscSetNextLaunchTimer(SSqlStream *pStream, SSqlObj *pSql) { */ timer = pStream->slidingTime; if (pStream->stime > pStream->etime) { - tscTrace("%p stream:%p, stime:%lld is larger than end time: %lld, stop the stream", pStream->pSql, pStream, + tscTrace("%p stream:%p, stime:%" PRId64 " is larger than end time: %" PRId64 ", stop the stream", pStream->pSql, pStream, pStream->stime, pStream->etime); // TODO : How to terminate stream here taos_close_stream(pStream); @@ -353,7 +353,7 @@ static void tscSetSlidingWindowInfo(SSqlObj *pSql, SSqlStream *pStream) { int64_t minIntervalTime = (pStream->precision == TSDB_TIME_PRECISION_MICRO) ? tsMinIntervalTime * 1000L : tsMinIntervalTime; if (pCmd->nAggTimeInterval < minIntervalTime) { - tscWarn("%p stream:%p, original sample interval:%ld too small, reset to:%lld", pSql, pStream, + tscWarn("%p stream:%p, original sample interval:%ld too small, reset to:%" PRId64 "", pSql, pStream, pCmd->nAggTimeInterval, minIntervalTime); pCmd->nAggTimeInterval = minIntervalTime; } @@ -368,14 +368,14 @@ static void tscSetSlidingWindowInfo(SSqlObj *pSql, SSqlStream *pStream) { (pStream->precision == TSDB_TIME_PRECISION_MICRO) ? tsMinSlidingTime * 1000L : tsMinSlidingTime; if (pCmd->nSlidingTime < minSlidingTime) { - tscWarn("%p stream:%p, original sliding value:%lld too small, reset to:%lld", pSql, pStream, pCmd->nSlidingTime, + tscWarn("%p stream:%p, original sliding value:%" PRId64 " too small, reset to:%" PRId64 "", pSql, pStream, pCmd->nSlidingTime, minSlidingTime); pCmd->nSlidingTime = minSlidingTime; } if (pCmd->nSlidingTime > pCmd->nAggTimeInterval) { - tscWarn("%p stream:%p, sliding value:%lld can not be larger than interval range, reset to:%lld", pSql, pStream, + tscWarn("%p stream:%p, sliding value:%" PRId64 " can not be larger than interval range, reset to:%" PRId64 "", pSql, pStream, pCmd->nSlidingTime, pCmd->nAggTimeInterval); pCmd->nSlidingTime = pCmd->nAggTimeInterval; @@ -401,11 +401,11 @@ static int64_t tscGetStreamStartTimestamp(SSqlObj *pSql, SSqlStream *pStream, in } else { // timewindow based aggregation stream if (stime == 0) { // no data in meter till now stime = ((int64_t)taosGetTimestamp(pStream->precision) / pStream->interval) * pStream->interval; - tscWarn("%p stream:%p, last timestamp:0, reset to:%lld", pSql, pStream, stime); + tscWarn("%p stream:%p, last timestamp:0, reset to:%" PRId64 "", pSql, pStream, stime); } else { int64_t newStime = (stime / pStream->interval) * pStream->interval; if (newStime != stime) { - tscWarn("%p stream:%p, last timestamp:%lld, reset to:%lld", pSql, pStream, stime, newStime); + tscWarn("%p stream:%p, last timestamp:%" PRId64 ", reset to:%" PRId64 "", pSql, pStream, stime, newStime); stime = newStime; } } @@ -537,7 +537,7 @@ TAOS_STREAM *taos_open_stream(TAOS *taos, const char *sqlstr, void (*fp)(void *p int64_t starttime = tscGetLaunchTimestamp(pStream); taosTmrReset(tscProcessStreamTimer, starttime, pStream, tscTmr, &pStream->pTimer); - tscTrace("%p stream:%p is opened, query on:%s, interval:%lld, sliding:%lld, first launched in:%lld, sql:%s", pSql, + tscTrace("%p stream:%p is opened, query on:%s, interval:%" PRId64 ", sliding:%" PRId64 ", first launched in:%" PRId64 ", sql:%s", pSql, pStream, pMeterMetaInfo->name, pStream->interval, pStream->slidingTime, starttime, sqlstr); return pStream; diff --git a/src/client/src/tscSub.c b/src/client/src/tscSub.c index bcbcaba4c4..d6bc1eebe9 100644 --- a/src/client/src/tscSub.c +++ b/src/client/src/tscSub.c @@ -106,7 +106,7 @@ TAOS_ROW taos_consume(TAOS_SUB *tsub) { pSub->stime = taosGetTimestampMs(); - sprintf(qstr, "select * from %s where _c0 > %lld order by _c0 asc", pSub->name, pSub->lastKey); + sprintf(qstr, "select * from %s where _c0 > %" PRId64 " order by _c0 asc", pSub->name, pSub->lastKey); if (taos_query(pSub->taos, qstr)) { tscTrace("failed to select, reason:%s", taos_errstr(pSub->taos)); return NULL; diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index d0da79651e..3ff262bf68 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -1202,7 +1202,8 @@ void tscColumnBaseInfoDestroy(SColumnBaseInfo* pColumnBaseInfo) { assert(pColBase->filterInfo[j].filterOnBinary == 0 || pColBase->filterInfo[j].filterOnBinary == 1); if (pColBase->filterInfo[j].filterOnBinary) { - tfree(pColBase->filterInfo[j].pz); + free((char*) pColBase->filterInfo[j].pz); + pColBase->filterInfo[j].pz = 0; } } } diff --git a/src/inc/taosmsg.h b/src/inc/taosmsg.h index d3634219aa..f39786da2c 100644 --- a/src/inc/taosmsg.h +++ b/src/inc/taosmsg.h @@ -279,7 +279,7 @@ typedef struct { } SShellSubmitMsg; typedef struct SSchema { - char type; + uint8_t type; char name[TSDB_COL_NAME_LEN]; short colId; short bytes; diff --git a/src/inc/tutil.h b/src/inc/tutil.h index bdf9df63ee..8469959bb6 100644 --- a/src/inc/tutil.h +++ b/src/inc/tutil.h @@ -175,7 +175,7 @@ bool taosMbsToUcs4(char *mbs, int32_t mbs_len, char *ucs4, int32_t ucs4_max_len) bool taosUcs4ToMbs(void *ucs4, int32_t ucs4_max_len, char *mbs); -bool taosValidateEncodec(char *encodec); +bool taosValidateEncodec(const char *encodec); bool taosGetVersionNumber(char *versionStr, int *versionNubmer); diff --git a/src/os/linux/inc/os.h b/src/os/linux/inc/os.h index a3d50400c3..8d63e69cd6 100644 --- a/src/os/linux/inc/os.h +++ b/src/os/linux/inc/os.h @@ -71,6 +71,7 @@ extern "C" { #include #include #include +#include #define taosCloseSocket(x) \ diff --git a/src/util/src/textbuffer.c b/src/util/src/textbuffer.c index 42c2cc1ed0..c72d725ba1 100644 --- a/src/util/src/textbuffer.c +++ b/src/util/src/textbuffer.c @@ -12,7 +12,6 @@ * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . */ -#include #include "os.h" #include "taos.h" #include "taosmsg.h" diff --git a/src/util/src/thistogram.c b/src/util/src/thistogram.c index 03c2294938..a3f6e7203c 100644 --- a/src/util/src/thistogram.c +++ b/src/util/src/thistogram.c @@ -12,7 +12,6 @@ * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . */ -#include #include "os.h" #include "taosmsg.h" diff --git a/src/util/src/tskiplist.c b/src/util/src/tskiplist.c index 0440e5d5ab..01c91b6c65 100644 --- a/src/util/src/tskiplist.c +++ b/src/util/src/tskiplist.c @@ -12,11 +12,7 @@ * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . */ -#include -#include -#include -#include -#include +#include "os.h" #include "tlog.h" #include "tsdb.h" diff --git a/src/util/src/tstrbuild.c b/src/util/src/tstrbuild.c index 439370ce07..61a6d67952 100644 --- a/src/util/src/tstrbuild.c +++ b/src/util/src/tstrbuild.c @@ -12,7 +12,6 @@ * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . */ -#include #include "os.h" #include "tstrbuild.h" diff --git a/src/util/src/ttimer.c b/src/util/src/ttimer.c index e276710afa..ccac1de518 100644 --- a/src/util/src/ttimer.c +++ b/src/util/src/ttimer.c @@ -14,7 +14,6 @@ */ #include "os.h" -#include #include "tlog.h" #include "tsched.h" #include "ttime.h" diff --git a/src/util/src/ttokenizer.c b/src/util/src/ttokenizer.c index f402963049..0fbc4dc935 100644 --- a/src/util/src/ttokenizer.c +++ b/src/util/src/ttokenizer.c @@ -510,7 +510,7 @@ uint32_t tSQLGetToken(char* z, uint32_t* tokenType) { if ((z[i] == 'a' || z[i] == 's' || z[i] == 'm' || z[i] == 'h' || z[i] == 'd' || z[i] == 'n' || z[i] == 'y' || z[i] == 'w' || z[i] == 'A' || z[i] == 'S' || z[i] == 'M' || z[i] == 'H' || z[i] == 'D' || z[i] == 'N' || z[i] == 'Y' || z[i] == 'W') && - (isIdChar[z[i + 1]] == 0)) { + (isIdChar[(uint8_t)z[i + 1]] == 0)) { *tokenType = TK_VARIABLE; i += 1; return i; @@ -551,7 +551,7 @@ uint32_t tSQLGetToken(char* z, uint32_t* tokenType) { case 't': case 'F': case 'f': { - for (i = 1; ((z[i] & 0x80) == 0) && isIdChar[z[i]]; i++) { + for (i = 1; ((z[i] & 0x80) == 0) && isIdChar[(uint8_t) z[i]]; i++) { } if ((i == 4 && strncasecmp(z, "true", 4) == 0) || (i == 5 && strncasecmp(z, "false", 5) == 0)) { @@ -560,10 +560,10 @@ uint32_t tSQLGetToken(char* z, uint32_t* tokenType) { } } default: { - if (((*z & 0x80) != 0) || !isIdChar[*z]) { + if (((*z & 0x80) != 0) || !isIdChar[(uint8_t) *z]) { break; } - for (i = 1; ((z[i] & 0x80) == 0) && isIdChar[z[i]]; i++) { + for (i = 1; ((z[i] & 0x80) == 0) && isIdChar[(uint8_t) z[i]]; i++) { } *tokenType = tSQLKeywordCode(z, i); return i; diff --git a/src/util/src/ttypes.c b/src/util/src/ttypes.c index 3f0c1732e3..ad3f98beb6 100644 --- a/src/util/src/ttypes.c +++ b/src/util/src/ttypes.c @@ -12,7 +12,6 @@ * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . */ -#include #include "os.h" #include "taos.h" #include "tsdb.h" diff --git a/src/util/src/tutil.c b/src/util/src/tutil.c index e0dcb0aa3f..016ce1bb55 100644 --- a/src/util/src/tutil.c +++ b/src/util/src/tutil.c @@ -443,7 +443,7 @@ bool taosMbsToUcs4(char *mbs, int32_t mbs_len, char *ucs4, int32_t ucs4_max_len) #endif } -bool taosValidateEncodec(char *encodec) { +bool taosValidateEncodec(const char *encodec) { #ifdef USE_LIBICONV iconv_t cd = iconv_open(encodec, DEFAULT_UNICODE_ENCODEC); if (cd == (iconv_t)(-1)) { From a5df5d938c991cb01cc7a0e968b30a1ae055f848 Mon Sep 17 00:00:00 2001 From: hjxilinx Date: Mon, 9 Dec 2019 22:19:35 +0800 Subject: [PATCH 3/7] suppress compile warnings --- src/client/src/tscAst.c | 7 +++---- src/client/src/tscServer.c | 9 ++++++--- src/client/src/tscSub.c | 4 ++-- src/client/src/tscUtil.c | 3 ++- 4 files changed, 13 insertions(+), 10 deletions(-) diff --git a/src/client/src/tscAst.c b/src/client/src/tscAst.c index a06a075248..d071358dbf 100644 --- a/src/client/src/tscAst.c +++ b/src/client/src/tscAst.c @@ -643,13 +643,12 @@ int32_t intersect(tQueryResultset *pLeft, tQueryResultset *pRight, tQueryResults } /* - * + * traverse the result and apply the function to each item to check if the item is qualified or not */ -void tSQLListTraverseOnResult(struct tSQLBinaryExpr *pExpr, bool (*fp)(tSkipListNode *, void *), - tQueryResultset * pResult) { +static void tSQLListTraverseOnResult(struct tSQLBinaryExpr *pExpr, __result_filter_fn_t fp, tQueryResultset *pResult) { assert(pExpr->pLeft->nodeType == TSQL_NODE_COL && pExpr->pRight->nodeType == TSQL_NODE_VALUE); - // brutal force search + // brutal force scan the result list and check for each item in the list int64_t num = pResult->num; for (int32_t i = 0, j = 0; i < pResult->num; ++i) { if (fp == NULL || (fp(pResult->pRes[i], pExpr->info) == true)) { diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index 28cefeea73..0913cf956c 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -3350,7 +3350,7 @@ int tscProcessShowRsp(SSqlObj *pSql) { } int tscProcessConnectRsp(SSqlObj *pSql) { - char temp[TSDB_METER_ID_LEN]; + char temp[TSDB_METER_ID_LEN*2]; SConnectRsp *pConnect; STscObj *pObj = pSql->pTscObj; @@ -3358,8 +3358,11 @@ int tscProcessConnectRsp(SSqlObj *pSql) { pConnect = (SConnectRsp *)pRes->pRsp; strcpy(pObj->acctId, pConnect->acctId); // copy acctId from response - sprintf(temp, "%s%s%s", pObj->acctId, TS_PATH_DELIMITER, pObj->db); - strcpy(pObj->db, temp); + int32_t len =sprintf(temp, "%s%s%s", pObj->acctId, TS_PATH_DELIMITER, pObj->db); + + assert(len <= tListLen(pObj->db)); + strncpy(pObj->db, temp, tListLen(pObj->db)); + #ifdef CLUSTER SIpList * pIpList; char *rsp = pRes->pRsp + sizeof(SConnectRsp); diff --git a/src/client/src/tscSub.c b/src/client/src/tscSub.c index d6bc1eebe9..f2e9395c68 100644 --- a/src/client/src/tscSub.c +++ b/src/client/src/tscSub.c @@ -56,7 +56,7 @@ TAOS_SUB *taos_subscribe(const char *host, const char *user, const char *pass, c if (pSub->taos == NULL) { tfree(pSub); } else { - char qstr[128]; + char qstr[256] = {0}; sprintf(qstr, "use %s", db); int res = taos_query(pSub->taos, qstr); if (res != 0) { @@ -64,7 +64,7 @@ TAOS_SUB *taos_subscribe(const char *host, const char *user, const char *pass, c taos_close(pSub->taos); tfree(pSub); } else { - sprintf(qstr, "select * from %s where _c0 > now+1000d", pSub->name); + snprintf(qstr, tListLen(qstr), "select * from %s where _c0 > now+1000d", pSub->name); if (taos_query(pSub->taos, qstr)) { tscTrace("failed to select, reason:%s", taos_errstr(pSub->taos)); taos_close(pSub->taos); diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index 3ff262bf68..288e564cb0 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -1138,7 +1138,8 @@ void tscColumnFilterInfoCopy(SColumnFilterInfo* dst, const SColumnFilterInfo* sr *dst = *src; if (dst->filterOnBinary) { size_t len = (size_t) dst->len + 1; - dst->pz = calloc(1, len); + char* pTmp = calloc(1, len); + dst->pz = (int64_t) pTmp; memcpy((char*) dst->pz, (char*) src->pz, (size_t) len); } } From 2ab2e2831e77ef51edfd5f9fff373db97ee603ab Mon Sep 17 00:00:00 2001 From: hjxilinx Date: Mon, 9 Dec 2019 23:23:05 +0800 Subject: [PATCH 4/7] suppress some compile warnings --- src/inc/taosmsg.h | 2 +- src/system/detail/inc/mgmtUtil.h | 2 +- src/system/detail/src/mgmtSupertableQuery.c | 12 +++++++----- src/system/detail/src/vnodeShell.c | 2 +- 4 files changed, 10 insertions(+), 8 deletions(-) diff --git a/src/inc/taosmsg.h b/src/inc/taosmsg.h index f39786da2c..895cf23cf8 100644 --- a/src/inc/taosmsg.h +++ b/src/inc/taosmsg.h @@ -622,7 +622,7 @@ typedef struct { char repStrategy; char loadLatest; // load into mem or not - char precision; // time resoluation + uint8_t precision; // time resolution char reserved[16]; } SVnodeCfg, SCreateDbMsg, SDbCfg, SAlterDbMsg; diff --git a/src/system/detail/inc/mgmtUtil.h b/src/system/detail/inc/mgmtUtil.h index 04bacbe1db..1f70485894 100644 --- a/src/system/detail/inc/mgmtUtil.h +++ b/src/system/detail/inc/mgmtUtil.h @@ -37,6 +37,6 @@ int32_t mgmtRetrieveMetersFromMetric(SMetricMetaMsg* pInfo, int32_t tableIndex, int32_t mgmtDoJoin(SMetricMetaMsg* pMetricMetaMsg, tQueryResultset* pRes); void mgmtReorganizeMetersInMetricMeta(SMetricMetaMsg* pInfo, int32_t index, tQueryResultset* pRes); -bool tSkipListNodeFilterCallback(struct tSkipListNode *pNode, void *param); +bool tSkipListNodeFilterCallback(const void *pNode, void *param); #endif //TBASE_MGMTUTIL_H diff --git a/src/system/detail/src/mgmtSupertableQuery.c b/src/system/detail/src/mgmtSupertableQuery.c index 4dc7760d89..9b39532276 100644 --- a/src/system/detail/src/mgmtSupertableQuery.c +++ b/src/system/detail/src/mgmtSupertableQuery.c @@ -203,7 +203,7 @@ static bool mgmtTablenameFilterCallback(tSkipListNode* pNode, void* param) { static void mgmtRetrieveFromLikeOptr(tQueryResultset* pRes, const char* str, STabObj* pMetric) { SPatternCompareInfo info = PATTERN_COMPARE_INFO_INITIALIZER; - SMeterNameFilterSupporter supporter = {info, str}; + SMeterNameFilterSupporter supporter = {info, (char*) str}; pRes->num = tSkipListIterateList(pMetric->pSkipList, (tSkipListNode***)&pRes->pRes, mgmtTablenameFilterCallback, &supporter); @@ -230,7 +230,7 @@ static void mgmtFilterByTableNameCond(tQueryResultset* pRes, char* condStr, int3 free(str); } -static bool mgmtJoinFilterCallback(tSkipListNode* pNode, void* param) { +UNUSED_FUNC static bool mgmtJoinFilterCallback(tSkipListNode* pNode, void* param) { SJoinSupporter* pSupporter = (SJoinSupporter*)param; SSchema s = {0}; @@ -639,7 +639,8 @@ static void getTagColumnInfo(SSyntaxTreeFilterSupporter* pSupporter, SSchema* pS } } -void filterPrepare(tSQLBinaryExpr* pExpr, void* param) { +void filterPrepare(void* expr, void* param) { + tSQLBinaryExpr *pExpr = (tSQLBinaryExpr*) expr; if (pExpr->info != NULL) { return; } @@ -791,9 +792,10 @@ static char* getTagValueFromMeter(STabObj* pMeter, int32_t offset, void* param) } } -bool tSkipListNodeFilterCallback(tSkipListNode* pNode, void* param) { +bool tSkipListNodeFilterCallback(const void* pNode, void* param) { + tQueryInfo* pInfo = (tQueryInfo*)param; - STabObj* pMeter = (STabObj*)pNode->pData; + STabObj* pMeter = (STabObj*)(((tSkipListNode*)pNode)->pData); char name[TSDB_METER_NAME_LEN + 1] = {0}; char* val = getTagValueFromMeter(pMeter, pInfo->offset, name); diff --git a/src/system/detail/src/vnodeShell.c b/src/system/detail/src/vnodeShell.c index 5eea6632af..9ae33c43b7 100644 --- a/src/system/detail/src/vnodeShell.c +++ b/src/system/detail/src/vnodeShell.c @@ -473,7 +473,7 @@ void vnodeExecuteRetrieveReq(SSchedMsg *pSched) { if (numOfRows == 0 && (pRetrieve->qhandle == (uint64_t)pObj->qhandle) && (code != TSDB_CODE_ACTION_IN_PROGRESS)) { dTrace("QInfo:%p %s free qhandle code:%d", pObj->qhandle, __FUNCTION__, code); - vnodeFreeQInfo(pObj->qhandle); + vnodeFreeQInfo(pObj->qhandle, true); pObj->qhandle = NULL; } From d2ef1dad53744f8297b0b338cb390cbc8304e2c5 Mon Sep 17 00:00:00 2001 From: hjxilinx Date: Tue, 10 Dec 2019 12:16:10 +0800 Subject: [PATCH 5/7] [tbase-1287] --- src/client/inc/tscUtil.h | 6 +-- src/client/inc/tsclient.h | 25 ++++++----- src/client/src/tscParseInsert.c | 6 +-- src/client/src/tscUtil.c | 77 ++++++++++++++++++++++----------- 4 files changed, 72 insertions(+), 42 deletions(-) diff --git a/src/client/inc/tscUtil.h b/src/client/inc/tscUtil.h index 3deb4c463f..5d828d7cf0 100644 --- a/src/client/inc/tscUtil.h +++ b/src/client/inc/tscUtil.h @@ -67,7 +67,7 @@ typedef struct SJoinSubquerySupporter { } SJoinSubquerySupporter; void tscDestroyDataBlock(STableDataBlocks* pDataBlock); -STableDataBlocks* tscCreateDataBlock(int32_t size); +STableDataBlocks* tscCreateDataBlock(size_t initialBufSize, int32_t rowSize, int32_t startOffset, const char* name); void tscAppendDataBlock(SDataBlockList* pList, STableDataBlocks* pBlocks); SParamInfo* tscAddParamToDataBlock(STableDataBlocks* pDataBlock, char type, uint8_t timePrec, short bytes, uint32_t offset); @@ -78,9 +78,7 @@ int32_t tscCopyDataBlockToPayload(SSqlObj* pSql, STableDataBlocks* pDa void tscFreeUnusedDataBlocks(SDataBlockList* pList); int32_t tscMergeTableDataBlocks(SSqlObj* pSql, SDataBlockList* pDataList); STableDataBlocks* tscGetDataBlockFromList(void* pHashList, SDataBlockList* pDataBlockList, int64_t id, int32_t size, - int32_t startOffset, int32_t rowSize, char* tableId); -STableDataBlocks* tscCreateDataBlockEx(size_t size, int32_t rowSize, int32_t startOffset, char* name); - + int32_t startOffset, int32_t rowSize, const char* tableId); SVnodeSidList* tscGetVnodeSidList(SMetricMeta* pMetricmeta, int32_t vnodeIdx); SMeterSidExtInfo* tscGetMeterSidInfo(SVnodeSidList* pSidList, int32_t idx); diff --git a/src/client/inc/tsclient.h b/src/client/inc/tsclient.h index 4de33f5ac4..be9ba47f2a 100644 --- a/src/client/inc/tsclient.h +++ b/src/client/inc/tsclient.h @@ -231,17 +231,22 @@ typedef struct SParamInfo { typedef struct STableDataBlocks { char meterId[TSDB_METER_ID_LEN]; - int8_t tsSource; - bool ordered; + int8_t tsSource; // where does the UNIX timestamp come from, server or client + bool ordered; // if current rows are ordered or not + int64_t vgid; // virtual group id + int64_t prevTS; // previous timestamp, recorded to decide if the records array is ts ascending + int32_t numOfMeters; // number of tables in current submit block - int64_t vgid; - int64_t prevTS; - - int32_t numOfMeters; - - int32_t rowSize; + int32_t rowSize; // row size for current table uint32_t nAllocSize; uint32_t size; + + /* + * the metermeta for current table, the metermeta will be used during submit stage, keep a ref + * to avoid it to be removed from cache + */ + SMeterMeta* pMeterMeta; + union { char *filename; char *pData; @@ -255,8 +260,8 @@ typedef struct STableDataBlocks { typedef struct SDataBlockList { int32_t idx; - int32_t nSize; - int32_t nAlloc; + uint32_t nSize; + uint32_t nAlloc; char * userParam; /* user assigned parameters for async query */ void * udfp; /* user defined function pointer, used in async model */ STableDataBlocks **pData; diff --git a/src/client/src/tscParseInsert.c b/src/client/src/tscParseInsert.c index 6317decbe0..a07ad6f211 100644 --- a/src/client/src/tscParseInsert.c +++ b/src/client/src/tscParseInsert.c @@ -985,7 +985,7 @@ int doParserInsertSql(SSqlObj *pSql, char *str) { strcpy(fname, full_path.we_wordv[0]); wordfree(&full_path); - STableDataBlocks *pDataBlock = tscCreateDataBlockEx(PATH_MAX, pMeterMetaInfo->pMeterMeta->rowSize, + STableDataBlocks *pDataBlock = tscCreateDataBlock(PATH_MAX, pMeterMetaInfo->pMeterMeta->rowSize, sizeof(SShellSubmitBlock), pMeterMetaInfo->name); tscAppendDataBlock(pCmd->pDataBlocks, pDataBlock); @@ -1222,8 +1222,8 @@ static int tscInsertDataFromFile(SSqlObj *pSql, FILE *fp, char *tmpTokenBuf) { int32_t rowSize = pMeterMeta->rowSize; pCmd->pDataBlocks = tscCreateBlockArrayList(); - STableDataBlocks *pTableDataBlock = - tscCreateDataBlockEx(TSDB_PAYLOAD_SIZE, pMeterMeta->rowSize, sizeof(SShellSubmitBlock), pMeterMetaInfo->name); + STableDataBlocks *pTableDataBlock = tscCreateDataBlock(TSDB_PAYLOAD_SIZE, pMeterMeta->rowSize, + sizeof(SShellSubmitBlock), pMeterMetaInfo->name); tscAppendDataBlock(pCmd->pDataBlocks, pTableDataBlock); diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index 288e564cb0..7c7310a1c7 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -451,15 +451,6 @@ void tscFreeSqlObj(SSqlObj* pSql) { free(pSql); } -STableDataBlocks* tscCreateDataBlock(int32_t size) { - STableDataBlocks* dataBuf = (STableDataBlocks*)calloc(1, sizeof(STableDataBlocks)); - dataBuf->nAllocSize = (uint32_t)size; - dataBuf->pData = calloc(1, dataBuf->nAllocSize); - dataBuf->ordered = true; - dataBuf->prevTS = INT64_MIN; - return dataBuf; -} - void tscDestroyDataBlock(STableDataBlocks* pDataBlock) { if (pDataBlock == NULL) { return; @@ -467,6 +458,9 @@ void tscDestroyDataBlock(STableDataBlocks* pDataBlock) { tfree(pDataBlock->pData); tfree(pDataBlock->params); + + // free the refcount for metermeta + taosRemoveDataFromCache(tscCacheHandle, (void**) &(pDataBlock->pMeterMeta), false); tfree(pDataBlock); } @@ -513,11 +507,11 @@ SDataBlockList* tscCreateBlockArrayList() { void tscAppendDataBlock(SDataBlockList* pList, STableDataBlocks* pBlocks) { if (pList->nSize >= pList->nAlloc) { - pList->nAlloc = pList->nAlloc << 1; - pList->pData = realloc(pList->pData, sizeof(void*) * (size_t)pList->nAlloc); + pList->nAlloc = (pList->nAlloc) << 1U; + pList->pData = realloc(pList->pData, POINTER_BYTES * (size_t)pList->nAlloc); // reset allocated memory - memset(pList->pData + pList->nSize, 0, sizeof(void*) * (pList->nAlloc - pList->nSize)); + memset(pList->pData + pList->nSize, 0, POINTER_BYTES * (pList->nAlloc - pList->nSize)); } pList->pData[pList->nSize++] = pBlocks; @@ -539,29 +533,43 @@ void* tscDestroyBlockArrayList(SDataBlockList* pList) { } int32_t tscCopyDataBlockToPayload(SSqlObj* pSql, STableDataBlocks* pDataBlock) { - SSqlCmd* pCmd = &pSql->cmd; - + SSqlCmd *pCmd = &pSql->cmd; + assert(pDataBlock->pMeterMeta != NULL); + pCmd->count = pDataBlock->numOfMeters; - SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, 0); - strcpy(pMeterMetaInfo->name, pDataBlock->meterId); - + SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, 0); + + //set the correct metermeta object, the metermeta has been locked in pDataBlocks, so it must be in the cache + if (pMeterMetaInfo->pMeterMeta != pDataBlock->pMeterMeta) { + strcpy(pMeterMetaInfo->name, pDataBlock->meterId); + taosRemoveDataFromCache(tscCacheHandle, (void**) &(pMeterMetaInfo->pMeterMeta), false); + + pMeterMetaInfo->pMeterMeta = pDataBlock->pMeterMeta; + pDataBlock->pMeterMeta = NULL; // delegate the ownership of metermeta to pMeterMetaInfo + } else { + assert(strncmp(pMeterMetaInfo->name, pDataBlock->meterId, tListLen(pDataBlock->meterId)) == 0); + } + /* * the submit message consists of : [RPC header|message body|digest] * the dataBlock only includes the RPC Header buffer and actual submit messsage body, space for digest needs * additional space. */ int ret = tscAllocPayload(pCmd, pDataBlock->nAllocSize + sizeof(STaosDigest)); - if (TSDB_CODE_SUCCESS != ret) return ret; + if (TSDB_CODE_SUCCESS != ret) { + return ret; + } + memcpy(pCmd->payload, pDataBlock->pData, pDataBlock->nAllocSize); - + /* * the payloadLen should be actual message body size * the old value of payloadLen is the allocated payload size */ pCmd->payloadLen = pDataBlock->nAllocSize - tsRpcHeadSize; - + assert(pCmd->allocSize >= pCmd->payloadLen + tsRpcHeadSize + sizeof(STaosDigest)); - return tscGetMeterMeta(pSql, pMeterMetaInfo->name, 0); + return TSDB_CODE_SUCCESS; } void tscFreeUnusedDataBlocks(SDataBlockList* pList) { @@ -573,19 +581,38 @@ void tscFreeUnusedDataBlocks(SDataBlockList* pList) { } } -STableDataBlocks* tscCreateDataBlockEx(size_t size, int32_t rowSize, int32_t startOffset, char* name) { - STableDataBlocks* dataBuf = tscCreateDataBlock(size); +/** + * create the in-memory buffer for each table to keep the submitted data block + * @param initialSize + * @param rowSize + * @param startOffset + * @param name + * @param pMeterMeta the ownership of pMeterMeta should be transfer to STableDataBlocks + * @return + */ +STableDataBlocks* tscCreateDataBlock(size_t initialSize, int32_t rowSize, int32_t startOffset, const char* name) { + + STableDataBlocks* dataBuf = (STableDataBlocks*)calloc(1, sizeof(STableDataBlocks)); + dataBuf->nAllocSize = (uint32_t) initialSize; + dataBuf->pData = calloc(1, dataBuf->nAllocSize); + dataBuf->ordered = true; + dataBuf->prevTS = INT64_MIN; dataBuf->rowSize = rowSize; dataBuf->size = startOffset; dataBuf->tsSource = -1; strncpy(dataBuf->meterId, name, TSDB_METER_ID_LEN); + + // sure that the metermeta must be in the local client cache + dataBuf->pMeterMeta = taosGetDataFromCache(tscCacheHandle, dataBuf->meterId); + assert(dataBuf->pMeterMeta != NULL && initialSize > 0); + return dataBuf; } STableDataBlocks* tscGetDataBlockFromList(void* pHashList, SDataBlockList* pDataBlockList, int64_t id, int32_t size, - int32_t startOffset, int32_t rowSize, char* tableId) { + int32_t startOffset, int32_t rowSize, const char* tableId) { STableDataBlocks* dataBuf = NULL; STableDataBlocks** t1 = (STableDataBlocks**)taosGetIntHashData(pHashList, id); @@ -594,7 +621,7 @@ STableDataBlocks* tscGetDataBlockFromList(void* pHashList, SDataBlockList* pData } if (dataBuf == NULL) { - dataBuf = tscCreateDataBlockEx((size_t)size, rowSize, startOffset, tableId); + dataBuf = tscCreateDataBlock((size_t)size, rowSize, startOffset, tableId); dataBuf = *(STableDataBlocks**)taosAddIntHash(pHashList, id, (char*)&dataBuf); tscAppendDataBlock(pDataBlockList, dataBuf); } From 6c860c7393ebd8bed91691468b5ab7050049b455 Mon Sep 17 00:00:00 2001 From: hjxilinx Date: Tue, 10 Dec 2019 12:35:45 +0800 Subject: [PATCH 6/7] [tbase-1317] --- src/client/src/tscSystem.c | 25 +++++++++++++++++++------ 1 file changed, 19 insertions(+), 6 deletions(-) diff --git a/src/client/src/tscSystem.c b/src/client/src/tscSystem.c index 60b90ac328..6efe344719 100644 --- a/src/client/src/tscSystem.c +++ b/src/client/src/tscSystem.c @@ -198,7 +198,9 @@ static int taos_options_imp(TSDB_OPTION option, const char *pStr) { switch (option) { case TSDB_OPTION_CONFIGDIR: cfg = tsGetConfigOption("configDir"); - if (cfg && cfg->cfgStatus <= TSDB_CFG_CSTATUS_OPTION) { + assert(cfg != NULL); + + if (cfg->cfgStatus <= TSDB_CFG_CSTATUS_OPTION) { strncpy(configDir, pStr, TSDB_FILENAME_LEN); cfg->cfgStatus = TSDB_CFG_CSTATUS_OPTION; tscPrint("set config file directory:%s", pStr); @@ -210,7 +212,9 @@ static int taos_options_imp(TSDB_OPTION option, const char *pStr) { case TSDB_OPTION_SHELL_ACTIVITY_TIMER: cfg = tsGetConfigOption("shellActivityTimer"); - if (cfg && cfg->cfgStatus <= TSDB_CFG_CSTATUS_OPTION) { + assert(cfg != NULL); + + if (cfg->cfgStatus <= TSDB_CFG_CSTATUS_OPTION) { tsShellActivityTimer = atoi(pStr); if (tsShellActivityTimer < 1) tsShellActivityTimer = 1; if (tsShellActivityTimer > 3600) tsShellActivityTimer = 3600; @@ -224,13 +228,15 @@ static int taos_options_imp(TSDB_OPTION option, const char *pStr) { case TSDB_OPTION_LOCALE: { // set locale cfg = tsGetConfigOption("locale"); + assert(cfg != NULL); + size_t len = strlen(pStr); if (len == 0 || len > TSDB_LOCALE_LEN) { tscPrint("Invalid locale:%s, use default", pStr); return -1; } - if (cfg && cfg && cfg->cfgStatus <= TSDB_CFG_CSTATUS_OPTION) { + if (cfg->cfgStatus <= TSDB_CFG_CSTATUS_OPTION) { char sep = '.'; if (strlen(tsLocale) == 0) { // locale does not set yet @@ -285,13 +291,15 @@ static int taos_options_imp(TSDB_OPTION option, const char *pStr) { case TSDB_OPTION_CHARSET: { /* set charset will override the value of charset, assigned during system locale changed */ cfg = tsGetConfigOption("charset"); + assert(cfg != NULL); + size_t len = strlen(pStr); if (len == 0 || len > TSDB_LOCALE_LEN) { tscPrint("failed to set charset:%s", pStr); return -1; } - if (cfg && cfg->cfgStatus <= TSDB_CFG_CSTATUS_OPTION) { + if (cfg->cfgStatus <= TSDB_CFG_CSTATUS_OPTION) { if (taosValidateEncodec(pStr)) { if (strlen(tsCharset) == 0) { tscPrint("charset is set:%s", pStr); @@ -314,7 +322,9 @@ static int taos_options_imp(TSDB_OPTION option, const char *pStr) { case TSDB_OPTION_TIMEZONE: cfg = tsGetConfigOption("timezone"); - if (cfg && cfg->cfgStatus <= TSDB_CFG_CSTATUS_OPTION) { + assert(cfg != NULL); + + if (cfg->cfgStatus <= TSDB_CFG_CSTATUS_OPTION) { strcpy(tsTimezone, pStr); tsSetTimeZone(); cfg->cfgStatus = TSDB_CFG_CSTATUS_OPTION; @@ -327,7 +337,9 @@ static int taos_options_imp(TSDB_OPTION option, const char *pStr) { case TSDB_OPTION_SOCKET_TYPE: cfg = tsGetConfigOption("sockettype"); - if (cfg && cfg->cfgStatus <= TSDB_CFG_CSTATUS_OPTION) { + assert(cfg != NULL); + + if (cfg->cfgStatus <= TSDB_CFG_CSTATUS_OPTION) { if (strcasecmp(pStr, TAOS_SOCKET_TYPE_NAME_UDP) != 0 && strcasecmp(pStr, TAOS_SOCKET_TYPE_NAME_TCP) != 0) { tscError("only 'tcp' or 'udp' allowed for configuring the socket type"); return -1; @@ -340,6 +352,7 @@ static int taos_options_imp(TSDB_OPTION option, const char *pStr) { break; default: + // TODO return the correct error code to client in the format for taos_errstr() tscError("Invalid option %d", option); return -1; } From 19d516169036eab7af561154e86a22b18c4ba519 Mon Sep 17 00:00:00 2001 From: hjxilinx Date: Tue, 10 Dec 2019 12:39:01 +0800 Subject: [PATCH 7/7] [tbase-1316] --- src/client/src/tscStream.c | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/src/client/src/tscStream.c b/src/client/src/tscStream.c index 933960b893..31af78f618 100644 --- a/src/client/src/tscStream.c +++ b/src/client/src/tscStream.c @@ -447,7 +447,10 @@ static void setErrorInfo(STscObj* pObj, int32_t code, char* info) { SSqlCmd* pCmd = &pObj->pSql->cmd; pObj->pSql->res.code = code; - strncpy(pCmd->payload, info, pCmd->payloadLen); + + if (info != NULL) { + strncpy(pCmd->payload, info, pCmd->payloadLen); + } } TAOS_STREAM *taos_open_stream(TAOS *taos, const char *sqlstr, void (*fp)(void *param, TAOS_RES *, TAOS_ROW row),