Merge branch 'develop' into feature/2.0tsdb

This commit is contained in:
Hongze Cheng 2020-06-19 03:05:51 +00:00
commit e58f758b58
16 changed files with 98 additions and 58 deletions

View File

@ -294,11 +294,12 @@ typedef struct STscObj {
} STscObj; } STscObj;
typedef struct SSqlObj { typedef struct SSqlObj {
void * signature; void *signature;
STscObj *pTscObj; STscObj *pTscObj;
void *SRpcReqContext;
void (*fp)(); void (*fp)();
void (*fetchFp)(); void (*fetchFp)();
void * param; void *param;
int64_t stime; int64_t stime;
uint32_t queryId; uint32_t queryId;
void * pStream; void * pStream;

View File

@ -196,8 +196,8 @@ int tscSendMsgToServer(SSqlObj *pSql) {
.handle = pSql, .handle = pSql,
.code = 0 .code = 0
}; };
rpcSendRequest(pObj->pDnodeConn, &pSql->ipList, &rpcMsg);
pSql->SRpcReqContext = rpcSendRequest(pObj->pDnodeConn, &pSql->ipList, &rpcMsg);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
@ -422,7 +422,7 @@ void tscKillSTableQuery(SSqlObj *pSql) {
* sub-queries not correctly released and master sql object of super table query reaches an abnormal state. * sub-queries not correctly released and master sql object of super table query reaches an abnormal state.
*/ */
pSql->pSubs[i]->res.code = TSDB_CODE_TSC_QUERY_CANCELLED; pSql->pSubs[i]->res.code = TSDB_CODE_TSC_QUERY_CANCELLED;
// taosStopRpcConn(pSql->pSubs[i]->); rpcCancelRequest(pSql->pSubs[i]->SRpcReqContext);
} }
/* /*

View File

@ -627,7 +627,7 @@ void taos_stop_query(TAOS_RES *res) {
return; return;
} }
//taosStopRpcConn(pSql->thandle); rpcCancelRequest(pSql->SRpcReqContext);
tscTrace("%p query is cancelled", res); tscTrace("%p query is cancelled", res);
} }

View File

@ -84,7 +84,7 @@ void rpcSendRedirectRsp(void *pConn, const SRpcIpSet *pIpSet);
int rpcGetConnInfo(void *thandle, SRpcConnInfo *pInfo); int rpcGetConnInfo(void *thandle, SRpcConnInfo *pInfo);
void rpcSendRecv(void *shandle, SRpcIpSet *pIpSet, const SRpcMsg *pReq, SRpcMsg *pRsp); void rpcSendRecv(void *shandle, SRpcIpSet *pIpSet, const SRpcMsg *pReq, SRpcMsg *pRsp);
int rpcReportProgress(void *pConn, char *pCont, int contLen); int rpcReportProgress(void *pConn, char *pCont, int contLen);
void rpcCanelRequest(void *pContext); void rpcCancelRequest(void *pContext);
#ifdef __cplusplus #ifdef __cplusplus
} }

View File

@ -52,7 +52,7 @@ bool gcGetUserFromUrl(HttpContext* pContext) {
return false; return false;
} }
strcpy(pContext->user, pParser->path[GC_USER_URL_POS].pos); tstrncpy(pContext->user, pParser->path[GC_USER_URL_POS].pos, TSDB_USER_LEN);
return true; return true;
} }
@ -62,7 +62,7 @@ bool gcGetPassFromUrl(HttpContext* pContext) {
return false; return false;
} }
strcpy(pContext->pass, pParser->path[GC_PASS_URL_POS].pos); tstrncpy(pContext->pass, pParser->path[GC_PASS_URL_POS].pos, TSDB_PASSWORD_LEN);
return true; return true;
} }

View File

@ -29,6 +29,7 @@ bool httpParseBasicAuthToken(HttpContext *pContext, char *token, int len) {
char *base64 = (char *)base64_decode(token, len, &outlen); char *base64 = (char *)base64_decode(token, len, &outlen);
if (base64 == NULL || outlen == 0) { if (base64 == NULL || outlen == 0) {
httpError("context:%p, fd:%d, ip:%s, basic token:%s parsed error", pContext, pContext->fd, pContext->ipstr, token); httpError("context:%p, fd:%d, ip:%s, basic token:%s parsed error", pContext, pContext->fd, pContext->ipstr, token);
free(base64);
return false; return false;
} }

View File

@ -442,13 +442,12 @@ void httpJsonPairStatus(JsonBuf* buf, int code) {
httpJsonPair(buf, "status", 6, "error", 5); httpJsonPair(buf, "status", 6, "error", 5);
httpJsonItemToken(buf); httpJsonItemToken(buf);
httpJsonPairIntVal(buf, "code", 4, code); httpJsonPairIntVal(buf, "code", 4, code);
if (code >= 0) {
httpJsonItemToken(buf); httpJsonItemToken(buf);
if (code == TSDB_CODE_MND_DB_NOT_SELECTED) { if (code == TSDB_CODE_MND_DB_NOT_SELECTED) {
httpJsonPair(buf, "desc", 4, "failed to create database", 23); httpJsonPair(buf, "desc", 4, "failed to create database", 23);
} else if (code == TSDB_CODE_MND_INVALID_TABLE_NAME) { } else if (code == TSDB_CODE_MND_INVALID_TABLE_NAME) {
httpJsonPair(buf, "desc", 4, "failed to create table", 22); httpJsonPair(buf, "desc", 4, "failed to create table", 22);
} else } else {
httpJsonPair(buf, "desc", 4, (char*)tstrerror(code), (int)strlen(tstrerror(code))); httpJsonPair(buf, "desc", 4, (char*)tstrerror(code), (int)strlen(tstrerror(code)));
} }
} }

View File

@ -202,8 +202,7 @@ bool httpReMallocMultiCmdsSize(HttpContext *pContext, int cmdSize) {
pContext->user, cmdSize); pContext->user, cmdSize);
return false; return false;
} }
memset(multiCmds->cmds + multiCmds->maxSize * (int16_t)sizeof(HttpSqlCmd), 0, memset(multiCmds->cmds + multiCmds->maxSize, 0, (size_t)(cmdSize - multiCmds->maxSize) * sizeof(HttpSqlCmd));
(size_t)(cmdSize - multiCmds->maxSize) * sizeof(HttpSqlCmd));
multiCmds->maxSize = (int16_t)cmdSize; multiCmds->maxSize = (int16_t)cmdSize;
return true; return true;

View File

@ -65,7 +65,7 @@ bool restGetUserFromUrl(HttpContext* pContext) {
return false; return false;
} }
strcpy(pContext->user, pParser->path[REST_USER_URL_POS].pos); tstrncpy(pContext->user, pParser->path[REST_USER_URL_POS].pos, TSDB_USER_LEN);
return true; return true;
} }
@ -75,7 +75,7 @@ bool restGetPassFromUrl(HttpContext* pContext) {
return false; return false;
} }
strcpy(pContext->pass, pParser->path[REST_PASS_URL_POS].pos); tstrncpy(pContext->pass, pParser->path[REST_PASS_URL_POS].pos, TSDB_PASSWORD_LEN);
return true; return true;
} }

View File

@ -268,10 +268,10 @@ int tgReadSchema(char *fileName) {
httpPrint("open telegraf schema file:%s success", fileName); httpPrint("open telegraf schema file:%s success", fileName);
fseek(fp, 0, SEEK_END); fseek(fp, 0, SEEK_END);
size_t contentSize = (size_t)ftell(fp); int32_t contentSize = (int32_t)ftell(fp);
rewind(fp); rewind(fp);
char *content = (char *)calloc(contentSize * sizeof(char) + 1, 1); char * content = (char *)calloc(contentSize + 1, 1);
size_t result = fread(content, 1, contentSize, fp); int32_t result = fread(content, 1, contentSize, fp);
if (result != contentSize) { if (result != contentSize) {
httpError("failed to read telegraf schema file:%s", fileName); httpError("failed to read telegraf schema file:%s", fileName);
fclose(fp); fclose(fp);
@ -279,6 +279,7 @@ int tgReadSchema(char *fileName) {
return -1; return -1;
} }
content[contentSize] = 0;
int schemaNum = tgParseSchema(content, fileName); int schemaNum = tgParseSchema(content, fileName);
free(content); free(content);

View File

@ -61,7 +61,7 @@ typedef struct {
char ep[TSDB_EP_LEN]; char ep[TSDB_EP_LEN];
int8_t cmdIndex; int8_t cmdIndex;
int8_t state; int8_t state;
char sql[SQL_LENGTH]; char sql[SQL_LENGTH + 1];
void * initTimer; void * initTimer;
void * diskTimer; void * diskTimer;
} SMonitorConn; } SMonitorConn;

View File

@ -164,7 +164,7 @@ void mqtt_PublishCallback(void** unused, struct mqtt_response_publish* published
void* mqttClientRefresher(void* client) { void* mqttClientRefresher(void* client) {
while (mttIsRuning) { while (mttIsRuning) {
mqtt_sync((struct mqtt_client*)client); mqtt_sync((struct mqtt_client*)client);
usleep(100000U); taosMsleep(100);
} }
mqttPrint("Exit mqttClientRefresher"); mqttPrint("Exit mqttClientRefresher");
return NULL; return NULL;

View File

@ -798,8 +798,16 @@ static int32_t copyDataFromFileBlock(STsdbQueryHandle* pQueryHandle, int32_t cap
int32_t requiredNumOfCols = taosArrayGetSize(pQueryHandle->pColumns); int32_t requiredNumOfCols = taosArrayGetSize(pQueryHandle->pColumns);
//data in buffer has greater timestamp, copy data in file block //data in buffer has greater timestamp, copy data in file block
for (int32_t i = 0; i < requiredNumOfCols; ++i) { int32_t i = 0, j = 0;
while(i < requiredNumOfCols && j < pCols->numOfCols) {
SColumnInfoData* pColInfo = taosArrayGet(pQueryHandle->pColumns, i); SColumnInfoData* pColInfo = taosArrayGet(pQueryHandle->pColumns, i);
SDataCol* src = &pCols->cols[j];
if (src->colId < pColInfo->info.colId) {
j++;
continue;
}
int32_t bytes = pColInfo->info.bytes; int32_t bytes = pColInfo->info.bytes;
if (ASCENDING_TRAVERSE(pQueryHandle->order)) { if (ASCENDING_TRAVERSE(pQueryHandle->order)) {
@ -808,9 +816,6 @@ static int32_t copyDataFromFileBlock(STsdbQueryHandle* pQueryHandle, int32_t cap
pData = pColInfo->pData + (capacity - numOfRows - num) * pColInfo->info.bytes; pData = pColInfo->pData + (capacity - numOfRows - num) * pColInfo->info.bytes;
} }
for (int32_t j = 0; j < pCols->numOfCols; ++j) { // todo opt performance
SDataCol* src = &pCols->cols[j];
if (pColInfo->info.colId == src->colId) { if (pColInfo->info.colId == src->colId) {
if (pColInfo->info.type != TSDB_DATA_TYPE_BINARY && pColInfo->info.type != TSDB_DATA_TYPE_NCHAR) { if (pColInfo->info.type != TSDB_DATA_TYPE_BINARY && pColInfo->info.type != TSDB_DATA_TYPE_NCHAR) {
@ -826,9 +831,43 @@ static int32_t copyDataFromFileBlock(STsdbQueryHandle* pQueryHandle, int32_t cap
} }
} }
break; j++;
i++;
} else { // pColInfo->info.colId < src->colId, it is a NULL data
if (pColInfo->info.type == TSDB_DATA_TYPE_BINARY || pColInfo->info.type == TSDB_DATA_TYPE_NCHAR) {
char* dst = pData;
for(int32_t k = start; k < num + start; ++k) {
setVardataNull(dst, pColInfo->info.type);
dst += bytes;
}
} else {
setNullN(pData, pColInfo->info.type, pColInfo->info.bytes, num);
}
i++;
} }
} }
while (i < requiredNumOfCols) { // the remain columns are all null data
SColumnInfoData* pColInfo = taosArrayGet(pQueryHandle->pColumns, i);
if (ASCENDING_TRAVERSE(pQueryHandle->order)) {
pData = pColInfo->pData + numOfRows * pColInfo->info.bytes;
} else {
pData = pColInfo->pData + (capacity - numOfRows - num) * pColInfo->info.bytes;
}
if (pColInfo->info.type == TSDB_DATA_TYPE_BINARY || pColInfo->info.type == TSDB_DATA_TYPE_NCHAR) {
char* dst = pData;
for(int32_t k = start; k < num + start; ++k) {
setVardataNull(dst, pColInfo->info.type);
dst += pColInfo->info.bytes;
}
} else {
setNullN(pData, pColInfo->info.type, pColInfo->info.bytes, num);
}
i++;
} }
pQueryHandle->cur.win.ekey = tsArray[end]; pQueryHandle->cur.win.ekey = tsArray[end];

View File

@ -117,7 +117,7 @@ extern "C" {
#define POW2(x) ((x) * (x)) #define POW2(x) ((x) * (x))
int taosRand(void); uint32_t taosRand(void);
int32_t strdequote(char *src); int32_t strdequote(char *src);

View File

@ -29,12 +29,12 @@
#ifdef WINDOWS #ifdef WINDOWS
int taosRand(void) uint32_t taosRand(void)
{ {
return rand(); return rand();
} }
#else #else
int taosRand(void) uint32_t taosRand(void)
{ {
int fd; int fd;
int seed; int seed;
@ -50,7 +50,7 @@ int taosRand(void)
close(fd); close(fd);
} }
return seed; return (uint32_t)seed;
} }
#endif #endif
@ -475,8 +475,8 @@ void taosRandStr(char* str, int32_t size) {
const char* set = "abcdefghijklmnopqrstuvwxyz0123456789-_."; const char* set = "abcdefghijklmnopqrstuvwxyz0123456789-_.";
int32_t len = 39; int32_t len = 39;
for(int32_t i = 0; i < size; ++i) { for (int32_t i = 0; i < size; ++i) {
str[i] = set[taosRand()%len]; str[i] = set[taosRand() % len];
} }
} }

View File

@ -118,7 +118,7 @@
<dependency> <dependency>
<groupId>com.fasterxml.jackson.core</groupId> <groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId> <artifactId>jackson-databind</artifactId>
<version>2.9.10.4</version> <version>2.10.0.pr1</version>
</dependency> </dependency>
<dependency> <dependency>