This commit is contained in:
chenhaoran 2024-02-29 14:51:14 +08:00
commit 8cb6ead77c
16 changed files with 493 additions and 98 deletions

View File

@ -72,40 +72,6 @@ struct STaosQnode {
char item[]; char item[];
}; };
struct STaosQueue {
STaosQnode *head;
STaosQnode *tail;
STaosQueue *next; // for queue set
STaosQset *qset; // for queue set
void *ahandle; // for queue set
FItem itemFp;
FItems itemsFp;
TdThreadMutex mutex;
int64_t memOfItems;
int32_t numOfItems;
int64_t threadId;
int64_t memLimit;
int64_t itemLimit;
};
struct STaosQset {
STaosQueue *head;
STaosQueue *current;
TdThreadMutex mutex;
tsem_t sem;
int32_t numOfQueues;
int32_t numOfItems;
};
struct STaosQall {
STaosQnode *current;
STaosQnode *start;
int32_t numOfItems;
int64_t memOfItems;
int32_t unAccessedNumOfItems;
int64_t unAccessMemOfItems;
};
STaosQueue *taosOpenQueue(); STaosQueue *taosOpenQueue();
void taosCloseQueue(STaosQueue *queue); void taosCloseQueue(STaosQueue *queue);
void taosSetQueueFp(STaosQueue *queue, FItem itemFp, FItems itemsFp); void taosSetQueueFp(STaosQueue *queue, FItem itemFp, FItems itemsFp);
@ -140,6 +106,8 @@ int32_t taosGetQueueNumber(STaosQset *qset);
int32_t taosReadQitemFromQset(STaosQset *qset, void **ppItem, SQueueInfo *qinfo); int32_t taosReadQitemFromQset(STaosQset *qset, void **ppItem, SQueueInfo *qinfo);
int32_t taosReadAllQitemsFromQset(STaosQset *qset, STaosQall *qall, SQueueInfo *qinfo); int32_t taosReadAllQitemsFromQset(STaosQset *qset, STaosQall *qall, SQueueInfo *qinfo);
void taosResetQsetThread(STaosQset *qset, void *pItem); void taosResetQsetThread(STaosQset *qset, void *pItem);
void taosQueueSetThreadId(STaosQueue *pQueue, int64_t threadId);
int64_t taosQueueGetThreadId(STaosQueue *pQueue);
#ifdef __cplusplus #ifdef __cplusplus
} }

View File

@ -80,7 +80,7 @@ extern "C" {
#define IS_SAME_KEY (maxKV->type == kv->type && maxKV->keyLen == kv->keyLen && memcmp(maxKV->key, kv->key, kv->keyLen) == 0) #define IS_SAME_KEY (maxKV->type == kv->type && maxKV->keyLen == kv->keyLen && memcmp(maxKV->key, kv->key, kv->keyLen) == 0)
#define IS_SLASH_LETTER_IN_MEASUREMENT(sql) \ #define IS_SLASH_LETTER_IN_MEASUREMENT(sql) \
(*((sql)-1) == SLASH && (*(sql) == COMMA || *(sql) == SPACE)) (*((sql)-1) == SLASH && (*(sql) == COMMA || *(sql) == SPACE || *(sql) == SLASH))
#define MOVE_FORWARD_ONE(sql, len) (memmove((void *)((sql)-1), (sql), len)) #define MOVE_FORWARD_ONE(sql, len) (memmove((void *)((sql)-1), (sql), len))

View File

@ -20,14 +20,14 @@
#include "clientSml.h" #include "clientSml.h"
#define IS_COMMA(sql) (*(sql) == COMMA && *((sql)-1) != SLASH) #define IS_COMMA(sql,escapeChar) (*(sql) == COMMA && (*((sql)-1) != SLASH || ((sql)-1 == escapeChar)))
#define IS_SPACE(sql) (*(sql) == SPACE && *((sql)-1) != SLASH) #define IS_SPACE(sql,escapeChar) (*(sql) == SPACE && (*((sql)-1) != SLASH || ((sql)-1 == escapeChar)))
#define IS_EQUAL(sql) (*(sql) == EQUAL && *((sql)-1) != SLASH) #define IS_EQUAL(sql,escapeChar) (*(sql) == EQUAL && (*((sql)-1) != SLASH || ((sql)-1 == escapeChar)))
#define IS_SLASH_LETTER_IN_FIELD_VALUE(sql) (*((sql)-1) == SLASH && (*(sql) == QUOTE || *(sql) == SLASH)) #define IS_SLASH_LETTER_IN_FIELD_VALUE(sql) (*((sql)-1) == SLASH && (*(sql) == QUOTE || *(sql) == SLASH))
#define IS_SLASH_LETTER_IN_TAG_FIELD_KEY(sql) \ #define IS_SLASH_LETTER_IN_TAG_FIELD_KEY(sql) \
(*((sql)-1) == SLASH && (*(sql) == COMMA || *(sql) == SPACE || *(sql) == EQUAL)) (*((sql)-1) == SLASH && (*(sql) == COMMA || *(sql) == SPACE || *(sql) == EQUAL || *(sql) == SLASH))
#define PROCESS_SLASH_IN_FIELD_VALUE(key, keyLen) \ #define PROCESS_SLASH_IN_FIELD_VALUE(key, keyLen) \
for (int i = 1; i < keyLen; ++i) { \ for (int i = 1; i < keyLen; ++i) { \
@ -198,7 +198,7 @@ static int32_t smlProcessTagLine(SSmlHandle *info, char **sql, char *sqlEnd){
int cnt = 0; int cnt = 0;
while (*sql < sqlEnd) { while (*sql < sqlEnd) {
if (unlikely(IS_SPACE(*sql))) { if (unlikely(IS_SPACE(*sql,NULL))) {
break; break;
} }
@ -207,18 +207,21 @@ static int32_t smlProcessTagLine(SSmlHandle *info, char **sql, char *sqlEnd){
size_t keyLen = 0; size_t keyLen = 0;
bool keyEscaped = false; bool keyEscaped = false;
size_t keyLenEscaped = 0; size_t keyLenEscaped = 0;
const char *escapeChar = NULL;
while (*sql < sqlEnd) { while (*sql < sqlEnd) {
if (unlikely(IS_SPACE(*sql) || IS_COMMA(*sql))) { if (unlikely(IS_SPACE(*sql,escapeChar) || IS_COMMA(*sql,escapeChar))) {
smlBuildInvalidDataMsg(&info->msgBuf, "invalid data", *sql); smlBuildInvalidDataMsg(&info->msgBuf, "invalid data", *sql);
terrno = TSDB_CODE_SML_INVALID_DATA; terrno = TSDB_CODE_SML_INVALID_DATA;
return -1; return -1;
} }
if (unlikely(IS_EQUAL(*sql))) { if (unlikely(IS_EQUAL(*sql,escapeChar))) {
keyLen = *sql - key; keyLen = *sql - key;
(*sql)++; (*sql)++;
break; break;
} }
if (IS_SLASH_LETTER_IN_TAG_FIELD_KEY(*sql)) { if (IS_SLASH_LETTER_IN_TAG_FIELD_KEY(*sql)) {
escapeChar = *sql;
keyLenEscaped++; keyLenEscaped++;
keyEscaped = true; keyEscaped = true;
} }
@ -238,15 +241,16 @@ static int32_t smlProcessTagLine(SSmlHandle *info, char **sql, char *sqlEnd){
size_t valueLenEscaped = 0; size_t valueLenEscaped = 0;
while (*sql < sqlEnd) { while (*sql < sqlEnd) {
// parse value // parse value
if (unlikely(IS_SPACE(*sql) || IS_COMMA(*sql))) { if (unlikely(IS_SPACE(*sql,escapeChar) || IS_COMMA(*sql,escapeChar))) {
break; break;
} else if (unlikely(IS_EQUAL(*sql))) { } else if (unlikely(IS_EQUAL(*sql,escapeChar))) {
smlBuildInvalidDataMsg(&info->msgBuf, "invalid data", *sql); smlBuildInvalidDataMsg(&info->msgBuf, "invalid data", *sql);
terrno = TSDB_CODE_SML_INVALID_DATA; terrno = TSDB_CODE_SML_INVALID_DATA;
return -1; return -1;
} }
if (IS_SLASH_LETTER_IN_TAG_FIELD_KEY(*sql)) { if (IS_SLASH_LETTER_IN_TAG_FIELD_KEY(*sql)) {
escapeChar = *sql;
valueLenEscaped++; valueLenEscaped++;
valueEscaped = true; valueEscaped = true;
} }
@ -293,7 +297,7 @@ static int32_t smlProcessTagLine(SSmlHandle *info, char **sql, char *sqlEnd){
} }
cnt++; cnt++;
if (IS_SPACE(*sql)) { if (IS_SPACE(*sql,escapeChar)) {
break; break;
} }
(*sql)++; (*sql)++;
@ -326,7 +330,7 @@ static int32_t smlParseTagLine(SSmlHandle *info, char **sql, char *sqlEnd, SSmlL
static int32_t smlParseColLine(SSmlHandle *info, char **sql, char *sqlEnd, SSmlLineInfo *currElement) { static int32_t smlParseColLine(SSmlHandle *info, char **sql, char *sqlEnd, SSmlLineInfo *currElement) {
int cnt = 0; int cnt = 0;
while (*sql < sqlEnd) { while (*sql < sqlEnd) {
if (unlikely(IS_SPACE(*sql))) { if (unlikely(IS_SPACE(*sql,NULL))) {
break; break;
} }
@ -335,17 +339,19 @@ static int32_t smlParseColLine(SSmlHandle *info, char **sql, char *sqlEnd, SSmlL
size_t keyLen = 0; size_t keyLen = 0;
bool keyEscaped = false; bool keyEscaped = false;
size_t keyLenEscaped = 0; size_t keyLenEscaped = 0;
const char *escapeChar = NULL;
while (*sql < sqlEnd) { while (*sql < sqlEnd) {
if (unlikely(IS_SPACE(*sql) || IS_COMMA(*sql))) { if (unlikely(IS_SPACE(*sql,escapeChar) || IS_COMMA(*sql,escapeChar))) {
smlBuildInvalidDataMsg(&info->msgBuf, "invalid data", *sql); smlBuildInvalidDataMsg(&info->msgBuf, "invalid data", *sql);
return TSDB_CODE_SML_INVALID_DATA; return TSDB_CODE_SML_INVALID_DATA;
} }
if (unlikely(IS_EQUAL(*sql))) { if (unlikely(IS_EQUAL(*sql,escapeChar))) {
keyLen = *sql - key; keyLen = *sql - key;
(*sql)++; (*sql)++;
break; break;
} }
if (IS_SLASH_LETTER_IN_TAG_FIELD_KEY(*sql)) { if (IS_SLASH_LETTER_IN_TAG_FIELD_KEY(*sql)) {
escapeChar = *sql;
keyLenEscaped++; keyLenEscaped++;
keyEscaped = true; keyEscaped = true;
} }
@ -363,7 +369,6 @@ static int32_t smlParseColLine(SSmlHandle *info, char **sql, char *sqlEnd, SSmlL
bool valueEscaped = false; bool valueEscaped = false;
size_t valueLenEscaped = 0; size_t valueLenEscaped = 0;
int quoteNum = 0; int quoteNum = 0;
const char *escapeChar = NULL;
while (*sql < sqlEnd) { while (*sql < sqlEnd) {
// parse value // parse value
if (unlikely(*(*sql) == QUOTE && (*(*sql - 1) != SLASH || (*sql - 1) == escapeChar))) { if (unlikely(*(*sql) == QUOTE && (*(*sql - 1) != SLASH || (*sql - 1) == escapeChar))) {
@ -374,7 +379,7 @@ static int32_t smlParseColLine(SSmlHandle *info, char **sql, char *sqlEnd, SSmlL
} }
continue; continue;
} }
if (quoteNum % 2 == 0 && (unlikely(IS_SPACE(*sql) || IS_COMMA(*sql)))) { if (quoteNum % 2 == 0 && (unlikely(IS_SPACE(*sql,escapeChar) || IS_COMMA(*sql,escapeChar)))) {
break; break;
} }
if (IS_SLASH_LETTER_IN_FIELD_VALUE(*sql) && (*sql - 1) != escapeChar) { if (IS_SLASH_LETTER_IN_FIELD_VALUE(*sql) && (*sql - 1) != escapeChar) {
@ -437,7 +442,7 @@ static int32_t smlParseColLine(SSmlHandle *info, char **sql, char *sqlEnd, SSmlL
} }
cnt++; cnt++;
if (IS_SPACE(*sql)) { if (IS_SPACE(*sql,escapeChar)) {
break; break;
} }
(*sql)++; (*sql)++;
@ -453,19 +458,18 @@ int32_t smlParseInfluxString(SSmlHandle *info, char *sql, char *sqlEnd, SSmlLine
elements->measure = sql; elements->measure = sql;
// parse measure // parse measure
size_t measureLenEscaped = 0; size_t measureLenEscaped = 0;
const char *escapeChar = NULL;
while (sql < sqlEnd) { while (sql < sqlEnd) {
if (unlikely((sql != elements->measure) && IS_SLASH_LETTER_IN_MEASUREMENT(sql))) { if (unlikely(IS_COMMA(sql,escapeChar) || IS_SPACE(sql,escapeChar))) {
elements->measureEscaped = true;
measureLenEscaped++;
sql++;
continue;
}
if (unlikely(IS_COMMA(sql))) {
break; break;
} }
if (unlikely(IS_SPACE(sql))) { if (unlikely((sql != elements->measure) && IS_SLASH_LETTER_IN_MEASUREMENT(sql))) {
break; elements->measureEscaped = true;
escapeChar = sql;
measureLenEscaped++;
sql++;
continue;
} }
sql++; sql++;
} }
@ -478,9 +482,12 @@ int32_t smlParseInfluxString(SSmlHandle *info, char *sql, char *sqlEnd, SSmlLine
// to get measureTagsLen before // to get measureTagsLen before
const char *tmp = sql; const char *tmp = sql;
while (tmp < sqlEnd) { while (tmp < sqlEnd) {
if (unlikely(IS_SPACE(tmp))) { if (unlikely(IS_SPACE(tmp,escapeChar))) {
break; break;
} }
if(unlikely(IS_SLASH_LETTER_IN_TAG_FIELD_KEY(tmp))){
escapeChar = tmp;
}
tmp++; tmp++;
} }
elements->measureTagsLen = tmp - elements->measure; elements->measureTagsLen = tmp - elements->measure;

View File

@ -876,12 +876,13 @@ int32_t tmqHandleAllDelayedTask(tmq_t* pTmq) {
STaosQall* qall = taosAllocateQall(); STaosQall* qall = taosAllocateQall();
taosReadAllQitems(pTmq->delayedTask, qall); taosReadAllQitems(pTmq->delayedTask, qall);
if (qall->numOfItems == 0) { int32_t numOfItems = taosQallItemSize(qall);
if (numOfItems == 0) {
taosFreeQall(qall); taosFreeQall(qall);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
tscDebug("consumer:0x%" PRIx64 " handle delayed %d tasks before poll data", pTmq->consumerId, qall->numOfItems); tscDebug("consumer:0x%" PRIx64 " handle delayed %d tasks before poll data", pTmq->consumerId, numOfItems);
int8_t* pTaskType = NULL; int8_t* pTaskType = NULL;
taosGetQitem(qall, (void**)&pTaskType); taosGetQitem(qall, (void**)&pTaskType);
@ -1839,7 +1840,7 @@ static void updateVgInfo(SMqClientVg* pVg, STqOffsetVal* reqOffset, STqOffsetVal
} }
static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout) { static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout) {
tscDebug("consumer:0x%" PRIx64 " start to handle the rsp, total:%d", tmq->consumerId, tmq->qall->numOfItems); tscDebug("consumer:0x%" PRIx64 " start to handle the rsp, total:%d", tmq->consumerId, taosQallItemSize(tmq->qall));
while (1) { while (1) {
SMqRspWrapper* pRspWrapper = NULL; SMqRspWrapper* pRspWrapper = NULL;

View File

@ -194,26 +194,26 @@ void vmCloseVnode(SVnodeMgmt *pMgmt, SVnodeObj *pVnode, bool commitAndRemoveWal)
while (pVnode->refCount > 0) taosMsleep(10); while (pVnode->refCount > 0) taosMsleep(10);
dInfo("vgId:%d, wait for vnode write queue:%p is empty, thread:%08" PRId64, pVnode->vgId, pVnode->pWriteW.queue, dInfo("vgId:%d, wait for vnode write queue:%p is empty, thread:%08" PRId64, pVnode->vgId, pVnode->pWriteW.queue,
pVnode->pWriteW.queue->threadId); taosQueueGetThreadId(pVnode->pWriteW.queue));
tMultiWorkerCleanup(&pVnode->pWriteW); tMultiWorkerCleanup(&pVnode->pWriteW);
dInfo("vgId:%d, wait for vnode sync queue:%p is empty, thread:%08" PRId64, pVnode->vgId, pVnode->pSyncW.queue, dInfo("vgId:%d, wait for vnode sync queue:%p is empty, thread:%08" PRId64, pVnode->vgId, pVnode->pSyncW.queue,
pVnode->pSyncW.queue->threadId); taosQueueGetThreadId(pVnode->pSyncW.queue));
tMultiWorkerCleanup(&pVnode->pSyncW); tMultiWorkerCleanup(&pVnode->pSyncW);
dInfo("vgId:%d, wait for vnode sync rd queue:%p is empty, thread:%08" PRId64, pVnode->vgId, pVnode->pSyncRdW.queue, dInfo("vgId:%d, wait for vnode sync rd queue:%p is empty, thread:%08" PRId64, pVnode->vgId, pVnode->pSyncRdW.queue,
pVnode->pSyncRdW.queue->threadId); taosQueueGetThreadId(pVnode->pSyncRdW.queue));
tMultiWorkerCleanup(&pVnode->pSyncRdW); tMultiWorkerCleanup(&pVnode->pSyncRdW);
dInfo("vgId:%d, wait for vnode apply queue:%p is empty, thread:%08" PRId64, pVnode->vgId, pVnode->pApplyW.queue, dInfo("vgId:%d, wait for vnode apply queue:%p is empty, thread:%08" PRId64, pVnode->vgId, pVnode->pApplyW.queue,
pVnode->pApplyW.queue->threadId); taosQueueGetThreadId(pVnode->pApplyW.queue));
tMultiWorkerCleanup(&pVnode->pApplyW); tMultiWorkerCleanup(&pVnode->pApplyW);
dInfo("vgId:%d, wait for vnode query queue:%p is empty", pVnode->vgId, pVnode->pQueryQ); dInfo("vgId:%d, wait for vnode query queue:%p is empty", pVnode->vgId, pVnode->pQueryQ);
while (!taosQueueEmpty(pVnode->pQueryQ)) taosMsleep(10); while (!taosQueueEmpty(pVnode->pQueryQ)) taosMsleep(10);
dInfo("vgId:%d, wait for vnode fetch queue:%p is empty, thread:%08" PRId64, pVnode->vgId, pVnode->pFetchQ, dInfo("vgId:%d, wait for vnode fetch queue:%p is empty, thread:%08" PRId64, pVnode->vgId, pVnode->pFetchQ,
pVnode->pFetchQ->threadId); taosQueueGetThreadId(pVnode->pFetchQ));
while (!taosQueueEmpty(pVnode->pFetchQ)) taosMsleep(10); while (!taosQueueEmpty(pVnode->pFetchQ)) taosMsleep(10);
tqNotifyClose(pVnode->pImpl->pTq); tqNotifyClose(pVnode->pImpl->pTq);

View File

@ -365,16 +365,16 @@ int32_t vmAllocQueue(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) {
} }
dInfo("vgId:%d, write-queue:%p is alloced, thread:%08" PRId64, pVnode->vgId, pVnode->pWriteW.queue, dInfo("vgId:%d, write-queue:%p is alloced, thread:%08" PRId64, pVnode->vgId, pVnode->pWriteW.queue,
pVnode->pWriteW.queue->threadId); taosQueueGetThreadId(pVnode->pWriteW.queue));
dInfo("vgId:%d, sync-queue:%p is alloced, thread:%08" PRId64, pVnode->vgId, pVnode->pSyncW.queue, dInfo("vgId:%d, sync-queue:%p is alloced, thread:%08" PRId64, pVnode->vgId, pVnode->pSyncW.queue,
pVnode->pSyncW.queue->threadId); taosQueueGetThreadId(pVnode->pSyncW.queue));
dInfo("vgId:%d, sync-rd-queue:%p is alloced, thread:%08" PRId64, pVnode->vgId, pVnode->pSyncRdW.queue, dInfo("vgId:%d, sync-rd-queue:%p is alloced, thread:%08" PRId64, pVnode->vgId, pVnode->pSyncRdW.queue,
pVnode->pSyncRdW.queue->threadId); taosQueueGetThreadId(pVnode->pSyncRdW.queue));
dInfo("vgId:%d, apply-queue:%p is alloced, thread:%08" PRId64, pVnode->vgId, pVnode->pApplyW.queue, dInfo("vgId:%d, apply-queue:%p is alloced, thread:%08" PRId64, pVnode->vgId, pVnode->pApplyW.queue,
pVnode->pApplyW.queue->threadId); taosQueueGetThreadId(pVnode->pApplyW.queue));
dInfo("vgId:%d, query-queue:%p is alloced", pVnode->vgId, pVnode->pQueryQ); dInfo("vgId:%d, query-queue:%p is alloced", pVnode->vgId, pVnode->pQueryQ);
dInfo("vgId:%d, fetch-queue:%p is alloced, thread:%08" PRId64, pVnode->vgId, pVnode->pFetchQ, dInfo("vgId:%d, fetch-queue:%p is alloced, thread:%08" PRId64, pVnode->vgId, pVnode->pFetchQ,
pVnode->pFetchQ->threadId); taosQueueGetThreadId(pVnode->pFetchQ));
dInfo("vgId:%d, stream-queue:%p is alloced", pVnode->vgId, pVnode->pStreamQ); dInfo("vgId:%d, stream-queue:%p is alloced", pVnode->vgId, pVnode->pStreamQ);
return 0; return 0;
} }

View File

@ -363,13 +363,15 @@ static int32_t mndAddKillCompactAction(SMnode *pMnode, STrans *pTrans, SVgObj *p
} }
static int32_t mndKillCompact(SMnode *pMnode, SRpcMsg *pReq, SCompactObj *pCompact) { static int32_t mndKillCompact(SMnode *pMnode, SRpcMsg *pReq, SCompactObj *pCompact) {
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, pReq, "kill-compact"); STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_DB, pReq, "kill-compact");
if (pTrans == NULL) { if (pTrans == NULL) {
mError("compact:%" PRId32 ", failed to drop since %s" , pCompact->compactId, terrstr()); mError("compact:%" PRId32 ", failed to drop since %s" , pCompact->compactId, terrstr());
return -1; return -1;
} }
mInfo("trans:%d, used to kill compact:%" PRId32, pTrans->id, pCompact->compactId); mInfo("trans:%d, used to kill compact:%" PRId32, pTrans->id, pCompact->compactId);
mndTransSetDbName(pTrans, pCompact->dbname, NULL);
SSdbRaw *pCommitRaw = mndCompactActionEncode(pCompact); SSdbRaw *pCommitRaw = mndCompactActionEncode(pCompact);
if (pCommitRaw == NULL || mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) { if (pCommitRaw == NULL || mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) {
mError("trans:%d, failed to append commit log since %s", pTrans->id, terrstr()); mError("trans:%d, failed to append commit log since %s", pTrans->id, terrstr());
@ -378,7 +380,7 @@ static int32_t mndKillCompact(SMnode *pMnode, SRpcMsg *pReq, SCompactObj *pCompa
} }
(void)sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY); (void)sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY);
void *pIter = NULL; void *pIter = NULL;
while (1) { while (1) {
SCompactDetailObj *pDetail = NULL; SCompactDetailObj *pDetail = NULL;
pIter = sdbFetch(pMnode->pSdb, SDB_COMPACT_DETAIL, pIter, (void **)&pDetail); pIter = sdbFetch(pMnode->pSdb, SDB_COMPACT_DETAIL, pIter, (void **)&pDetail);
@ -612,15 +614,17 @@ static int32_t mndSaveCompactProgress(SMnode *pMnode, int32_t compactId) {
return 0; return 0;
} }
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, NULL, "update-compact-progress"); STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_DB, NULL, "update-compact-progress");
if (pTrans == NULL) { if (pTrans == NULL) {
mError("trans:%" PRId32 ", failed to create since %s" , pTrans->id, terrstr()); mError("trans:%" PRId32 ", failed to create since %s" , pTrans->id, terrstr());
return -1; return -1;
} }
mInfo("compact:%d, trans:%d, used to update compact progress.", compactId, pTrans->id); mInfo("compact:%d, trans:%d, used to update compact progress.", compactId, pTrans->id);
SCompactObj *pCompact = mndAcquireCompact(pMnode, compactId); SCompactObj *pCompact = mndAcquireCompact(pMnode, compactId);
mndTransSetDbName(pTrans, pCompact->dbname, NULL);
pIter = NULL; pIter = NULL;
while (1) { while (1) {
SCompactDetailObj *pDetail = NULL; SCompactDetailObj *pDetail = NULL;

View File

@ -1009,6 +1009,22 @@ int32_t qSetStreamOperatorOptionForScanHistory(qTaskInfo_t tinfo) {
pSup->deleteMark = INT64_MAX; pSup->deleteMark = INT64_MAX;
pInfo->ignoreExpiredDataSaved = pInfo->ignoreExpiredData; pInfo->ignoreExpiredDataSaved = pInfo->ignoreExpiredData;
pInfo->ignoreExpiredData = false; pInfo->ignoreExpiredData = false;
} else if (type == QUERY_NODE_PHYSICAL_PLAN_STREAM_COUNT) {
SStreamCountAggOperatorInfo* pInfo = pOperator->info;
STimeWindowAggSupp* pSup = &pInfo->twAggSup;
ASSERT(pSup->calTrigger == STREAM_TRIGGER_AT_ONCE || pSup->calTrigger == STREAM_TRIGGER_WINDOW_CLOSE);
ASSERT(pSup->calTriggerSaved == 0 && pSup->deleteMarkSaved == 0);
qInfo("save stream param for state: %d, %" PRId64, pSup->calTrigger, pSup->deleteMark);
pSup->calTriggerSaved = pSup->calTrigger;
pSup->deleteMarkSaved = pSup->deleteMark;
pSup->calTrigger = STREAM_TRIGGER_AT_ONCE;
pSup->deleteMark = INT64_MAX;
pInfo->ignoreExpiredDataSaved = pInfo->ignoreExpiredData;
pInfo->ignoreExpiredData = false;
qInfo("save stream task:%s, param for state: %d", GET_TASKID(pTaskInfo), pInfo->ignoreExpiredData);
} }
// iterate operator tree // iterate operator tree

View File

@ -3763,8 +3763,9 @@ static int32_t stopSubTablesTableMergeScan(STableMergeScanInfo* pInfo) {
taosMemoryFree(pSubTblsInfo); taosMemoryFree(pSubTblsInfo);
pInfo->pSubTablesMergeInfo = NULL; pInfo->pSubTablesMergeInfo = NULL;
taosMemoryTrim(0);
} }
taosMemoryTrim(0);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }

View File

@ -1343,7 +1343,7 @@ ESyncStrategy syncNodeStrategy(SSyncNode* pSyncNode) { return pSyncNode->raftCfg
int32_t syncNodeStartPingTimer(SSyncNode* pSyncNode) { int32_t syncNodeStartPingTimer(SSyncNode* pSyncNode) {
int32_t ret = 0; int32_t ret = 0;
if (syncIsInit()) { if (syncIsInit()) {
taosTmrReset(pSyncNode->FpPingTimerCB, pSyncNode->pingTimerMS, pSyncNode, syncEnv()->pTimerManager, taosTmrReset(pSyncNode->FpPingTimerCB, pSyncNode->pingTimerMS, (void*)pSyncNode->rid, syncEnv()->pTimerManager,
&pSyncNode->pPingTimer); &pSyncNode->pPingTimer);
atomic_store_64(&pSyncNode->pingTimerLogicClock, pSyncNode->pingTimerLogicClockUser); atomic_store_64(&pSyncNode->pingTimerLogicClock, pSyncNode->pingTimerLogicClockUser);
} else { } else {
@ -1415,8 +1415,8 @@ void syncNodeResetElectTimer(SSyncNode* pSyncNode) {
static int32_t syncNodeDoStartHeartbeatTimer(SSyncNode* pSyncNode) { static int32_t syncNodeDoStartHeartbeatTimer(SSyncNode* pSyncNode) {
int32_t ret = 0; int32_t ret = 0;
if (syncIsInit()) { if (syncIsInit()) {
taosTmrReset(pSyncNode->FpHeartbeatTimerCB, pSyncNode->heartbeatTimerMS, pSyncNode, syncEnv()->pTimerManager, taosTmrReset(pSyncNode->FpHeartbeatTimerCB, pSyncNode->heartbeatTimerMS, (void*)pSyncNode->rid,
&pSyncNode->pHeartbeatTimer); syncEnv()->pTimerManager, &pSyncNode->pHeartbeatTimer);
atomic_store_64(&pSyncNode->heartbeatTimerLogicClock, pSyncNode->heartbeatTimerLogicClockUser); atomic_store_64(&pSyncNode->heartbeatTimerLogicClock, pSyncNode->heartbeatTimerLogicClockUser);
} else { } else {
sError("vgId:%d, start heartbeat timer error, sync env is stop", pSyncNode->vgId); sError("vgId:%d, start heartbeat timer error, sync env is stop", pSyncNode->vgId);
@ -2153,7 +2153,11 @@ int32_t syncNodeGetPreIndexTerm(SSyncNode* pSyncNode, SyncIndex index, SyncIndex
static void syncNodeEqPingTimer(void* param, void* tmrId) { static void syncNodeEqPingTimer(void* param, void* tmrId) {
if (!syncIsInit()) return; if (!syncIsInit()) return;
SSyncNode* pNode = param; int64_t rid = (int64_t)param;
SSyncNode* pNode = syncNodeAcquire(rid);
if (pNode == NULL) return;
if (atomic_load_64(&pNode->pingTimerLogicClockUser) <= atomic_load_64(&pNode->pingTimerLogicClock)) { if (atomic_load_64(&pNode->pingTimerLogicClockUser) <= atomic_load_64(&pNode->pingTimerLogicClock)) {
SRpcMsg rpcMsg = {0}; SRpcMsg rpcMsg = {0};
int32_t code = syncBuildTimeout(&rpcMsg, SYNC_TIMEOUT_PING, atomic_load_64(&pNode->pingTimerLogicClock), int32_t code = syncBuildTimeout(&rpcMsg, SYNC_TIMEOUT_PING, atomic_load_64(&pNode->pingTimerLogicClock),
@ -2173,7 +2177,8 @@ static void syncNodeEqPingTimer(void* param, void* tmrId) {
} }
_out: _out:
taosTmrReset(syncNodeEqPingTimer, pNode->pingTimerMS, pNode, syncEnv()->pTimerManager, &pNode->pPingTimer); taosTmrReset(syncNodeEqPingTimer, pNode->pingTimerMS, (void*)pNode->rid, syncEnv()->pTimerManager,
&pNode->pPingTimer);
} }
} }
@ -2224,7 +2229,11 @@ static void syncNodeEqElectTimer(void* param, void* tmrId) {
static void syncNodeEqHeartbeatTimer(void* param, void* tmrId) { static void syncNodeEqHeartbeatTimer(void* param, void* tmrId) {
if (!syncIsInit()) return; if (!syncIsInit()) return;
SSyncNode* pNode = param; int64_t rid = (int64_t)param;
SSyncNode* pNode = syncNodeAcquire(rid);
if (pNode == NULL) return;
if (pNode->totalReplicaNum > 1) { if (pNode->totalReplicaNum > 1) {
if (atomic_load_64(&pNode->heartbeatTimerLogicClockUser) <= atomic_load_64(&pNode->heartbeatTimerLogicClock)) { if (atomic_load_64(&pNode->heartbeatTimerLogicClockUser) <= atomic_load_64(&pNode->heartbeatTimerLogicClock)) {
SRpcMsg rpcMsg = {0}; SRpcMsg rpcMsg = {0};
@ -2245,7 +2254,7 @@ static void syncNodeEqHeartbeatTimer(void* param, void* tmrId) {
} }
_out: _out:
taosTmrReset(syncNodeEqHeartbeatTimer, pNode->heartbeatTimerMS, pNode, syncEnv()->pTimerManager, taosTmrReset(syncNodeEqHeartbeatTimer, pNode->heartbeatTimerMS, (void*)pNode->rid, syncEnv()->pTimerManager,
&pNode->pHeartbeatTimer); &pNode->pHeartbeatTimer);
} else { } else {
@ -3385,4 +3394,4 @@ bool syncNodeCanChange(SSyncNode* pSyncNode) {
return true; return true;
} }
#endif #endif

View File

@ -89,12 +89,14 @@ static int32_t taosArrayResize(SArray* pArray) {
int32_t taosArrayEnsureCap(SArray* pArray, size_t newCap) { int32_t taosArrayEnsureCap(SArray* pArray, size_t newCap) {
if (newCap > pArray->capacity) { if (newCap > pArray->capacity) {
float factor = BOUNDARY_BIG_FACTOR; float factor = BOUNDARY_BIG_FACTOR;
if(newCap * pArray->elemSize > BOUNDARY_SIZE){ if (newCap * pArray->elemSize > BOUNDARY_SIZE) {
factor = BOUNDARY_SMALL_FACTOR; factor = BOUNDARY_SMALL_FACTOR;
} }
size_t tsize = (pArray->capacity * factor); size_t tsize = (pArray->capacity * factor);
while (newCap > tsize) { while (newCap > tsize) {
tsize = (tsize * factor); size_t newSize = (tsize * factor);
tsize = (newSize == tsize) ? (tsize + 2) : newSize;
} }
pArray->pData = taosMemoryRealloc(pArray->pData, tsize * pArray->elemSize); pArray->pData = taosMemoryRealloc(pArray->pData, tsize * pArray->elemSize);

View File

@ -21,6 +21,40 @@
int64_t tsRpcQueueMemoryAllowed = 0; int64_t tsRpcQueueMemoryAllowed = 0;
int64_t tsRpcQueueMemoryUsed = 0; int64_t tsRpcQueueMemoryUsed = 0;
struct STaosQueue {
STaosQnode *head;
STaosQnode *tail;
STaosQueue *next; // for queue set
STaosQset *qset; // for queue set
void *ahandle; // for queue set
FItem itemFp;
FItems itemsFp;
TdThreadMutex mutex;
int64_t memOfItems;
int32_t numOfItems;
int64_t threadId;
int64_t memLimit;
int64_t itemLimit;
};
struct STaosQset {
STaosQueue *head;
STaosQueue *current;
TdThreadMutex mutex;
tsem_t sem;
int32_t numOfQueues;
int32_t numOfItems;
};
struct STaosQall {
STaosQnode *current;
STaosQnode *start;
int32_t numOfItems;
int64_t memOfItems;
int32_t unAccessedNumOfItems;
int64_t unAccessMemOfItems;
};
void taosSetQueueMemoryCapacity(STaosQueue *queue, int64_t cap) { queue->memLimit = cap; } void taosSetQueueMemoryCapacity(STaosQueue *queue, int64_t cap) { queue->memLimit = cap; }
void taosSetQueueCapacity(STaosQueue *queue, int64_t size) { queue->itemLimit = size; } void taosSetQueueCapacity(STaosQueue *queue, int64_t size) { queue->itemLimit = size; }
@ -497,6 +531,12 @@ int64_t taosQallUnAccessedMemSize(STaosQall *qall) { return qall->unAccessMemOfI
void taosResetQitems(STaosQall *qall) { qall->current = qall->start; } void taosResetQitems(STaosQall *qall) { qall->current = qall->start; }
int32_t taosGetQueueNumber(STaosQset *qset) { return qset->numOfQueues; } int32_t taosGetQueueNumber(STaosQset *qset) { return qset->numOfQueues; }
void taosQueueSetThreadId(STaosQueue* pQueue, int64_t threadId) {
pQueue->threadId = threadId;
}
int64_t taosQueueGetThreadId(STaosQueue *pQueue) { return pQueue->threadId; }
#if 0 #if 0
void taosResetQsetThread(STaosQset *qset, void *pItem) { void taosResetQsetThread(STaosQset *qset, void *pItem) {

View File

@ -417,9 +417,9 @@ _OVER:
return NULL; return NULL;
} else { } else {
while (worker->pid <= 0) taosMsleep(10); while (worker->pid <= 0) taosMsleep(10);
queue->threadId = worker->pid;
uInfo("worker:%s, queue:%p is allocated, ahandle:%p thread:%08" PRId64, pool->name, queue, ahandle, taosQueueSetThreadId(queue, worker->pid);
queue->threadId); uInfo("worker:%s, queue:%p is allocated, ahandle:%p thread:%08" PRId64, pool->name, queue, ahandle, worker->pid);
return queue; return queue;
} }
} }

View File

@ -1212,6 +1212,7 @@
,,y,script,./test.sh -f tsim/stream/deleteState.sim ,,y,script,./test.sh -f tsim/stream/deleteState.sim
,,y,script,./test.sh -f tsim/stream/distributeInterval0.sim ,,y,script,./test.sh -f tsim/stream/distributeInterval0.sim
,,y,script,./test.sh -f tsim/stream/distributeIntervalRetrive0.sim ,,y,script,./test.sh -f tsim/stream/distributeIntervalRetrive0.sim
,,y,script,./test.sh -f tsim/stream/distributeMultiLevelInterval0.sim
,,y,script,./test.sh -f tsim/stream/distributeSession0.sim ,,y,script,./test.sh -f tsim/stream/distributeSession0.sim
,,y,script,./test.sh -f tsim/stream/drop_stream.sim ,,y,script,./test.sh -f tsim/stream/drop_stream.sim
,,y,script,./test.sh -f tsim/stream/event0.sim ,,y,script,./test.sh -f tsim/stream/event0.sim

View File

@ -0,0 +1,267 @@
system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -i 1
system sh/cfg.sh -n dnode1 -c streamAggCnt -v 2
system sh/exec.sh -n dnode1 -s start
sleep 50
sql connect
print ===== step1
sql drop stream if exists streams1;
sql drop database if exists test;
sql create database test vgroups 4;
sql use test;
sql create stable st(ts timestamp, a int, b int , c int, d double) tags(ta int,tb int,tc int);
sql create table ts1 using st tags(1,1,1);
sql create table ts2 using st tags(2,2,2);
sql create table ts3 using st tags(3,2,2);
sql create table ts4 using st tags(4,2,2);
sql create stream streams1 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 watermark 1d into streamt1 as select _wstart, count(*) c1, sum(a) c3 , max(b) c4 from st interval(10s);
sleep 1000
sql insert into ts1 values(1648791213000,1,1,3,4.1);
sql insert into ts1 values(1648791223000,2,2,3,1.1);
sql insert into ts1 values(1648791233000,3,3,3,2.1);
sql insert into ts1 values(1648791243000,4,4,3,3.1);
sql insert into ts2 values(1648791213000,1,5,3,4.1);
sql insert into ts2 values(1648791223000,2,6,3,1.1);
sql insert into ts2 values(1648791233000,3,7,3,2.1);
sql insert into ts2 values(1648791243000,4,8,3,3.1);
$loop_count = 0
loop0:
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
sleep 1000
print 2 select * from streamt1;
sql select * from streamt1;
print $data00 $data01 $data02 $data03
print $data10 $data11 $data12 $data13
print $data20 $data21 $data22 $data23
print $data30 $data31 $data32 $data33
print $data40 $data41 $data42 $data43
if $rows != 4 then
print =====rows=$rows
goto loop0
endi
if $data01 != 2 then
print =====data01=$data01
goto loop0
endi
if $data11 != 2 then
print =====data11=$data11
goto loop0
endi
if $data21 != 2 then
print =====data21=$data21
goto loop0
endi
if $data31 != 2 then
print =====data31=$data31
goto loop0
endi
sql insert into ts1 values(1648791213000,1,9,3,4.1);
$loop_count = 0
loop1:
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
sleep 1000
print 2 select * from streamt1;
sql select * from streamt1;
print $data00 $data01 $data02 $data03
print $data10 $data11 $data12 $data13
print $data20 $data21 $data22 $data23
print $data30 $data31 $data32 $data33
print $data40 $data41 $data42 $data43
if $rows != 4 then
print =====rows=$rows
goto loop1
endi
if $data01 != 2 then
print =====data01=$data01
goto loop1
endi
if $data11 != 2 then
print =====data11=$data11
goto loop1
endi
if $data21 != 2 then
print =====data21=$data21
goto loop1
endi
if $data31 != 2 then
print =====data31=$data31
goto loop1
endi
sql delete from ts2 where ts = 1648791243000 ;
$loop_count = 0
loop2:
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
sleep 1000
print 2 select * from streamt1;
sql select * from streamt1;
print $data00 $data01 $data02 $data03
print $data10 $data11 $data12 $data13
print $data20 $data21 $data22 $data23
print $data30 $data31 $data32 $data33
print $data40 $data41 $data42 $data43
if $rows != 4 then
print =====rows=$rows
goto loop2
endi
if $data01 != 2 then
print =====data01=$data01
goto loop2
endi
if $data11 != 2 then
print =====data11=$data11
goto loop2
endi
if $data21 != 2 then
print =====data21=$data21
goto loop2
endi
if $data31 != 1 then
print =====data31=$data31
goto loop2
endi
sql delete from ts2 where ts = 1648791223000 ;
$loop_count = 0
loop3:
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
sleep 1000
print 2 select * from streamt1;
sql select * from streamt1;
print $data00 $data01 $data02 $data03
print $data10 $data11 $data12 $data13
print $data20 $data21 $data22 $data23
print $data30 $data31 $data32 $data33
print $data40 $data41 $data42 $data43
if $rows != 4 then
print =====rows=$rows
goto loop3
endi
if $data01 != 2 then
print =====data01=$data01
goto loop3
endi
if $data11 != 1 then
print =====data11=$data11
goto loop3
endi
if $data21 != 2 then
print =====data21=$data21
goto loop3
endi
if $data31 != 1 then
print =====data31=$data31
goto loop3
endi
sql insert into ts1 values(1648791233001,3,9,3,2.1);
$loop_count = 0
loop4:
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
sleep 1000
print 2 select * from streamt1;
sql select * from streamt1;
print $data00 $data01 $data02 $data03
print $data10 $data11 $data12 $data13
print $data20 $data21 $data22 $data23
print $data30 $data31 $data32 $data33
print $data40 $data41 $data42 $data43
if $rows != 4 then
print =====rows=$rows
goto loop4
endi
if $data01 != 2 then
print =====data01=$data01
goto loop4
endi
if $data11 != 1 then
print =====data11=$data11
goto loop4
endi
if $data21 != 3 then
print =====data21=$data21
goto loop4
endi
if $data31 != 1 then
print =====data31=$data31
goto loop4
endi
sql select _wstart, count(*) c1, count(d) c2 , sum(a) c3 , max(b) c4, min(c) c5, avg(d) from st interval(10s);
print ===== over
system sh/stop_dnodes.sh

View File

@ -1018,7 +1018,7 @@ int sml_escape_Test() {
ASSERT(numFields == 5); ASSERT(numFields == 5);
ASSERT(strncmp(fields[1].name, "inode\"i,= s_used", sizeof("inode\"i,= s_used") - 1) == 0); ASSERT(strncmp(fields[1].name, "inode\"i,= s_used", sizeof("inode\"i,= s_used") - 1) == 0);
ASSERT(strncmp(fields[2].name, "total", sizeof("total") - 1) == 0); ASSERT(strncmp(fields[2].name, "total", sizeof("total") - 1) == 0);
ASSERT(strncmp(fields[3].name, "inode\"i,= s_f\\\\ree", sizeof("inode\"i,= s_f\\\\ree") - 1) == 0); ASSERT(strncmp(fields[3].name, "inode\"i,= s_f\\ree", sizeof("inode\"i,= s_f\\ree") - 1) == 0);
ASSERT(strncmp(fields[4].name, "dev\"i,= ce", sizeof("dev\"i,= ce") - 1) == 0); ASSERT(strncmp(fields[4].name, "dev\"i,= ce", sizeof("dev\"i,= ce") - 1) == 0);
TAOS_ROW row = NULL; TAOS_ROW row = NULL;
@ -1044,6 +1044,88 @@ int sml_escape_Test() {
return code; return code;
} }
// test field with end of escape
int sml_escape1_Test() {
TAOS *taos = taos_connect("localhost", "root", "taosdata", NULL, 0);
TAOS_RES *pRes = taos_query(taos, "create database if not exists db_escape");
taos_free_result(pRes);
pRes = taos_query(taos, "use db_escape");
taos_free_result(pRes);
const char *sql[] = {
"stab,t1\\=1 c1=3,c2=\"32fw\" 1661943970000000000",
"stab,t1=1\\ c1=3,c2=\"32fw\" 1661943980000000000",
"stab,t1=1 c1\\=3,c2=\"32fw\" 1661943990000000000",
};
for(int i = 0; i < sizeof(sql) / sizeof(sql[0]); i++){
pRes = taos_schemaless_insert(taos, (char**)&sql[i], 1, TSDB_SML_LINE_PROTOCOL, 0);
int code = taos_errno(pRes);
ASSERT(code);
}
const char *sql1[] = {
"stab\\,t1=1 c1=3,c2=\"32fw\" 1661943960000000000",
"stab\\\\,t1=1 c1=3,c2=\"32fw\" 1661943960000000000",
"stab,t1\\\\=1 c1=3,c2=\"32fw\" 1661943970000000000",
"stab,t1=1\\\\ c1=3,c2=\"32fw\" 1661943980000000000",
"stab,t1=1 c1\\\\=3,c2=\"32fw\" 1661943990000000000",
};
pRes = taos_schemaless_insert(taos, (char **)sql1, sizeof(sql1) / sizeof(sql1[0]), TSDB_SML_LINE_PROTOCOL, 0);
printf("%s result:%s, rows:%d\n", __FUNCTION__, taos_errstr(pRes), taos_affected_rows(pRes));
int code = taos_errno(pRes);
ASSERT(!code);
ASSERT(taos_affected_rows(pRes) == 5);
taos_free_result(pRes);
pRes = taos_query(taos, "select * from stab"); //check stable name
ASSERT(pRes);
int fieldNum = taos_field_count(pRes);
ASSERT(fieldNum == 6);
printf("fieldNum:%d\n", fieldNum);
int numFields = taos_num_fields(pRes);
TAOS_FIELD *fields = taos_fetch_fields(pRes);
ASSERT(numFields == 6);
ASSERT(strncmp(fields[1].name, "c1", sizeof("c1") - 1) == 0);
ASSERT(strncmp(fields[2].name, "c2", sizeof("c2") - 1) == 0);
ASSERT(strncmp(fields[3].name, "c1\\", sizeof("c1\\") - 1) == 0);
ASSERT(strncmp(fields[4].name, "t1\\", sizeof("t1\\") - 1) == 0);
ASSERT(strncmp(fields[5].name, "t1", sizeof("t1") - 1) == 0);
TAOS_ROW row = NULL;
int32_t rowIndex = 0;
while ((row = taos_fetch_row(pRes)) != NULL) {
int64_t ts = *(int64_t *)row[0];
if (ts == 1661943970000) {
ASSERT(*(double *)row[1] == 3);
ASSERT(strncmp(row[2], "32fw", sizeof("32fw") - 1) == 0);
ASSERT(row[3] == NULL);
ASSERT(strncmp(row[4], "1", sizeof("1") - 1) == 0);
ASSERT(row[5] == NULL);
}else if (ts == 1661943980000) {
ASSERT(*(double *)row[1] == 3);
ASSERT(strncmp(row[2], "32fw", sizeof("32fw") - 1) == 0);
ASSERT(row[3] == NULL);
ASSERT(row[4] == NULL);
ASSERT(strncmp(row[5], "1\\", sizeof("1\\") - 1) == 0);
}else if (ts == 1661943990000) {
ASSERT(row[1] == NULL);
ASSERT(strncmp(row[2], "32fw", sizeof("32fw") - 1) == 0);
ASSERT(*(double *)row[3] == 3);
ASSERT(row[4] == NULL);
ASSERT(strncmp(row[5], "1", sizeof("1") - 1) == 0);
}
rowIndex++;
}
taos_free_result(pRes);
taos_close(taos);
return code;
}
int sml_19221_Test() { int sml_19221_Test() {
TAOS *taos = taos_connect("localhost", "root", "taosdata", NULL, 0); TAOS *taos = taos_connect("localhost", "root", "taosdata", NULL, 0);
@ -1775,17 +1857,14 @@ int main(int argc, char *argv[]) {
ASSERT(ret); ASSERT(ret);
ret = sml_escape_Test(); ret = sml_escape_Test();
ASSERT(!ret); ASSERT(!ret);
ret = sml_escape1_Test();
ASSERT(!ret);
ret = sml_ts3116_Test(); ret = sml_ts3116_Test();
ASSERT(!ret); ASSERT(!ret);
ret = sml_ts2385_Test(); // this test case need config sml table name using ./sml_test config_file ret = sml_ts2385_Test(); // this test case need config sml table name using ./sml_test config_file
ASSERT(!ret); ASSERT(!ret);
ret = sml_ts3303_Test(); ret = sml_ts3303_Test();
ASSERT(!ret); ASSERT(!ret);
// for(int i = 0; i < sizeof(str)/sizeof(str[0]); i++){
// printf("str:%s \t %d\n", str[i], smlCalTypeSum(str[i], strlen(str[i])));
// }
// int ret = 0;
ret = sml_ttl_Test(); ret = sml_ttl_Test();
ASSERT(!ret); ASSERT(!ret);
ret = sml_ts2164_Test(); ret = sml_ts2164_Test();