Merge pull request #1114 from taosdata/feature/lihui
[modify for solving warn]
This commit is contained in:
commit
e030523241
|
@ -833,7 +833,7 @@ void tSQLBinaryExprCalcTraverse(tSQLBinaryExpr *pExprs, int32_t numOfRows, char
|
||||||
tSQLSyntaxNode *pRight = pExprs->pRight;
|
tSQLSyntaxNode *pRight = pExprs->pRight;
|
||||||
|
|
||||||
/* the left output has result from the left child syntax tree */
|
/* the left output has result from the left child syntax tree */
|
||||||
char *pLeftOutput = malloc(sizeof(int64_t) * numOfRows);
|
char *pLeftOutput = (char*)malloc(sizeof(int64_t) * numOfRows);
|
||||||
if (pLeft->nodeType == TSQL_NODE_EXPR) {
|
if (pLeft->nodeType == TSQL_NODE_EXPR) {
|
||||||
tSQLBinaryExprCalcTraverse(pLeft->pExpr, numOfRows, pLeftOutput, param, order, getSourceDataBlock);
|
tSQLBinaryExprCalcTraverse(pLeft->pExpr, numOfRows, pLeftOutput, param, order, getSourceDataBlock);
|
||||||
}
|
}
|
||||||
|
|
|
@ -793,7 +793,9 @@ STSBuf* tsBufCreate(bool autoDelete) {
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
allocResForTSBuf(pTSBuf);
|
if (NULL == allocResForTSBuf(pTSBuf)) {
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
// update the header info
|
// update the header info
|
||||||
STSBufFileHeader header = {.magic = TS_COMP_FILE_MAGIC, .numOfVnode = pTSBuf->numOfVnodes, .tsOrder = TSQL_SO_ASC};
|
STSBufFileHeader header = {.magic = TS_COMP_FILE_MAGIC, .numOfVnode = pTSBuf->numOfVnodes, .tsOrder = TSQL_SO_ASC};
|
||||||
|
|
|
@ -202,10 +202,10 @@ void tscKillStream(STscObj *pObj, uint32_t killId) {
|
||||||
tscTrace("%p stream:%p is killed, streamId:%d", pStream->pSql, pStream, killId);
|
tscTrace("%p stream:%p is killed, streamId:%d", pStream->pSql, pStream, killId);
|
||||||
}
|
}
|
||||||
|
|
||||||
taos_close_stream(pStream);
|
|
||||||
if (pStream->callback) {
|
if (pStream->callback) {
|
||||||
pStream->callback(pStream->param);
|
pStream->callback(pStream->param);
|
||||||
}
|
}
|
||||||
|
taos_close_stream(pStream);
|
||||||
}
|
}
|
||||||
|
|
||||||
char *tscBuildQueryStreamDesc(char *pMsg, STscObj *pObj) {
|
char *tscBuildQueryStreamDesc(char *pMsg, STscObj *pObj) {
|
||||||
|
@ -285,8 +285,9 @@ void tscKillConnection(STscObj *pObj) {
|
||||||
|
|
||||||
SSqlStream *pStream = pObj->streamList;
|
SSqlStream *pStream = pObj->streamList;
|
||||||
while (pStream) {
|
while (pStream) {
|
||||||
|
SSqlStream *tmp = pStream->next;
|
||||||
taos_close_stream(pStream);
|
taos_close_stream(pStream);
|
||||||
pStream = pStream->next;
|
pStream = tmp;
|
||||||
}
|
}
|
||||||
|
|
||||||
pthread_mutex_unlock(&pObj->mutex);
|
pthread_mutex_unlock(&pObj->mutex);
|
||||||
|
|
|
@ -2918,7 +2918,7 @@ static SColumnFilterInfo* addColumnFilterInfo(SColumnBase* pColumn) {
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t size = pColumn->numOfFilters + 1;
|
int32_t size = pColumn->numOfFilters + 1;
|
||||||
char* tmp = realloc(pColumn->filterInfo, sizeof(SColumnFilterInfo) * (size));
|
char* tmp = (char*)realloc((void*)(pColumn->filterInfo), sizeof(SColumnFilterInfo) * (size));
|
||||||
if (tmp != NULL) {
|
if (tmp != NULL) {
|
||||||
pColumn->filterInfo = (SColumnFilterInfo*)tmp;
|
pColumn->filterInfo = (SColumnFilterInfo*)tmp;
|
||||||
}
|
}
|
||||||
|
|
|
@ -706,14 +706,14 @@ void setDCLSQLElems(SSqlInfo *pInfo, int32_t type, int32_t nParam, ...) {
|
||||||
pInfo->sqlType = type;
|
pInfo->sqlType = type;
|
||||||
|
|
||||||
if (nParam == 0) return;
|
if (nParam == 0) return;
|
||||||
if (pInfo->pDCLInfo == NULL) pInfo->pDCLInfo = calloc(1, sizeof(tDCLSQL));
|
if (pInfo->pDCLInfo == NULL) pInfo->pDCLInfo = (tDCLSQL *)calloc(1, sizeof(tDCLSQL));
|
||||||
|
|
||||||
va_list va;
|
va_list va;
|
||||||
va_start(va, nParam);
|
va_start(va, nParam);
|
||||||
|
|
||||||
while (nParam-- > 0) {
|
while (nParam-- > 0) {
|
||||||
SSQLToken *pToken = va_arg(va, SSQLToken *);
|
SSQLToken *pToken = va_arg(va, SSQLToken *);
|
||||||
tTokenListAppend(pInfo->pDCLInfo, pToken);
|
(void)tTokenListAppend(pInfo->pDCLInfo, pToken);
|
||||||
}
|
}
|
||||||
va_end(va);
|
va_end(va);
|
||||||
}
|
}
|
||||||
|
|
|
@ -602,7 +602,9 @@ int32_t tscLocalReducerEnvCreate(SSqlObj *pSql, tExtMemBuffer ***pMemBuffer, tOr
|
||||||
rlen += pExpr->resBytes;
|
rlen += pExpr->resBytes;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t capacity = nBufferSizes / rlen;
|
int32_t capacity = 0;
|
||||||
|
if (0 != rlen) capacity = nBufferSizes / rlen;
|
||||||
|
|
||||||
pModel = tColModelCreate(pSchema, pCmd->fieldsInfo.numOfOutputCols, capacity);
|
pModel = tColModelCreate(pSchema, pCmd->fieldsInfo.numOfOutputCols, capacity);
|
||||||
|
|
||||||
for (int32_t i = 0; i < pMeterMetaInfo->pMetricMeta->numOfVnodes; ++i) {
|
for (int32_t i = 0; i < pMeterMetaInfo->pMetricMeta->numOfVnodes; ++i) {
|
||||||
|
|
|
@ -1380,7 +1380,7 @@ void tscRetrieveDataRes(void *param, TAOS_RES *tres, int code) {
|
||||||
SSqlObj *pNew = tscCreateSqlObjForSubquery(trsupport->pParentSqlObj, trsupport, pSql);
|
SSqlObj *pNew = tscCreateSqlObjForSubquery(trsupport->pParentSqlObj, trsupport, pSql);
|
||||||
if (pNew == NULL) {
|
if (pNew == NULL) {
|
||||||
tscError("%p sub:%p failed to create new subquery due to out of memory, abort retry, vid:%d, orderOfSub:%d",
|
tscError("%p sub:%p failed to create new subquery due to out of memory, abort retry, vid:%d, orderOfSub:%d",
|
||||||
trsupport->pParentSqlObj, pSql, pSvd->vnode, trsupport->subqueryIndex);
|
trsupport->pParentSqlObj, pSql, pSvd != NULL ? pSvd->vnode : -1, trsupport->subqueryIndex);
|
||||||
|
|
||||||
pState->code = -TSDB_CODE_CLI_OUT_OF_MEMORY;
|
pState->code = -TSDB_CODE_CLI_OUT_OF_MEMORY;
|
||||||
trsupport->numOfRetry = MAX_NUM_OF_SUBQUERY_RETRY;
|
trsupport->numOfRetry = MAX_NUM_OF_SUBQUERY_RETRY;
|
||||||
|
@ -1404,9 +1404,14 @@ void tscRetrieveDataRes(void *param, TAOS_RES *tres, int code) {
|
||||||
|
|
||||||
tscRetrieveFromVnodeCallBack(param, tres, pState->code);
|
tscRetrieveFromVnodeCallBack(param, tres, pState->code);
|
||||||
} else { // success, proceed to retrieve data from dnode
|
} else { // success, proceed to retrieve data from dnode
|
||||||
tscTrace("%p sub:%p query complete,ip:%u,vid:%d,orderOfSub:%d,retrieve data", trsupport->pParentSqlObj, pSql,
|
if (vnodeInfo != NULL) {
|
||||||
|
tscTrace("%p sub:%p query complete,ip:%u,vid:%d,orderOfSub:%d,retrieve data", trsupport->pParentSqlObj, pSql,
|
||||||
vnodeInfo->vpeerDesc[vnodeInfo->index].ip, vnodeInfo->vpeerDesc[vnodeInfo->index].vnode,
|
vnodeInfo->vpeerDesc[vnodeInfo->index].ip, vnodeInfo->vpeerDesc[vnodeInfo->index].vnode,
|
||||||
trsupport->subqueryIndex);
|
trsupport->subqueryIndex);
|
||||||
|
} else {
|
||||||
|
tscTrace("%p sub:%p query complete, orderOfSub:%d,retrieve data", trsupport->pParentSqlObj, pSql,
|
||||||
|
trsupport->subqueryIndex);
|
||||||
|
}
|
||||||
|
|
||||||
taos_fetch_rows_a(tres, tscRetrieveFromVnodeCallBack, param);
|
taos_fetch_rows_a(tres, tscRetrieveFromVnodeCallBack, param);
|
||||||
}
|
}
|
||||||
|
|
|
@ -268,11 +268,11 @@ static void tscSetRetryTimer(SSqlStream *pStream, SSqlObj *pSql, int64_t timer)
|
||||||
tscTrace("%p stream:%p, etime:%" PRId64 " is too old, exceeds the max retention time window:%" PRId64 ", stop the stream",
|
tscTrace("%p stream:%p, etime:%" PRId64 " is too old, exceeds the max retention time window:%" PRId64 ", stop the stream",
|
||||||
pStream->pSql, pStream, pStream->stime, pStream->etime);
|
pStream->pSql, pStream, pStream->stime, pStream->etime);
|
||||||
// TODO : How to terminate stream here
|
// TODO : How to terminate stream here
|
||||||
taos_close_stream(pStream);
|
|
||||||
if (pStream->callback) {
|
if (pStream->callback) {
|
||||||
// Callback function from upper level
|
// Callback function from upper level
|
||||||
pStream->callback(pStream->param);
|
pStream->callback(pStream->param);
|
||||||
}
|
}
|
||||||
|
taos_close_stream(pStream);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -302,11 +302,11 @@ static void tscSetNextLaunchTimer(SSqlStream *pStream, SSqlObj *pSql) {
|
||||||
tscTrace("%p stream:%p, stime:%" PRId64 " is larger than end time: %" PRId64 ", stop the stream", pStream->pSql, pStream,
|
tscTrace("%p stream:%p, stime:%" PRId64 " is larger than end time: %" PRId64 ", stop the stream", pStream->pSql, pStream,
|
||||||
pStream->stime, pStream->etime);
|
pStream->stime, pStream->etime);
|
||||||
// TODO : How to terminate stream here
|
// TODO : How to terminate stream here
|
||||||
taos_close_stream(pStream);
|
|
||||||
if (pStream->callback) {
|
if (pStream->callback) {
|
||||||
// Callback function from upper level
|
// Callback function from upper level
|
||||||
pStream->callback(pStream->param);
|
pStream->callback(pStream->param);
|
||||||
}
|
}
|
||||||
|
taos_close_stream(pStream);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
|
@ -315,11 +315,11 @@ static void tscSetNextLaunchTimer(SSqlStream *pStream, SSqlObj *pSql) {
|
||||||
tscTrace("%p stream:%p, stime:%ld is larger than end time: %ld, stop the stream", pStream->pSql, pStream,
|
tscTrace("%p stream:%p, stime:%ld is larger than end time: %ld, stop the stream", pStream->pSql, pStream,
|
||||||
pStream->stime, pStream->etime);
|
pStream->stime, pStream->etime);
|
||||||
// TODO : How to terminate stream here
|
// TODO : How to terminate stream here
|
||||||
taos_close_stream(pStream);
|
|
||||||
if (pStream->callback) {
|
if (pStream->callback) {
|
||||||
// Callback function from upper level
|
// Callback function from upper level
|
||||||
pStream->callback(pStream->param);
|
pStream->callback(pStream->param);
|
||||||
}
|
}
|
||||||
|
taos_close_stream(pStream);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -517,7 +517,8 @@ bool taosGetProcIO(float *readKB, float *writeKB) {
|
||||||
static int64_t lastReadbyte = -1;
|
static int64_t lastReadbyte = -1;
|
||||||
static int64_t lastWritebyte = -1;
|
static int64_t lastWritebyte = -1;
|
||||||
|
|
||||||
int64_t curReadbyte, curWritebyte;
|
int64_t curReadbyte = 0;
|
||||||
|
int64_t curWritebyte = 0;
|
||||||
|
|
||||||
if (!taosReadProcIO(&curReadbyte, &curWritebyte)) {
|
if (!taosReadProcIO(&curReadbyte, &curWritebyte)) {
|
||||||
return false;
|
return false;
|
||||||
|
|
|
@ -703,7 +703,7 @@ int taosSendPacketViaTcp(uint32_t ip, uint16_t port, char *data, int dataLen, vo
|
||||||
pHead->msgLen = (int32_t)htonl(msgLen);
|
pHead->msgLen = (int32_t)htonl(msgLen);
|
||||||
code = taosSendUdpData(ip, port, buffer, msgLen, chandle);
|
code = taosSendUdpData(ip, port, buffer, msgLen, chandle);
|
||||||
|
|
||||||
pHead = (STaosHeader *)data;
|
//pHead = (STaosHeader *)data;
|
||||||
|
|
||||||
tinet_ntoa(ipstr, ip);
|
tinet_ntoa(ipstr, ip);
|
||||||
int fd = taosOpenTcpClientSocket(ipstr, pConn->port, tsLocalIp);
|
int fd = taosOpenTcpClientSocket(ipstr, pConn->port, tsLocalIp);
|
||||||
|
|
|
@ -1383,7 +1383,7 @@ int vnodeSearchPointInFile(SMeterObj *pObj, SQuery *pQuery) {
|
||||||
|
|
||||||
firstSlot = 0;
|
firstSlot = 0;
|
||||||
lastSlot = pQuery->numOfBlocks - 1;
|
lastSlot = pQuery->numOfBlocks - 1;
|
||||||
numOfBlocks = pQuery->numOfBlocks;
|
//numOfBlocks = pQuery->numOfBlocks;
|
||||||
if (QUERY_IS_ASC_QUERY(pQuery) && pBlock[lastSlot].keyLast < pQuery->skey) continue;
|
if (QUERY_IS_ASC_QUERY(pQuery) && pBlock[lastSlot].keyLast < pQuery->skey) continue;
|
||||||
if (!QUERY_IS_ASC_QUERY(pQuery) && pBlock[firstSlot].keyFirst > pQuery->skey) continue;
|
if (!QUERY_IS_ASC_QUERY(pQuery) && pBlock[firstSlot].keyFirst > pQuery->skey) continue;
|
||||||
|
|
||||||
|
|
|
@ -98,7 +98,7 @@ unsigned char *base64_decode(const char *value, int inlen, int *outlen) {
|
||||||
|
|
||||||
base64_decode_error:
|
base64_decode_error:
|
||||||
free(result);
|
free(result);
|
||||||
*result = 0;
|
result = 0;
|
||||||
*outlen = 0;
|
*outlen = 0;
|
||||||
|
|
||||||
return result;
|
return result;
|
||||||
|
|
|
@ -516,20 +516,20 @@ tMemBucket* tMemBucketCreate(int32_t totalSlots, int32_t nBufferSize, int16_t nE
|
||||||
|
|
||||||
if (pDesc->pSchema->numOfCols != 1 || pDesc->pSchema->colOffset[0] != 0) {
|
if (pDesc->pSchema->numOfCols != 1 || pDesc->pSchema->colOffset[0] != 0) {
|
||||||
pError("MemBucket:%p,only consecutive data is allowed,invalid numOfCols:%d or offset:%d",
|
pError("MemBucket:%p,only consecutive data is allowed,invalid numOfCols:%d or offset:%d",
|
||||||
*pBucket, pDesc->pSchema->numOfCols, pDesc->pSchema->colOffset[0]);
|
pBucket, pDesc->pSchema->numOfCols, pDesc->pSchema->colOffset[0]);
|
||||||
tfree(pBucket);
|
tfree(pBucket);
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pDesc->pSchema->pFields[0].type != dataType) {
|
if (pDesc->pSchema->pFields[0].type != dataType) {
|
||||||
pError("MemBucket:%p,data type is not consistent,%d in schema, %d in param", *pBucket,
|
pError("MemBucket:%p,data type is not consistent,%d in schema, %d in param", pBucket,
|
||||||
pDesc->pSchema->pFields[0].type, dataType);
|
pDesc->pSchema->pFields[0].type, dataType);
|
||||||
tfree(pBucket);
|
tfree(pBucket);
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pBucket->numOfTotalPages < pBucket->nTotalSlots) {
|
if (pBucket->numOfTotalPages < pBucket->nTotalSlots) {
|
||||||
pWarn("MemBucket:%p,total buffer pages %d are not enough for all slots", *pBucket, pBucket->numOfTotalPages);
|
pWarn("MemBucket:%p,total buffer pages %d are not enough for all slots", pBucket, pBucket->numOfTotalPages);
|
||||||
}
|
}
|
||||||
|
|
||||||
pBucket->pSegs = (tMemBucketSegment *)malloc(pBucket->numOfSegs * sizeof(tMemBucketSegment));
|
pBucket->pSegs = (tMemBucketSegment *)malloc(pBucket->numOfSegs * sizeof(tMemBucketSegment));
|
||||||
|
@ -540,7 +540,7 @@ tMemBucket* tMemBucketCreate(int32_t totalSlots, int32_t nBufferSize, int16_t nE
|
||||||
pBucket->pSegs[i].pBoundingEntries = NULL;
|
pBucket->pSegs[i].pBoundingEntries = NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
pTrace("MemBucket:%p,created,buffer size:%d,elem size:%d", *pBucket, pBucket->numOfTotalPages * DEFAULT_PAGE_SIZE,
|
pTrace("MemBucket:%p,created,buffer size:%d,elem size:%d", pBucket, pBucket->numOfTotalPages * DEFAULT_PAGE_SIZE,
|
||||||
pBucket->nElemSize);
|
pBucket->nElemSize);
|
||||||
|
|
||||||
return pBucket;
|
return pBucket;
|
||||||
|
@ -1258,6 +1258,7 @@ static tFilePage *loadIntoBucketFromDisk(tMemBucket *pMemBucket, int32_t segIdx,
|
||||||
|
|
||||||
for (uint32_t j = 0; j < pFlushInfo->numOfPages; ++j) {
|
for (uint32_t j = 0; j < pFlushInfo->numOfPages; ++j) {
|
||||||
ret = fread(pPage, pMemBuffer->nPageSize, 1, pMemBuffer->dataFile);
|
ret = fread(pPage, pMemBuffer->nPageSize, 1, pMemBuffer->dataFile);
|
||||||
|
UNUSED(ret);
|
||||||
assert(pPage->numOfElems > 0);
|
assert(pPage->numOfElems > 0);
|
||||||
|
|
||||||
tColModelAppend(pDesc->pSchema, buffer, pPage->data, 0, pPage->numOfElems, pPage->numOfElems);
|
tColModelAppend(pDesc->pSchema, buffer, pPage->data, 0, pPage->numOfElems, pPage->numOfElems);
|
||||||
|
@ -1917,6 +1918,7 @@ double getPercentileImpl(tMemBucket *pMemBucket, int32_t count, double fraction)
|
||||||
|
|
||||||
for (uint32_t jx = 0; jx < pFlushInfo->numOfPages; ++jx) {
|
for (uint32_t jx = 0; jx < pFlushInfo->numOfPages; ++jx) {
|
||||||
ret = fread(pPage, pMemBuffer->nPageSize, 1, pMemBuffer->dataFile);
|
ret = fread(pPage, pMemBuffer->nPageSize, 1, pMemBuffer->dataFile);
|
||||||
|
UNUSED(ret);
|
||||||
tMemBucketPut(pMemBucket, pPage->data, pPage->numOfElems);
|
tMemBucketPut(pMemBucket, pPage->data, pPage->numOfElems);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -726,7 +726,7 @@ int32_t tVariantDump(tVariant *pVariant, char *payload, char type) {
|
||||||
*((int64_t *)payload) = TSDB_DATA_DOUBLE_NULL;
|
*((int64_t *)payload) = TSDB_DATA_DOUBLE_NULL;
|
||||||
return 0;
|
return 0;
|
||||||
} else {
|
} else {
|
||||||
double value;
|
double value = 0;
|
||||||
int32_t ret;
|
int32_t ret;
|
||||||
ret = convertToDouble(pVariant->pz, pVariant->nLen, &value);
|
ret = convertToDouble(pVariant->pz, pVariant->nLen, &value);
|
||||||
if ((errno == ERANGE && value == -1) || (ret != 0)) {
|
if ((errno == ERANGE && value == -1) || (ret != 0)) {
|
||||||
|
|
Loading…
Reference in New Issue