This commit is contained in:
slguan 2019-08-07 23:16:50 +08:00
parent 8133a76abc
commit 8b05ac9960
16 changed files with 63 additions and 61 deletions

View File

@ -84,8 +84,7 @@ void taos_query_a(TAOS *taos, char *sqlstr, void (*fp)(void *, TAOS_RES *, int),
pRes->qhandle = 0; pRes->qhandle = 0;
pRes->numOfRows = 1; pRes->numOfRows = 1;
strtolower(sqlstr, pSql->sqlstr); strtolower(pSql->sqlstr, sqlstr);
pSql->sqlstr[sqlLen] = 0;
tscTrace("%p Async SQL: %s, pObj:%p", pSql, pSql->sqlstr, pObj); tscTrace("%p Async SQL: %s, pObj:%p", pSql, pSql->sqlstr, pObj);
int32_t code = tsParseSql(pSql, pObj->acctId, pObj->db, true); int32_t code = tsParseSql(pSql, pObj->acctId, pObj->db, true);

View File

@ -1949,7 +1949,7 @@ int32_t getColumnIndexByName(SSQLToken* pToken, SSchema* pSchema, int32_t numOfC
return -1; return -1;
} }
char* r = strnchr(pToken->z, '.', pToken->n); char* r = strnchr(pToken->z, '.', pToken->n, false);
if (r != NULL) { if (r != NULL) {
r += 1; r += 1;
@ -3172,7 +3172,7 @@ int32_t getTimeRange(int64_t* stime, int64_t* etime, tSQLExpr* pRight, int32_t o
bool parsed = false; bool parsed = false;
if (pRight->val.nType == TSDB_DATA_TYPE_BINARY) { if (pRight->val.nType == TSDB_DATA_TYPE_BINARY) {
strdequote(pRight->val.pz); strdequote(pRight->val.pz);
char* seg = strnchr(pRight->val.pz, '-', pRight->val.nLen); char* seg = strnchr(pRight->val.pz, '-', pRight->val.nLen, false);
if (seg != NULL) { if (seg != NULL) {
if (taosParseTime(pRight->val.pz, &val, pRight->val.nLen, TSDB_TIME_PRECISION_MICRO) == TSDB_CODE_SUCCESS) { if (taosParseTime(pRight->val.pz, &val, pRight->val.nLen, TSDB_TIME_PRECISION_MICRO) == TSDB_CODE_SUCCESS) {
parsed = true; parsed = true;

View File

@ -2807,8 +2807,7 @@ int tscGetMeterMetaEx(SSqlObj *pSql, char *meterId, bool createIfNotExists) {
* successfully created the corresponding table. * successfully created the corresponding table.
*/ */
static void tscWaitingForCreateTable(SSqlCmd *pCmd) { static void tscWaitingForCreateTable(SSqlCmd *pCmd) {
int32_t CREATE_METER_ON_DEMAND = 1; if (pCmd->command == TSDB_SQL_INSERT) {
if (pCmd->command == TSDB_SQL_INSERT && pCmd->defaultVal[0] == CREATE_METER_ON_DEMAND) {
taosMsleep(50); // todo: global config taosMsleep(50); // todo: global config
} }
} }

View File

@ -88,7 +88,7 @@ TAOS *taos_connect_imp(char *ip, char *user, char *pass, char *db, int port, voi
strcpy(tmp, db); strcpy(tmp, db);
strdequote(tmp); strdequote(tmp);
strtolower(tmp, pObj->db); strtolower(pObj->db, tmp);
} }
pthread_mutex_init(&pObj->mutex, NULL); pthread_mutex_init(&pObj->mutex, NULL);
@ -198,8 +198,7 @@ int taos_query(TAOS *taos, char *sqlstr) {
return pRes->code; return pRes->code;
} }
strtolower(sqlstr, pSql->sqlstr); strtolower(pSql->sqlstr, sqlstr);
pSql->sqlstr[sqlLen] = 0;
pRes->code = (uint8_t)tsParseSql(pSql, pObj->acctId, pObj->db, false); pRes->code = (uint8_t)tsParseSql(pSql, pObj->acctId, pObj->db, false);
@ -728,8 +727,7 @@ int taos_validate_sql(TAOS *taos, char *sql) {
return pRes->code; return pRes->code;
} }
strtolower(sql, pSql->sqlstr); strtolower(pSql->sqlstr, sql);
pSql->sqlstr[sqlLen] = 0;
pRes->code = (uint8_t)tsParseSql(pSql, pObj->acctId, pObj->db, false); pRes->code = (uint8_t)tsParseSql(pSql, pObj->acctId, pObj->db, false);
int code = pRes->code; int code = pRes->code;

View File

@ -898,7 +898,7 @@ int32_t tscValidateName(SSQLToken* pToken) {
return TSDB_CODE_INVALID_SQL; return TSDB_CODE_INVALID_SQL;
} }
char* sep = strnchrNoquote(pToken->z, TS_PATH_DELIMITER[0], pToken->n); char* sep = strnchr(pToken->z, TS_PATH_DELIMITER[0], pToken->n, true);
if (sep == NULL) { // single part if (sep == NULL) { // single part
if (pToken->type == TK_STRING) { if (pToken->type == TK_STRING) {
pToken->n = strdequote(pToken->z); pToken->n = strdequote(pToken->z);
@ -911,7 +911,7 @@ int32_t tscValidateName(SSQLToken* pToken) {
if (len == pToken->n) { if (len == pToken->n) {
return validateQuoteToken(pToken); return validateQuoteToken(pToken);
} else { } else {
sep = strnchrNoquote(pToken->z, TS_PATH_DELIMITER[0], pToken->n); sep = strnchr(pToken->z, TS_PATH_DELIMITER[0], pToken->n, true);
if (sep == NULL) { if (sep == NULL) {
return TSDB_CODE_INVALID_SQL; return TSDB_CODE_INVALID_SQL;
} }

View File

@ -129,6 +129,7 @@ extern "C" {
#define TSDB_CODE_INVALID_QUERY_MSG 106 // failed to validate the sql expression msg by vnode #define TSDB_CODE_INVALID_QUERY_MSG 106 // failed to validate the sql expression msg by vnode
#define TSDB_CODE_CACHE_BLOCK_TS_DISORDERED 107 // time stamp in cache block is disordered #define TSDB_CODE_CACHE_BLOCK_TS_DISORDERED 107 // time stamp in cache block is disordered
#define TSDB_CODE_FILE_BLOCK_TS_DISORDERED 108 // time stamp in file block is disordered #define TSDB_CODE_FILE_BLOCK_TS_DISORDERED 108 // time stamp in file block is disordered
#define TSDB_CODE_INVALID_COMMIT_LOG 109 // invalid commit log may be caused by insufficient sotrage
// message type // message type
#define TSDB_MSG_TYPE_REG 1 #define TSDB_MSG_TYPE_REG 1

View File

@ -49,10 +49,10 @@ void *taosAddDataIntoCache(void *handle, char *key, char *pData, int dataSize, i
* if it is referenced by other object, it will be remain in cache * if it is referenced by other object, it will be remain in cache
* @param handle cache object * @param handle cache object
* @param data not the key, actually referenced data * @param data not the key, actually referenced data
* @param isForce force model, reduce the ref count and move the data into * @param remove force model, reduce the ref count and move the data into
* pTrash * pTrash
*/ */
void taosRemoveDataFromCache(void *handle, void **data, bool isForce); void taosRemoveDataFromCache(void *handle, void **data, bool remove);
/** /**
* update data in cache * update data in cache

View File

@ -180,12 +180,11 @@ int32_t strdequote(char *src);
void strtrim(char *src); void strtrim(char *src);
char *strnchr(char *haystack, char needle, int32_t len); char *strnchr(char *haystack, char needle, int32_t len, bool skipquote);
char *strnchrNoquote(char *haystack, char needle, int32_t len);
char **strsplit(char *src, const char *delim, int32_t *num); char **strsplit(char *src, const char *delim, int32_t *num);
void strtolower(char *src, char *dst); void strtolower(char *dst, const char *src);
int64_t strnatoi(char *num, int32_t len); int64_t strnatoi(char *num, int32_t len);

View File

@ -1217,6 +1217,7 @@ int taosReSendRspToPeer(SRpcConn *pConn) {
void taosProcessTaosTimer(void *param, void *tmrId) { void taosProcessTaosTimer(void *param, void *tmrId) {
STaosHeader *pHeader = NULL; STaosHeader *pHeader = NULL;
SRpcConn * pConn = (SRpcConn *)param; SRpcConn * pConn = (SRpcConn *)param;
int msgLen;
if (pConn->signature != param) { if (pConn->signature != param) {
tError("pConn Signature:0x%x, pConn:0x%x not matched", pConn->signature, param); tError("pConn Signature:0x%x, pConn:0x%x not matched", pConn->signature, param);
@ -1252,6 +1253,7 @@ void taosProcessTaosTimer(void *param, void *tmrId) {
if (pConn->pMsgNode && pConn->pMsgNode->msgLen > 0) { if (pConn->pMsgNode && pConn->pMsgNode->msgLen > 0) {
pHeader = (STaosHeader *)((char *)pConn->pMsgNode + sizeof(SMsgNode)); pHeader = (STaosHeader *)((char *)pConn->pMsgNode + sizeof(SMsgNode));
pHeader->destId = pConn->peerId; pHeader->destId = pConn->peerId;
msgLen = pConn->pMsgNode->msgLen;
if (pConn->spi) { if (pConn->spi) {
STaosDigest *pDigest = (STaosDigest *)(((char *)pHeader) + pConn->pMsgNode->msgLen - sizeof(STaosDigest)); STaosDigest *pDigest = (STaosDigest *)(((char *)pHeader) + pConn->pMsgNode->msgLen - sizeof(STaosDigest));
pDigest->timeStamp = htonl(taosGetTimestampSec()); pDigest->timeStamp = htonl(taosGetTimestampSec());
@ -1279,8 +1281,7 @@ void taosProcessTaosTimer(void *param, void *tmrId) {
pthread_mutex_unlock(&pChann->mutex); pthread_mutex_unlock(&pChann->mutex);
if (pHeader) { if (pHeader) {
(*taosSendData[pServer->type])(pConn->peerIp, pConn->peerPort, (char *)pHeader, pConn->pMsgNode->msgLen, (*taosSendData[pServer->type])(pConn->peerIp, pConn->peerPort, (char *)pHeader, msgLen, pConn->chandle);
pConn->chandle);
taosTmrReset(taosProcessTaosTimer, tsRpcTimer, pConn, pChann->tmrCtrl, &pConn->pTimer); taosTmrReset(taosProcessTaosTimer, tsRpcTimer, pConn, pChann->tmrCtrl, &pConn->pTimer);
} }
} }

View File

@ -139,7 +139,7 @@ char *tsError[] = {"success",
"unexpected response", "unexpected response",
"invalid response type", "invalid response type",
"no resource", "no resource",
"invalid time stamp", // 15 "server-client date time unsynced", // 15
"mismatched meter ID", "mismatched meter ID",
"transcation not finished", "transcation not finished",
"not online", "not online",
@ -232,5 +232,6 @@ char *tsError[] = {"success",
"timestamp out of range", "timestamp out of range",
"invalid query message", "invalid query message",
"timestamp disordered in cache block", "timestamp disordered in cache block",
"timestamp disordered in file block" "timestamp disordered in file block",
"invalid commit log"
}; };

View File

@ -48,9 +48,9 @@ int vnodeOpenCommitLog(int vnode, uint64_t firstV) {
dTrace("vid:%d, logfd:%d, open file:%s success", vnode, pVnode->logFd, fileName); dTrace("vid:%d, logfd:%d, open file:%s success", vnode, pVnode->logFd, fileName);
if (posix_fallocate64(pVnode->logFd, 0, pVnode->mappingSize) != 0) { if (posix_fallocate64(pVnode->logFd, 0, pVnode->mappingSize) != 0) {
dError("vid:%d, logfd:%d, failed to alloc file size:%d", vnode, pVnode->logFd, pVnode->mappingSize); dError("vid:%d, logfd:%d, failed to alloc file size:%d reason:%s", vnode, pVnode->logFd, pVnode->mappingSize, strerror(errno));
perror("fallocate failed"); perror("fallocate failed");
return -1; goto _err_log_open;
} }
struct stat statbuf; struct stat statbuf;
@ -60,13 +60,13 @@ int vnodeOpenCommitLog(int vnode, uint64_t firstV) {
if (length != pVnode->mappingSize) { if (length != pVnode->mappingSize) {
dError("vid:%d, logfd:%d, alloc file size:%ld not equal to mapping size:%ld", vnode, pVnode->logFd, length, dError("vid:%d, logfd:%d, alloc file size:%ld not equal to mapping size:%ld", vnode, pVnode->logFd, length,
pVnode->mappingSize); pVnode->mappingSize);
return -1; goto _err_log_open;
} }
pVnode->pMem = mmap(0, pVnode->mappingSize, PROT_WRITE | PROT_READ, MAP_SHARED, pVnode->logFd, 0); pVnode->pMem = mmap(0, pVnode->mappingSize, PROT_WRITE | PROT_READ, MAP_SHARED, pVnode->logFd, 0);
if (pVnode->pMem == MAP_FAILED) { if (pVnode->pMem == MAP_FAILED) {
dError("vid:%d, logfd:%d, failed to map file, reason:%s", vnode, pVnode->logFd, strerror(errno)); dError("vid:%d, logfd:%d, failed to map file, reason:%s", vnode, pVnode->logFd, strerror(errno));
return -1; goto _err_log_open;
} }
pVnode->pWrite = pVnode->pMem; pVnode->pWrite = pVnode->pMem;
@ -74,6 +74,12 @@ int vnodeOpenCommitLog(int vnode, uint64_t firstV) {
pVnode->pWrite += sizeof(firstV); pVnode->pWrite += sizeof(firstV);
return pVnode->logFd; return pVnode->logFd;
_err_log_open:
close(pVnode->logFd);
remove(fileName);
pVnode->logFd = -1;
return -1;
} }
int vnodeRenewCommitLog(int vnode) { int vnodeRenewCommitLog(int vnode) {
@ -244,9 +250,9 @@ int vnodeInitCommit(int vnode) {
void vnodeCleanUpCommit(int vnode) { void vnodeCleanUpCommit(int vnode) {
SVnodeObj *pVnode = vnodeList + vnode; SVnodeObj *pVnode = vnodeList + vnode;
if (pVnode->logFd) tclose(pVnode->logFd); if (VALIDFD(pVnode->logFd)) tclose(pVnode->logFd);
if (pVnode->cfg.commitLog && remove(pVnode->logFn) < 0) { if (pVnode->cfg.commitLog && (pVnode->logFd > 0 && remove(pVnode->logFn) < 0)) {
dError("vid:%d, failed to remove:%s", vnode, pVnode->logFn); dError("vid:%d, failed to remove:%s", vnode, pVnode->logFn);
taosLogError("vid:%d, failed to remove:%s", vnode, pVnode->logFn); taosLogError("vid:%d, failed to remove:%s", vnode, pVnode->logFn);
} }

View File

@ -881,6 +881,7 @@ int vnodeImportPoints(SMeterObj *pObj, char *cont, int contLen, char source, voi
} }
if ( pVnode->cfg.commitLog && source != TSDB_DATA_SOURCE_LOG) { if ( pVnode->cfg.commitLog && source != TSDB_DATA_SOURCE_LOG) {
if (pVnode->logFd < 0) return TSDB_CODE_INVALID_COMMIT_LOG;
code = vnodeWriteToCommitLog(pObj, TSDB_ACTION_IMPORT, cont, contLen, sversion); code = vnodeWriteToCommitLog(pObj, TSDB_ACTION_IMPORT, cont, contLen, sversion);
if (code != 0) return code; if (code != 0) return code;
} }

View File

@ -556,6 +556,7 @@ int vnodeInsertPoints(SMeterObj *pObj, char *cont, int contLen, char source, voi
// FIXME: Here should be after the comparison of sversions. // FIXME: Here should be after the comparison of sversions.
if (pVnode->cfg.commitLog && source != TSDB_DATA_SOURCE_LOG) { if (pVnode->cfg.commitLog && source != TSDB_DATA_SOURCE_LOG) {
if (pVnode->logFd < 0) return TSDB_CODE_INVALID_COMMIT_LOG;
code = vnodeWriteToCommitLog(pObj, TSDB_ACTION_INSERT, cont, contLen, sversion); code = vnodeWriteToCommitLog(pObj, TSDB_ACTION_INSERT, cont, contLen, sversion);
if (code != 0) return code; if (code != 0) return code;
} }

View File

@ -46,7 +46,8 @@ typedef struct _cache_node_t {
char * key; /* null-terminated string */ char * key; /* null-terminated string */
struct _cache_node_t *prev; struct _cache_node_t *prev;
struct _cache_node_t *next; struct _cache_node_t *next;
uint64_t time; uint64_t addTime; // the time when this element is added or updated into cache
uint64_t time; // end time when this element should be remove from cache
uint64_t signature; uint64_t signature;
/* /*
@ -78,7 +79,7 @@ typedef struct {
* when the node in pTrash does not be referenced, it will be release at the expired time * when the node in pTrash does not be referenced, it will be release at the expired time
*/ */
SDataNode *pTrash; SDataNode *pTrash;
int numOfElemsInTrash; /* number of element in trash */ int numOfElemsInTrash; // number of element in trash
void *tmrCtrl; void *tmrCtrl;
void *pTimer; void *pTimer;
@ -87,7 +88,7 @@ typedef struct {
_hashFunc hashFp; _hashFunc hashFp;
/* /*
* pthread_rwlock_t have bugs on the windows platform * pthread_rwlock_t will block ops on the windows platform, when refresh is called.
* so use pthread_mutex_t as an alternative * so use pthread_mutex_t as an alternative
*/ */
#if defined LINUX #if defined LINUX
@ -125,7 +126,8 @@ static SDataNode *taosCreateHashNode(const char *key, uint32_t keyLen, const cha
memcpy(pNewNode->data, pData, dataSize); memcpy(pNewNode->data, pData, dataSize);
pNewNode->time = taosGetTimestampMs() + lifespan; pNewNode->addTime = (uint64_t) taosGetTimestampMs();
pNewNode->time = pNewNode->addTime + lifespan;
pNewNode->key = pNewNode->data + dataSize; pNewNode->key = pNewNode->data + dataSize;
strcpy(pNewNode->key, key); strcpy(pNewNode->key, key);
@ -146,7 +148,7 @@ static SDataNode *taosCreateHashNode(const char *key, uint32_t keyLen, const cha
static FORCE_INLINE int taosHashKey(int maxSessions, char *key, uint32_t len) { static FORCE_INLINE int taosHashKey(int maxSessions, char *key, uint32_t len) {
uint32_t hash = MurmurHash3_32(key, len); uint32_t hash = MurmurHash3_32(key, len);
/* avoid the costly remainder operation */ // avoid the costly remainder operation
assert((maxSessions & (maxSessions - 1)) == 0); assert((maxSessions & (maxSessions - 1)) == 0);
hash = hash & (maxSessions - 1); hash = hash & (maxSessions - 1);
@ -485,11 +487,12 @@ void *taosAddDataIntoCache(void *handle, char *key, char *pData, int dataSize, i
if (pOldNode == NULL) { // do add to cache if (pOldNode == NULL) { // do add to cache
pNode = taosAddToCacheImpl(pObj, key, keyLen, pData, dataSize, keepTime * 1000L); pNode = taosAddToCacheImpl(pObj, key, keyLen, pData, dataSize, keepTime * 1000L);
pTrace("key:%s %p added into cache,slot:%d,expireTime:%lld,cache total:%d,size:%ldbytes,collision:%d", pNode->key, pTrace("key:%s %p added into cache, slot:%d, addTime:%lld, expireTime:%lld, cache total:%d, "
pNode, pNode->hashVal, pNode->time, pObj->total, pObj->totalSize, pObj->statistics.numOfCollision); "size:%lldbytes, collision:%d", pNode->key, pNode, pNode->hashVal, pNode->addTime, pNode->time, pObj->total,
pObj->totalSize, pObj->statistics.numOfCollision);
} else { // old data exists, update the node } else { // old data exists, update the node
pNode = taosUpdateCacheImpl(pObj, pOldNode, key, keyLen, pData, dataSize, keepTime * 1000L); pNode = taosUpdateCacheImpl(pObj, pOldNode, key, keyLen, pData, dataSize, keepTime * 1000L);
// pWarn("key:%s %p exist in cache,updated", key, pNode); pTrace("key:%s %p exist in cache, updated", key, pNode);
} }
#if defined LINUX #if defined LINUX
@ -507,7 +510,7 @@ void *taosAddDataIntoCache(void *handle, char *key, char *pData, int dataSize, i
* @param handle * @param handle
* @param data * @param data
*/ */
void taosRemoveDataFromCache(void *handle, void **data, _Bool isForce) { void taosRemoveDataFromCache(void *handle, void **data, bool remove) {
SCacheObj *pObj = (SCacheObj *)handle; SCacheObj *pObj = (SCacheObj *)handle;
if (pObj == NULL || pObj->maxSessions == 0 || (*data) == NULL || (pObj->total + pObj->numOfElemsInTrash == 0)) return; if (pObj == NULL || pObj->maxSessions == 0 || (*data) == NULL || (pObj->total + pObj->numOfElemsInTrash == 0)) return;
@ -532,7 +535,7 @@ void taosRemoveDataFromCache(void *handle, void **data, _Bool isForce) {
*data = NULL; *data = NULL;
if (isForce) { if (remove) {
#if defined LINUX #if defined LINUX
pthread_rwlock_wrlock(&pObj->lock); pthread_rwlock_wrlock(&pObj->lock);
#else #else
@ -540,6 +543,7 @@ void taosRemoveDataFromCache(void *handle, void **data, _Bool isForce) {
#endif #endif
taosCacheMoveNodeToTrash(pObj, pNode); taosCacheMoveNodeToTrash(pObj, pNode);
#if defined LINUX #if defined LINUX
pthread_rwlock_unlock(&pObj->lock); pthread_rwlock_unlock(&pObj->lock);
#else #else

View File

@ -56,7 +56,7 @@ int64_t taosGetTimestamp(int32_t precision) {
int32_t taosParseTime(char* timestr, int64_t* time, int32_t len, int32_t timePrec) { int32_t taosParseTime(char* timestr, int64_t* time, int32_t len, int32_t timePrec) {
/* parse datatime string in with tz */ /* parse datatime string in with tz */
if (strnchr(timestr, 'T', len) != NULL) { if (strnchr(timestr, 'T', len, false) != NULL) {
return parseTimeWithTz(timestr, time, timePrec); return parseTimeWithTz(timestr, time, timePrec);
} else { } else {
return parseLocaltime(timestr, time, timePrec); return parseLocaltime(timestr, time, timePrec);

View File

@ -119,8 +119,18 @@ char **strsplit(char *z, const char *delim, int32_t *num) {
return split; return split;
} }
char *strnchr(char *haystack, char needle, int32_t len) { char *strnchr(char *haystack, char needle, int32_t len, bool skipquote) {
for (int32_t i = 0; i < len; ++i) { for (int32_t i = 0; i < len; ++i) {
// skip the needle in quote, jump to the end of quoted string
if (skipquote && (haystack[i] == '\'' || haystack[i] == '"')) {
char quote = haystack[i++];
while(i < len && haystack[i++] != quote);
if (i >= len) {
return NULL;
}
}
if (haystack[i] == needle) { if (haystack[i] == needle) {
return &haystack[i]; return &haystack[i];
} }
@ -129,27 +139,7 @@ char *strnchr(char *haystack, char needle, int32_t len) {
return NULL; return NULL;
} }
char *strnchrNoquote(char *haystack, char needle, int32_t len) { void strtolower(char *dst, const char *z) {
for (int32_t i = 0; i < len; ++i) {
if (haystack[i] == '\'' || haystack[i] == '"') {
char quote = haystack[i++];
while(i < len && haystack[i] != quote){++i;}
if (++i >= len) {
return NULL;
}
}
if (haystack[i] == needle) {
return &haystack[i];
}
}
return NULL;
}
void strtolower(char *z, char *dst) {
int quote = 0; int quote = 0;
char *str = z; char *str = z;
if (dst == NULL) { if (dst == NULL) {
@ -169,6 +159,8 @@ void strtolower(char *z, char *dst) {
str++; str++;
} }
*dst = 0;
} }
char *paGetToken(char *string, char **token, int32_t *tokenLen) { char *paGetToken(char *string, char **token, int32_t *tokenLen) {