diff --git a/src/client/inc/tsclient.h b/src/client/inc/tsclient.h index e538281bc0..bd956aeb74 100644 --- a/src/client/inc/tsclient.h +++ b/src/client/inc/tsclient.h @@ -294,11 +294,12 @@ typedef struct STscObj { } STscObj; typedef struct SSqlObj { - void * signature; - STscObj *pTscObj; - void (*fp)(); - void (*fetchFp)(); - void * param; + void *signature; + STscObj *pTscObj; + void *SRpcReqContext; + void (*fp)(); + void (*fetchFp)(); + void *param; int64_t stime; uint32_t queryId; void * pStream; diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index b509dd3364..e15a5e618a 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -196,8 +196,8 @@ int tscSendMsgToServer(SSqlObj *pSql) { .handle = pSql, .code = 0 }; - rpcSendRequest(pObj->pDnodeConn, &pSql->ipList, &rpcMsg); + pSql->SRpcReqContext = rpcSendRequest(pObj->pDnodeConn, &pSql->ipList, &rpcMsg); return TSDB_CODE_SUCCESS; } @@ -422,7 +422,7 @@ void tscKillSTableQuery(SSqlObj *pSql) { * sub-queries not correctly released and master sql object of super table query reaches an abnormal state. */ pSql->pSubs[i]->res.code = TSDB_CODE_TSC_QUERY_CANCELLED; -// taosStopRpcConn(pSql->pSubs[i]->); + rpcCancelRequest(pSql->pSubs[i]->SRpcReqContext); } /* diff --git a/src/client/src/tscSql.c b/src/client/src/tscSql.c index b13e7b7ccf..0be7db435d 100644 --- a/src/client/src/tscSql.c +++ b/src/client/src/tscSql.c @@ -627,7 +627,7 @@ void taos_stop_query(TAOS_RES *res) { return; } - //taosStopRpcConn(pSql->thandle); + rpcCancelRequest(pSql->SRpcReqContext); tscTrace("%p query is cancelled", res); } diff --git a/src/inc/trpc.h b/src/inc/trpc.h index 6c5d7fa889..30e0f9eee1 100644 --- a/src/inc/trpc.h +++ b/src/inc/trpc.h @@ -84,7 +84,7 @@ void rpcSendRedirectRsp(void *pConn, const SRpcIpSet *pIpSet); int rpcGetConnInfo(void *thandle, SRpcConnInfo *pInfo); void rpcSendRecv(void *shandle, SRpcIpSet *pIpSet, const SRpcMsg *pReq, SRpcMsg *pRsp); int rpcReportProgress(void *pConn, char *pCont, int contLen); -void rpcCanelRequest(void *pContext); +void rpcCancelRequest(void *pContext); #ifdef __cplusplus } diff --git a/src/plugins/http/src/gcHandle.c b/src/plugins/http/src/gcHandle.c index 176e16301b..20cd19df9d 100644 --- a/src/plugins/http/src/gcHandle.c +++ b/src/plugins/http/src/gcHandle.c @@ -52,7 +52,7 @@ bool gcGetUserFromUrl(HttpContext* pContext) { return false; } - strcpy(pContext->user, pParser->path[GC_USER_URL_POS].pos); + tstrncpy(pContext->user, pParser->path[GC_USER_URL_POS].pos, TSDB_USER_LEN); return true; } @@ -62,7 +62,7 @@ bool gcGetPassFromUrl(HttpContext* pContext) { return false; } - strcpy(pContext->pass, pParser->path[GC_PASS_URL_POS].pos); + tstrncpy(pContext->pass, pParser->path[GC_PASS_URL_POS].pos, TSDB_PASSWORD_LEN); return true; } diff --git a/src/plugins/http/src/httpAuth.c b/src/plugins/http/src/httpAuth.c index 0439083f31..69630336a2 100644 --- a/src/plugins/http/src/httpAuth.c +++ b/src/plugins/http/src/httpAuth.c @@ -29,6 +29,7 @@ bool httpParseBasicAuthToken(HttpContext *pContext, char *token, int len) { char *base64 = (char *)base64_decode(token, len, &outlen); if (base64 == NULL || outlen == 0) { httpError("context:%p, fd:%d, ip:%s, basic token:%s parsed error", pContext, pContext->fd, pContext->ipstr, token); + free(base64); return false; } diff --git a/src/plugins/http/src/httpJson.c b/src/plugins/http/src/httpJson.c index 901b930774..7b49b06571 100644 --- a/src/plugins/http/src/httpJson.c +++ b/src/plugins/http/src/httpJson.c @@ -442,14 +442,13 @@ void httpJsonPairStatus(JsonBuf* buf, int code) { httpJsonPair(buf, "status", 6, "error", 5); httpJsonItemToken(buf); httpJsonPairIntVal(buf, "code", 4, code); - if (code >= 0) { - httpJsonItemToken(buf); - if (code == TSDB_CODE_MND_DB_NOT_SELECTED) { - httpJsonPair(buf, "desc", 4, "failed to create database", 23); - } else if (code == TSDB_CODE_MND_INVALID_TABLE_NAME) { - httpJsonPair(buf, "desc", 4, "failed to create table", 22); - } else - httpJsonPair(buf, "desc", 4, (char*)tstrerror(code), (int)strlen(tstrerror(code))); + httpJsonItemToken(buf); + if (code == TSDB_CODE_MND_DB_NOT_SELECTED) { + httpJsonPair(buf, "desc", 4, "failed to create database", 23); + } else if (code == TSDB_CODE_MND_INVALID_TABLE_NAME) { + httpJsonPair(buf, "desc", 4, "failed to create table", 22); + } else { + httpJsonPair(buf, "desc", 4, (char*)tstrerror(code), (int)strlen(tstrerror(code))); } } } diff --git a/src/plugins/http/src/httpUtil.c b/src/plugins/http/src/httpUtil.c index 694cdec0a0..d1a0eb90f0 100644 --- a/src/plugins/http/src/httpUtil.c +++ b/src/plugins/http/src/httpUtil.c @@ -202,8 +202,7 @@ bool httpReMallocMultiCmdsSize(HttpContext *pContext, int cmdSize) { pContext->user, cmdSize); return false; } - memset(multiCmds->cmds + multiCmds->maxSize * (int16_t)sizeof(HttpSqlCmd), 0, - (size_t)(cmdSize - multiCmds->maxSize) * sizeof(HttpSqlCmd)); + memset(multiCmds->cmds + multiCmds->maxSize, 0, (size_t)(cmdSize - multiCmds->maxSize) * sizeof(HttpSqlCmd)); multiCmds->maxSize = (int16_t)cmdSize; return true; diff --git a/src/plugins/http/src/restHandle.c b/src/plugins/http/src/restHandle.c index 93094fa287..2e04f562ea 100644 --- a/src/plugins/http/src/restHandle.c +++ b/src/plugins/http/src/restHandle.c @@ -65,7 +65,7 @@ bool restGetUserFromUrl(HttpContext* pContext) { return false; } - strcpy(pContext->user, pParser->path[REST_USER_URL_POS].pos); + tstrncpy(pContext->user, pParser->path[REST_USER_URL_POS].pos, TSDB_USER_LEN); return true; } @@ -75,7 +75,7 @@ bool restGetPassFromUrl(HttpContext* pContext) { return false; } - strcpy(pContext->pass, pParser->path[REST_PASS_URL_POS].pos); + tstrncpy(pContext->pass, pParser->path[REST_PASS_URL_POS].pos, TSDB_PASSWORD_LEN); return true; } diff --git a/src/plugins/http/src/tgHandle.c b/src/plugins/http/src/tgHandle.c index fae11127e1..354fa6b07e 100644 --- a/src/plugins/http/src/tgHandle.c +++ b/src/plugins/http/src/tgHandle.c @@ -268,10 +268,10 @@ int tgReadSchema(char *fileName) { httpPrint("open telegraf schema file:%s success", fileName); fseek(fp, 0, SEEK_END); - size_t contentSize = (size_t)ftell(fp); + int32_t contentSize = (int32_t)ftell(fp); rewind(fp); - char *content = (char *)calloc(contentSize * sizeof(char) + 1, 1); - size_t result = fread(content, 1, contentSize, fp); + char * content = (char *)calloc(contentSize + 1, 1); + int32_t result = fread(content, 1, contentSize, fp); if (result != contentSize) { httpError("failed to read telegraf schema file:%s", fileName); fclose(fp); @@ -279,6 +279,7 @@ int tgReadSchema(char *fileName) { return -1; } + content[contentSize] = 0; int schemaNum = tgParseSchema(content, fileName); free(content); diff --git a/src/plugins/monitor/src/monitorMain.c b/src/plugins/monitor/src/monitorMain.c index ea7916026c..feb03df35d 100644 --- a/src/plugins/monitor/src/monitorMain.c +++ b/src/plugins/monitor/src/monitorMain.c @@ -61,7 +61,7 @@ typedef struct { char ep[TSDB_EP_LEN]; int8_t cmdIndex; int8_t state; - char sql[SQL_LENGTH]; + char sql[SQL_LENGTH + 1]; void * initTimer; void * diskTimer; } SMonitorConn; diff --git a/src/plugins/mqtt/src/mqttSystem.c b/src/plugins/mqtt/src/mqttSystem.c index 3266579e33..e9af733d07 100644 --- a/src/plugins/mqtt/src/mqttSystem.c +++ b/src/plugins/mqtt/src/mqttSystem.c @@ -164,7 +164,7 @@ void mqtt_PublishCallback(void** unused, struct mqtt_response_publish* published void* mqttClientRefresher(void* client) { while (mttIsRuning) { mqtt_sync((struct mqtt_client*)client); - usleep(100000U); + taosMsleep(100); } mqttPrint("Exit mqttClientRefresher"); return NULL; diff --git a/src/tsdb/src/tsdbRead.c b/src/tsdb/src/tsdbRead.c index 9cd2ada6ac..a2dd23dcc1 100644 --- a/src/tsdb/src/tsdbRead.c +++ b/src/tsdb/src/tsdbRead.c @@ -798,38 +798,77 @@ static int32_t copyDataFromFileBlock(STsdbQueryHandle* pQueryHandle, int32_t cap int32_t requiredNumOfCols = taosArrayGetSize(pQueryHandle->pColumns); //data in buffer has greater timestamp, copy data in file block - for (int32_t i = 0; i < requiredNumOfCols; ++i) { + int32_t i = 0, j = 0; + while(i < requiredNumOfCols && j < pCols->numOfCols) { SColumnInfoData* pColInfo = taosArrayGet(pQueryHandle->pColumns, i); - int32_t bytes = pColInfo->info.bytes; - + + SDataCol* src = &pCols->cols[j]; + if (src->colId < pColInfo->info.colId) { + j++; + continue; + } + + int32_t bytes = pColInfo->info.bytes; + if (ASCENDING_TRAVERSE(pQueryHandle->order)) { pData = pColInfo->pData + numOfRows * pColInfo->info.bytes; } else { pData = pColInfo->pData + (capacity - numOfRows - num) * pColInfo->info.bytes; } - - for (int32_t j = 0; j < pCols->numOfCols; ++j) { // todo opt performance - SDataCol* src = &pCols->cols[j]; - - if (pColInfo->info.colId == src->colId) { - - if (pColInfo->info.type != TSDB_DATA_TYPE_BINARY && pColInfo->info.type != TSDB_DATA_TYPE_NCHAR) { - memmove(pData, src->pData + bytes * start, bytes * num); - } else { // handle the var-string - char* dst = pData; - - // todo refactor, only copy one-by-one - for (int32_t k = start; k < num + start; ++k) { - char* p = tdGetColDataOfRow(src, k); - memcpy(dst, p, varDataTLen(p)); - dst += bytes; - } + + if (pColInfo->info.colId == src->colId) { + + if (pColInfo->info.type != TSDB_DATA_TYPE_BINARY && pColInfo->info.type != TSDB_DATA_TYPE_NCHAR) { + memmove(pData, src->pData + bytes * start, bytes * num); + } else { // handle the var-string + char* dst = pData; + + // todo refactor, only copy one-by-one + for (int32_t k = start; k < num + start; ++k) { + char* p = tdGetColDataOfRow(src, k); + memcpy(dst, p, varDataTLen(p)); + dst += bytes; } - - break; } + + j++; + i++; + } else { // pColInfo->info.colId < src->colId, it is a NULL data + if (pColInfo->info.type == TSDB_DATA_TYPE_BINARY || pColInfo->info.type == TSDB_DATA_TYPE_NCHAR) { + char* dst = pData; + + for(int32_t k = start; k < num + start; ++k) { + setVardataNull(dst, pColInfo->info.type); + dst += bytes; + } + } else { + setNullN(pData, pColInfo->info.type, pColInfo->info.bytes, num); + } + i++; } } + + while (i < requiredNumOfCols) { // the remain columns are all null data + SColumnInfoData* pColInfo = taosArrayGet(pQueryHandle->pColumns, i); + if (ASCENDING_TRAVERSE(pQueryHandle->order)) { + pData = pColInfo->pData + numOfRows * pColInfo->info.bytes; + } else { + pData = pColInfo->pData + (capacity - numOfRows - num) * pColInfo->info.bytes; + } + + if (pColInfo->info.type == TSDB_DATA_TYPE_BINARY || pColInfo->info.type == TSDB_DATA_TYPE_NCHAR) { + char* dst = pData; + + for(int32_t k = start; k < num + start; ++k) { + setVardataNull(dst, pColInfo->info.type); + dst += pColInfo->info.bytes; + } + } else { + setNullN(pData, pColInfo->info.type, pColInfo->info.bytes, num); + } + + i++; + } pQueryHandle->cur.win.ekey = tsArray[end]; pQueryHandle->cur.lastKey = tsArray[end] + step; diff --git a/src/util/inc/tutil.h b/src/util/inc/tutil.h index bb74bafbdd..e3e52d46ff 100644 --- a/src/util/inc/tutil.h +++ b/src/util/inc/tutil.h @@ -117,7 +117,7 @@ extern "C" { #define POW2(x) ((x) * (x)) -int taosRand(void); +uint32_t taosRand(void); int32_t strdequote(char *src); diff --git a/src/util/src/tutil.c b/src/util/src/tutil.c index a2edce2387..4469ad79b1 100644 --- a/src/util/src/tutil.c +++ b/src/util/src/tutil.c @@ -29,12 +29,12 @@ #ifdef WINDOWS -int taosRand(void) +uint32_t taosRand(void) { return rand(); } #else -int taosRand(void) +uint32_t taosRand(void) { int fd; int seed; @@ -50,7 +50,7 @@ int taosRand(void) close(fd); } - return seed; + return (uint32_t)seed; } #endif @@ -474,9 +474,9 @@ void getTmpfilePath(const char *fileNamePrefix, char *dstPath) { void taosRandStr(char* str, int32_t size) { const char* set = "abcdefghijklmnopqrstuvwxyz0123456789-_."; int32_t len = 39; - - for(int32_t i = 0; i < size; ++i) { - str[i] = set[taosRand()%len]; + + for (int32_t i = 0; i < size; ++i) { + str[i] = set[taosRand() % len]; } } diff --git a/tests/comparisonTest/opentsdb/opentsdbtest/pom.xml b/tests/comparisonTest/opentsdb/opentsdbtest/pom.xml index 6c37746d24..f6728359e5 100644 --- a/tests/comparisonTest/opentsdb/opentsdbtest/pom.xml +++ b/tests/comparisonTest/opentsdb/opentsdbtest/pom.xml @@ -118,7 +118,7 @@ com.fasterxml.jackson.core jackson-databind - 2.9.10.4 + 2.10.0.pr1