From 53dd2050fd5786776083b435158587e580f70609 Mon Sep 17 00:00:00 2001 From: Bomin Zhang Date: Tue, 24 Nov 2020 17:05:20 +0800 Subject: [PATCH 01/13] [TD-2140]: use tref to manage sql objects --- src/client/inc/tsclient.h | 6 ++--- src/client/src/tscLocal.c | 7 ++++-- src/client/src/tscServer.c | 49 ++++++++++++++------------------------ src/client/src/tscSql.c | 18 +++++--------- src/client/src/tscSub.c | 4 ++-- src/client/src/tscSystem.c | 14 +++++++---- src/client/src/tscUtil.c | 22 ++++++++--------- 7 files changed, 54 insertions(+), 66 deletions(-) diff --git a/src/client/inc/tsclient.h b/src/client/inc/tsclient.h index ff36cf0f5a..f38a8e6c18 100644 --- a/src/client/inc/tsclient.h +++ b/src/client/inc/tsclient.h @@ -333,7 +333,7 @@ typedef struct STscObj { char superAuth : 1; uint32_t connId; uint64_t rid; // ref ID returned by taosAddRef - struct SSqlObj * pHb; + int64_t hbrid; struct SSqlObj * sqlList; struct SSqlStream *streamList; void* pDnodeConn; @@ -373,7 +373,7 @@ typedef struct SSqlObj { struct SSqlObj **pSubs; struct SSqlObj *prev, *next; - struct SSqlObj **self; + int64_t self; } SSqlObj; typedef struct SSqlStream { @@ -507,7 +507,7 @@ static FORCE_INLINE void tscGetResultColumnChr(SSqlRes* pRes, SFieldInfo* pField } extern SCacheObj* tscMetaCache; -extern SCacheObj* tscObjCache; +extern int tscObjRef; extern void * tscTmr; extern void * tscQhandle; extern int tscKeepConn[]; diff --git a/src/client/src/tscLocal.c b/src/client/src/tscLocal.c index 538e652f3c..4c28adc261 100644 --- a/src/client/src/tscLocal.c +++ b/src/client/src/tscLocal.c @@ -825,8 +825,11 @@ static int32_t tscProcessClientVer(SSqlObj *pSql) { static int32_t tscProcessServStatus(SSqlObj *pSql) { STscObj* pObj = pSql->pTscObj; - if (pObj->pHb != NULL) { - if (pObj->pHb->res.code == TSDB_CODE_RPC_NETWORK_UNAVAIL) { + SSqlObj* pHb = (SSqlObj*)taosAcquireRef(tscObjRef, pObj->hbrid); + if (pHb != NULL) { + int32_t code = pHb->res.code; + taosReleaseRef(tscObjRef, pObj->hbrid); + if (code == TSDB_CODE_RPC_NETWORK_UNAVAIL) { pSql->res.code = TSDB_CODE_RPC_NETWORK_UNAVAIL; return pSql->res.code; } diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index cbc5604a27..b964a4f15d 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -175,10 +175,10 @@ void tscProcessHeartBeatRsp(void *param, TAOS_RES *tres, int code) { if (pRsp->streamId) tscKillStream(pObj, htonl(pRsp->streamId)); } } else { - tscDebug("%p heartbeat failed, code:%s", pObj->pHb, tstrerror(code)); + tscDebug("%" PRId64 " heartbeat failed, code:%s", pObj->hbrid, tstrerror(code)); } - if (pObj->pHb != NULL) { + if (pObj->hbrid != 0) { int32_t waitingDuring = tsShellActivityTimer * 500; tscDebug("%p send heartbeat in %dms", pSql, waitingDuring); @@ -193,20 +193,12 @@ void tscProcessActivityTimer(void *handle, void *tmrId) { STscObj *pObj = taosAcquireRef(tscRefId, rid); if (pObj == NULL) return; - SSqlObj* pHB = pObj->pHb; - - void** p = taosCacheAcquireByKey(tscObjCache, &pHB, sizeof(TSDB_CACHE_PTR_TYPE)); - if (p == NULL) { - tscWarn("%p HB object has been released already", pHB); - taosReleaseRef(tscRefId, pObj->rid); - return; - } - - assert(*pHB->self == pHB); + SSqlObj* pHB = taosAcquireRef(tscObjRef, pObj->hbrid); + assert(pHB->self == pObj->hbrid); pHB->retry = 0; int32_t code = tscProcessSql(pHB); - taosCacheRelease(tscObjCache, (void**) &p, false); + taosReleaseRef(tscObjRef, pObj->hbrid); if (code != TSDB_CODE_SUCCESS) { tscError("%p failed to sent HB to server, reason:%s", pHB, tstrerror(code)); @@ -236,7 +228,7 @@ int tscSendMsgToServer(SSqlObj *pSql) { .msgType = pSql->cmd.msgType, .pCont = pMsg, .contLen = pSql->cmd.payloadLen, - .ahandle = pSql, + .ahandle = (void*)pSql->self, .handle = NULL, .code = 0 }; @@ -247,26 +239,24 @@ int tscSendMsgToServer(SSqlObj *pSql) { void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) { TSDB_CACHE_PTR_TYPE handle = (TSDB_CACHE_PTR_TYPE) rpcMsg->ahandle; - void** p = taosCacheAcquireByKey(tscObjCache, &handle, sizeof(TSDB_CACHE_PTR_TYPE)); - if (p == NULL) { + SSqlObj* pSql = (SSqlObj*)taosAcquireRef(tscObjRef, handle); + if (pSql == NULL) { rpcFreeCont(rpcMsg->pCont); return; } - - SSqlObj* pSql = *p; - assert(pSql != NULL); + assert(pSql->self == handle); STscObj *pObj = pSql->pTscObj; SSqlRes *pRes = &pSql->res; SSqlCmd *pCmd = &pSql->cmd; - assert(*pSql->self == pSql); pSql->rpcRid = -1; if (pObj->signature != pObj) { tscDebug("%p DB connection is closed, cmd:%d pObj:%p signature:%p", pSql, pCmd->command, pObj, pObj->signature); - taosCacheRelease(tscObjCache, (void**) &p, true); + taosRemoveRef(tscObjRef, pSql->self); + taosReleaseRef(tscObjRef, pSql->self); rpcFreeCont(rpcMsg->pCont); return; } @@ -276,10 +266,8 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) { tscDebug("%p sqlObj needs to be released or DB connection is closed, cmd:%d type:%d, pObj:%p signature:%p", pSql, pCmd->command, pQueryInfo->type, pObj, pObj->signature); - void** p1 = p; - taosCacheRelease(tscObjCache, (void**) &p1, false); - - taosCacheRelease(tscObjCache, (void**) &p, true); + taosRemoveRef(tscObjRef, pSql->self); + taosReleaseRef(tscObjRef, pSql->self); rpcFreeCont(rpcMsg->pCont); return; } @@ -322,7 +310,7 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) { // if there is an error occurring, proceed to the following error handling procedure. if (rpcMsg->code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) { - taosCacheRelease(tscObjCache, (void**) &p, false); + taosReleaseRef(tscObjRef, pSql->self); rpcFreeCont(rpcMsg->pCont); return; } @@ -390,11 +378,10 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) { (*pSql->fp)(pSql->param, pSql, rpcMsg->code); } - void** p1 = p; - taosCacheRelease(tscObjCache, (void**) &p1, false); + taosReleaseRef(tscObjRef, pSql->self); if (shouldFree) { // in case of table-meta/vgrouplist query, automatically free it - taosCacheRelease(tscObjCache, (void **)&p, true); + taosRemoveRef(tscObjRef, pSql->self); tscDebug("%p sqlObj is automatically freed", pSql); } @@ -2019,7 +2006,7 @@ int tscProcessShowRsp(SSqlObj *pSql) { // TODO multithread problem static void createHBObj(STscObj* pObj) { - if (pObj->pHb != NULL) { + if (pObj->hbrid != 0) { return; } @@ -2049,7 +2036,7 @@ static void createHBObj(STscObj* pObj) { registerSqlObj(pSql); tscDebug("%p HB is allocated, pObj:%p", pSql, pObj); - pObj->pHb = pSql; + pObj->hbrid = pSql->self; } int tscProcessConnectRsp(SSqlObj *pSql) { diff --git a/src/client/src/tscSql.c b/src/client/src/tscSql.c index 020305a0a8..50dfbcbec9 100644 --- a/src/client/src/tscSql.c +++ b/src/client/src/tscSql.c @@ -276,8 +276,8 @@ void taos_close(TAOS *taos) { pObj->signature = NULL; taosTmrStopA(&(pObj->pTimer)); - SSqlObj* pHb = pObj->pHb; - if (pHb != NULL && atomic_val_compare_exchange_ptr(&pObj->pHb, pHb, 0) == pHb) { + SSqlObj* pHb = (SSqlObj*)taosAcquireRef(tscObjRef, pObj->hbrid); + if (pHb != NULL) { if (pHb->rpcRid > 0) { // wait for rsp from dnode rpcCancelRequest(pHb->rpcRid); pHb->rpcRid = -1; @@ -285,6 +285,7 @@ void taos_close(TAOS *taos) { tscDebug("%p HB is freed", pHb); taos_free_result(pHb); + taosReleaseRef(tscObjRef, pHb->self); } int32_t ref = T_REF_DEC(pObj); @@ -597,8 +598,7 @@ void taos_free_result(TAOS_RES *res) { bool freeNow = tscKillQueryInDnode(pSql); if (freeNow) { tscDebug("%p free sqlObj in cache", pSql); - SSqlObj** p = pSql->self; - taosCacheRelease(tscObjCache, (void**) &p, true); + taosReleaseRef(tscObjRef, pSql->self); } } @@ -691,13 +691,7 @@ static void tscKillSTableQuery(SSqlObj *pSql) { continue; } - void** p = taosCacheAcquireByKey(tscObjCache, &pSub, sizeof(TSDB_CACHE_PTR_TYPE)); - if (p == NULL) { - continue; - } - - SSqlObj* pSubObj = (SSqlObj*) (*p); - assert(pSubObj->self == (SSqlObj**) p); + SSqlObj* pSubObj = pSub; pSubObj->res.code = TSDB_CODE_TSC_QUERY_CANCELLED; if (pSubObj->rpcRid > 0) { @@ -706,7 +700,7 @@ static void tscKillSTableQuery(SSqlObj *pSql) { } tscQueueAsyncRes(pSubObj); - taosCacheRelease(tscObjCache, (void**) &p, false); + taosReleaseRef(tscObjRef, pSubObj->self); } tscDebug("%p super table query cancelled", pSql); diff --git a/src/client/src/tscSub.c b/src/client/src/tscSub.c index 2c81bd7c7c..3bce7bee77 100644 --- a/src/client/src/tscSub.c +++ b/src/client/src/tscSub.c @@ -179,8 +179,8 @@ static SSub* tscCreateSubscription(STscObj* pObj, const char* topic, const char* fail: tscError("tscCreateSubscription failed at line %d, reason: %s", line, tstrerror(code)); if (pSql != NULL) { - if (pSql->self != NULL) { - taos_free_result(pSql); + if (pSql->self != 0) { + taosReleaseRef(tscObjRef, pSql->self); } else { tscFreeSqlObj(pSql); } diff --git a/src/client/src/tscSystem.c b/src/client/src/tscSystem.c index 9e9a00550a..672d87e0c8 100644 --- a/src/client/src/tscSystem.c +++ b/src/client/src/tscSystem.c @@ -15,7 +15,7 @@ #include "os.h" #include "taosmsg.h" -#include "tcache.h" +#include "tref.h" #include "trpc.h" #include "tsystem.h" #include "ttimer.h" @@ -31,7 +31,7 @@ // global, not configurable SCacheObj* tscMetaCache; -SCacheObj* tscObjCache; +int tscObjRef = -1; void * tscTmr; void * tscQhandle; void * tscCheckDiskUsageTmr; @@ -144,7 +144,11 @@ void taos_init_imp(void) { int64_t refreshTime = 10; // 10 seconds by default if (tscMetaCache == NULL) { tscMetaCache = taosCacheInit(TSDB_DATA_TYPE_BINARY, refreshTime, false, tscFreeTableMetaHelper, "tableMeta"); +#ifndef SQLOBJ_USE_CACHE + tscObjRef = taosOpenRef(4096, tscFreeRegisteredSqlObj); +#else tscObjCache = taosCacheInit(TSDB_CACHE_PTR_KEY, refreshTime / 2, false, tscFreeRegisteredSqlObj, "sqlObj"); +#endif } tscRefId = taosOpenRef(200, tscCloseTscObj); @@ -167,9 +171,9 @@ void taos_cleanup(void) { taosCacheCleanup(m); } - m = tscObjCache; - if (m != NULL && atomic_val_compare_exchange_ptr(&tscObjCache, m, 0) == m) { - taosCacheCleanup(m); + int refId = atomic_exchange_32(&tscObjRef, -1); + if (refId != -1) { + taosCloseRef(refId); } m = tscQhandle; diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index 7a82bcaaab..b35dbb47e6 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -447,18 +447,18 @@ static void tscFreeSubobj(SSqlObj* pSql) { void tscFreeRegisteredSqlObj(void *pSql) { assert(pSql != NULL); - SSqlObj** p = (SSqlObj**)pSql; - STscObj* pTscObj = (*p)->pTscObj; + SSqlObj* p = *(SSqlObj**)pSql; + STscObj* pTscObj = p->pTscObj; - assert((*p)->self != 0 && (*p)->self == (p)); - tscFreeSqlObj(*p); + assert(p->self != 0); + tscFreeSqlObj(p); int32_t ref = T_REF_DEC(pTscObj); assert(ref >= 0); - tscDebug("%p free sqlObj completed, tscObj:%p ref:%d", *p, pTscObj, ref); + tscDebug("%p free sqlObj completed, tscObj:%p ref:%d", p, pTscObj, ref); if (ref == 0) { - tscDebug("%p all sqlObj freed, free tscObj:%p", *p, pTscObj); + tscDebug("%p all sqlObj freed, free tscObj:%p", p, pTscObj); taosRemoveRef(tscRefId, pTscObj->rid); } } @@ -1560,6 +1560,8 @@ void tscGetSrcColumnInfo(SSrcColumnInfo* pColInfo, SQueryInfo* pQueryInfo) { } } +#ifndef SQLOBJ_USE_CACHE +#else void tscSetFreeHeatBeat(STscObj* pObj) { if (pObj == NULL || pObj->signature != pObj || pObj->pHb == NULL) { return; @@ -1572,6 +1574,7 @@ void tscSetFreeHeatBeat(STscObj* pObj) { SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pHeatBeat->cmd, 0); pQueryInfo->type = TSDB_QUERY_TYPE_FREE_RESOURCE; } +#endif /* * the following four kinds of SqlObj should not be freed @@ -1591,7 +1594,7 @@ bool tscShouldBeFreed(SSqlObj* pSql) { } STscObj* pTscObj = pSql->pTscObj; - if (pSql->pStream != NULL || pTscObj->pHb == pSql || pSql->pSubscription != NULL) { + if (pSql->pStream != NULL || pTscObj->hbrid == pSql->self || pSql->pSubscription != NULL) { return false; } @@ -1883,13 +1886,10 @@ void tscResetForNextRetrieve(SSqlRes* pRes) { } void registerSqlObj(SSqlObj* pSql) { - int32_t DEFAULT_LIFE_TIME = 2 * 600 * 1000; // 1200 sec - int32_t ref = T_REF_INC(pSql->pTscObj); tscDebug("%p add to tscObj:%p, ref:%d", pSql, pSql->pTscObj, ref); - TSDB_CACHE_PTR_TYPE p = (TSDB_CACHE_PTR_TYPE) pSql; - pSql->self = taosCachePut(tscObjCache, &p, sizeof(TSDB_CACHE_PTR_TYPE), &p, sizeof(TSDB_CACHE_PTR_TYPE), DEFAULT_LIFE_TIME); + pSql->self = taosAddRef(tscObjRef, pSql); } SSqlObj* createSimpleSubObj(SSqlObj* pSql, void (*fp)(), void* param, int32_t cmd) { From d3352f551140bd73f56afcb5f956daa83ba0d1b5 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 24 Nov 2020 18:27:24 +0800 Subject: [PATCH 02/13] [TD-225] reduce memory consumption. --- src/client/src/tscSubquery.c | 3 +++ src/client/src/tscUtil.c | 7 ++++++- 2 files changed, 9 insertions(+), 1 deletion(-) diff --git a/src/client/src/tscSubquery.c b/src/client/src/tscSubquery.c index 6ebbeeef41..819a323db5 100644 --- a/src/client/src/tscSubquery.c +++ b/src/client/src/tscSubquery.c @@ -2198,6 +2198,9 @@ int32_t tscHandleInsertRetry(SSqlObj* pSql) { STableDataBlocks* pTableDataBlock = taosArrayGetP(pCmd->pDataBlocks, pSupporter->index); int32_t code = tscCopyDataBlockToPayload(pSql, pTableDataBlock); + // free the data block created from insert sql string + pCmd->pDataBlocks = tscDestroyBlockArrayList(pCmd->pDataBlocks); + if ((pRes->code = code)!= TSDB_CODE_SUCCESS) { tscQueueAsyncRes(pSql); return code; // here the pSql may have been released already. diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index b35dbb47e6..703e266eb8 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -833,9 +833,14 @@ int32_t tscMergeTableDataBlocks(SSqlObj* pSql, SArray* pTableDataBlockList) { dataBuf->size += (finalLen + sizeof(SSubmitBlk)); assert(dataBuf->size <= dataBuf->nAllocSize); + // free unnecessary memory resource ASAP. + char* p = realloc(dataBuf->pData, dataBuf->size); + if (p != NULL) { + dataBuf->pData = p; + } + // the length does not include the SSubmitBlk structure pBlocks->dataLen = htonl(finalLen); - dataBuf->numOfTables += 1; } From 1386c05504ad369aaa60519c91b9829847454d55 Mon Sep 17 00:00:00 2001 From: Bomin Zhang Date: Wed, 25 Nov 2020 10:29:25 +0800 Subject: [PATCH 03/13] [TD-2140]: remove temporary code --- src/client/src/tscSystem.c | 4 ---- src/client/src/tscUtil.c | 16 ---------------- 2 files changed, 20 deletions(-) diff --git a/src/client/src/tscSystem.c b/src/client/src/tscSystem.c index 672d87e0c8..91eabda78b 100644 --- a/src/client/src/tscSystem.c +++ b/src/client/src/tscSystem.c @@ -144,11 +144,7 @@ void taos_init_imp(void) { int64_t refreshTime = 10; // 10 seconds by default if (tscMetaCache == NULL) { tscMetaCache = taosCacheInit(TSDB_DATA_TYPE_BINARY, refreshTime, false, tscFreeTableMetaHelper, "tableMeta"); -#ifndef SQLOBJ_USE_CACHE tscObjRef = taosOpenRef(4096, tscFreeRegisteredSqlObj); -#else - tscObjCache = taosCacheInit(TSDB_CACHE_PTR_KEY, refreshTime / 2, false, tscFreeRegisteredSqlObj, "sqlObj"); -#endif } tscRefId = taosOpenRef(200, tscCloseTscObj); diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index 703e266eb8..950e904c4f 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -1565,22 +1565,6 @@ void tscGetSrcColumnInfo(SSrcColumnInfo* pColInfo, SQueryInfo* pQueryInfo) { } } -#ifndef SQLOBJ_USE_CACHE -#else -void tscSetFreeHeatBeat(STscObj* pObj) { - if (pObj == NULL || pObj->signature != pObj || pObj->pHb == NULL) { - return; - } - - SSqlObj* pHeatBeat = pObj->pHb; - assert(pHeatBeat == pHeatBeat->signature); - - // to denote the heart-beat timer close connection and free all allocated resources - SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pHeatBeat->cmd, 0); - pQueryInfo->type = TSDB_QUERY_TYPE_FREE_RESOURCE; -} -#endif - /* * the following four kinds of SqlObj should not be freed * 1. SqlObj for stream computing From 679d3093717fffa8d9cd9aaef61a58d72e06c39d Mon Sep 17 00:00:00 2001 From: liuyq-617 Date: Mon, 30 Nov 2020 14:30:29 +0800 Subject: [PATCH 04/13] fix test case error --- Jenkinsfile | 8 +++ tests/pytest/insert/restfulInsert.py | 86 +++++++++++++++++++++++++--- tests/pytest/query/bug2218.py | 12 ++-- 3 files changed, 91 insertions(+), 15 deletions(-) diff --git a/Jenkinsfile b/Jenkinsfile index dc7836c3da..940815febe 100644 --- a/Jenkinsfile +++ b/Jenkinsfile @@ -14,10 +14,12 @@ pipeline { sh ''' date cd ${WKC} + git reset --hard git checkout develop git pull git submodule update cd ${WK} + git reset --hard git checkout develop git pull export TZ=Asia/Harbin @@ -39,11 +41,13 @@ pipeline { steps { sh ''' cd ${WKC} + git reset --hard git checkout develop git pull git submodule update cd ${WK} + git reset --hard git checkout develop git pull export TZ=Asia/Harbin @@ -65,11 +69,13 @@ pipeline { steps { sh ''' cd ${WKC} + git reset --hard git checkout develop git pull git submodule update cd ${WK} + git reset --hard git checkout develop git pull export TZ=Asia/Harbin @@ -108,11 +114,13 @@ pipeline { steps { sh ''' cd ${WKC} + git reset --hard git checkout develop git pull git submodule update cd ${WK} + git reset --hard git checkout develop git pull export TZ=Asia/Harbin diff --git a/tests/pytest/insert/restfulInsert.py b/tests/pytest/insert/restfulInsert.py index e3a963f1d4..aad1efa60f 100644 --- a/tests/pytest/insert/restfulInsert.py +++ b/tests/pytest/insert/restfulInsert.py @@ -18,7 +18,7 @@ import time import argparse class RestfulInsert: - def __init__(self, host, startTimestamp, dbname, threads, tables, records, batchSize, tbNamePerfix, outOfOrder): + def __init__(self, host, startTimestamp, dbname, threads, tables, records, batchSize, tbNamePerfix, outOfOrder,tablePerbatch): self.header = {'Authorization': 'Basic cm9vdDp0YW9zZGF0YQ=='} self.url = "http://%s:6041/rest/sql" % host self.ts = startTimestamp @@ -29,12 +29,15 @@ class RestfulInsert: self.batchSize = batchSize self.tableNamePerfix = tbNamePerfix self.outOfOrder = outOfOrder + self.tablePerbatch = tablePerbatch def createTable(self, threadID): - tablesPerThread = int (self.numOfTables / self.numOfThreads) - print("create table %d to %d" % (tablesPerThread * threadID, tablesPerThread * (threadID + 1) - 1)) - for i in range(tablesPerThread): + tablesPerThread = int (self.numOfTables / self.numOfThreads) + loop = tablesPerThread if threadID != self.numOfThreads - 1 else self.numOfTables - tablesPerThread * threadID + print("create table %d to %d" % (tablesPerThread * threadID, tablesPerThread * threadID + loop - 1)) + for i in range(loop): tableID = threadID * tablesPerThread + if tableID + i >= self.numOfTables : break name = 'beijing' if tableID % 2 == 0 else 'shanghai' data = "create table if not exists %s.%s%d using %s.meters tags(%d, '%s')" % (self.dbname, self.tableNamePerfix, tableID + i, self.dbname, tableID + i, name) response = requests.post(self.url, data, headers = self.header) @@ -55,6 +58,58 @@ class RestfulInsert: response = requests.post(self.url, data, headers = self.header) if response.status_code != 200: print(response.content) + + def insertnData(self, threadID): + print("thread %d started" % threadID) + tablesPerThread = int (self.numOfTables / self.numOfThreads) + loop = int(self.recordsPerTable / self.batchSize) + if self.tablePerbatch == 1 : + for i in range(tablesPerThread+1): + tableID = i + threadID * tablesPerThread + if tableID >= self.numOfTables: return + start = self.ts + start1=time.time() + for k in range(loop): + data = "insert into %s.%s%d values" % (self.dbname, self.tableNamePerfix, tableID) + values = [] + bloop = self.batchSize if k != loop - 1 else self.recordsPerTable - self.batchSize * k + for l in range(bloop): + values.append("(%d, %d, %d, %d)" % (start + k * self.batchSize + l, random.randint(1, 100), random.randint(1, 100), random.randint(1, 100))) + if len(data) > 1048576 : + print ('batch size is larger than 1M') + exit(-1) + if self.outOfOrder : + random.shuffle(values) + data+=''.join(values) + + response = requests.post(self.url, data, headers = self.header) + if response.status_code != 200: + print(response.content) + print('----------------',loop,time.time()-start1) + else: + for i in range(0,tablesPerThread+self.tablePerbatch,self.tablePerbatch): + for k in range(loop): + data = "insert into " + for j in range(self.tablePerbatch): + tableID = i + threadID * tablesPerThread+j + if tableID >= self.numOfTables: return + start = self.ts + data += "%s.%s%d values" % (self.dbname, self.tableNamePerfix, tableID) + values = [] + bloop = self.batchSize if k != loop - 1 else self.recordsPerTable - self.batchSize * k + for l in range(bloop): + values.append("(%d, %d, %d, %d)" % (start + k * self.batchSize + l, random.randint(1, 100), random.randint(1, 100), random.randint(1, 100))) + if self.outOfOrder : + random.shuffle(values) + data+=''.join(values) + print('------------------',len(data)) + if len(data) > 1024*1024 : + print ('batch size is larger than 1M') + exit(-1) + response = requests.post(self.url, data, headers = self.header) + if response.status_code != 200: + print(response.content) + def insertUnlimitedData(self, threadID): print("thread %d started" % threadID) @@ -85,7 +140,9 @@ class RestfulInsert: if response.status_code != 200: print(response.content) - def run(self): + def run(self): + data = "drop database if exists %s" % self.dbname + requests.post(self.url, data, headers = self.header) data = "create database if not exists %s" % self.dbname requests.post(self.url, data, headers = self.header) data = "create table if not exists %s.meters(ts timestamp, f1 int, f2 int, f3 int) tags(id int, loc nchar(20))" % self.dbname @@ -135,14 +192,14 @@ parser.add_argument( '-d', '--db-name', action='store', - default='test', + default='test1', type=str, help='Database name to be created (default: test)') parser.add_argument( '-t', '--number-of-threads', action='store', - default=10, + default=20, type=int, help='Number of threads to create tables and insert datas (default: 10)') parser.add_argument( @@ -156,7 +213,7 @@ parser.add_argument( '-r', '--number-of-records', action='store', - default=1000, + default=10000, type=int, help='Number of record to be created for each table (default: 1000, -1 for unlimited records)') parser.add_argument( @@ -178,7 +235,18 @@ parser.add_argument( '--out-of-order', action='store_true', help='The order of test data (default: False)') +parser.add_argument( + '-b', + '--table-per-batch', + action='store', + default=1, + type=int, + help='the table per batch (default: 1)') + + args = parser.parse_args() -ri = RestfulInsert(args.host_name, args.start_timestamp, args.db_name, args.number_of_threads, args.number_of_tables, args.number_of_records, args.batch_size, args.table_name_prefix, args.out_of_order) +ri = RestfulInsert( + args.host_name, args.start_timestamp, args.db_name, args.number_of_threads, args.number_of_tables, + args.number_of_records, args.batch_size, args.table_name_prefix, args.out_of_order, args.table_per_batch) ri.run() \ No newline at end of file diff --git a/tests/pytest/query/bug2218.py b/tests/pytest/query/bug2218.py index bb92e5d9ce..080472383d 100644 --- a/tests/pytest/query/bug2218.py +++ b/tests/pytest/query/bug2218.py @@ -38,12 +38,12 @@ class TDTestCase: print("test col*1*1 desc ") tdSql.query('select c1,c1*1*1,c2*1*1,c3*1*1,c4*1*1,c5*1*1,c6*1*1 from mt0 order by ts desc limit 2') tdSql.checkData(0,0,99) - tdSql.checkData(0,1,0.0) - tdSql.checkData(0,2,0.0) - tdSql.checkData(0,3,0.0) - tdSql.checkData(0,4,0.0) - tdSql.checkData(0,5,0.0) - tdSql.checkData(0,6,0.0) + tdSql.checkData(0,1,99.0) + tdSql.checkData(0,2,499.0) + tdSql.checkData(0,3,99.0) + tdSql.checkData(0,4,99.0) + tdSql.checkData(0,5,99.0) + tdSql.checkData(0,6,999.0) def stop(self): From 57bbd174a9f3b0d1f2b8a2426ab84f5317c32289 Mon Sep 17 00:00:00 2001 From: liuyq-617 Date: Mon, 30 Nov 2020 14:31:47 +0800 Subject: [PATCH 05/13] fix test case error --- tests/pytest/query/bug2117.py | 50 +++++++++++++++++++++++++++++++++++ 1 file changed, 50 insertions(+) create mode 100644 tests/pytest/query/bug2117.py diff --git a/tests/pytest/query/bug2117.py b/tests/pytest/query/bug2117.py new file mode 100644 index 0000000000..1158b78a2a --- /dev/null +++ b/tests/pytest/query/bug2117.py @@ -0,0 +1,50 @@ +################################################################### +# Copyright (c) 2016 by TAOS Technologies, Inc. +# All rights reserved. +# +# This file is proprietary and confidential to TAOS Technologies. +# No part of this file may be reproduced, stored, transmitted, +# disclosed or used in any form or by any means other than as +# expressly provided by the written permission from Jianhui Tao +# +################################################################### + +# -*- coding: utf-8 -*- + +import sys +from util.log import * +from util.cases import * +from util.sql import * +from util.dnodes import * +class TDTestCase: + def init(self, conn, logSql): + tdLog.debug("start to execute %s" % __file__) + tdSql.init(conn.cursor(), logSql) + + def run(self): + tdSql.prepare() + print("==========step1") + print("create table && insert data") + + tdSql.execute("create table mt0 (ts timestamp, c1 int, c2 float, c3 bigint, c4 smallint, c5 tinyint, c6 double, c7 bool,c8 binary(20),c9 nchar(20))") + insertRows = 1000 + t0 = 1604298064000 + tdLog.info("insert %d rows" % (insertRows)) + for i in range(insertRows): + ret = tdSql.execute( + "insert into mt0 values (%d , %d,%d,%d,%d,%d,%d,%d,'%s','%s')" % + (t0+i,i%100,i/2,i%41,i%100,i%100,i*1.0,i%2,'taos'+str(i%100),'涛思'+str(i%100))) + print("==========step2") + print("test last with group by normal_col ") + tdSql.query('select last(c1) from mt0 group by c3') + tdSql.checkData(0,0,84) + tdSql.checkData(0,1,85) + + + + def stop(self): + tdSql.close() + tdLog.success("%s successfully executed" % __file__) + +tdCases.addWindows(__file__, TDTestCase()) +tdCases.addLinux(__file__, TDTestCase()) \ No newline at end of file From ab6d41f706c3fe89b6c8d76aae48a0e3f22f0419 Mon Sep 17 00:00:00 2001 From: liuyq-617 Date: Mon, 30 Nov 2020 14:34:36 +0800 Subject: [PATCH 06/13] fix test case error --- tests/pytest/insert/restfulInsert.py | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/tests/pytest/insert/restfulInsert.py b/tests/pytest/insert/restfulInsert.py index aad1efa60f..df253040f3 100644 --- a/tests/pytest/insert/restfulInsert.py +++ b/tests/pytest/insert/restfulInsert.py @@ -140,9 +140,7 @@ class RestfulInsert: if response.status_code != 200: print(response.content) - def run(self): - data = "drop database if exists %s" % self.dbname - requests.post(self.url, data, headers = self.header) + def run(self): data = "create database if not exists %s" % self.dbname requests.post(self.url, data, headers = self.header) data = "create table if not exists %s.meters(ts timestamp, f1 int, f2 int, f3 int) tags(id int, loc nchar(20))" % self.dbname @@ -192,14 +190,14 @@ parser.add_argument( '-d', '--db-name', action='store', - default='test1', + default='test', type=str, help='Database name to be created (default: test)') parser.add_argument( '-t', '--number-of-threads', action='store', - default=20, + default=10, type=int, help='Number of threads to create tables and insert datas (default: 10)') parser.add_argument( @@ -213,7 +211,7 @@ parser.add_argument( '-r', '--number-of-records', action='store', - default=10000, + default=1000, type=int, help='Number of record to be created for each table (default: 1000, -1 for unlimited records)') parser.add_argument( From a432168ae5399366bd947b3be1aeae3309ab776f Mon Sep 17 00:00:00 2001 From: stephenkgu Date: Mon, 30 Nov 2020 14:25:54 +0800 Subject: [PATCH 07/13] [TD-2265]: fix TSDB_MAX_ALLOWED_SQL_LEN from 8MB to 1MB --- src/inc/taosdef.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/inc/taosdef.h b/src/inc/taosdef.h index 20c7af6a21..ec1e1fc330 100644 --- a/src/inc/taosdef.h +++ b/src/inc/taosdef.h @@ -257,7 +257,7 @@ void tsDataSwap(void *pLeft, void *pRight, int32_t type, int32_t size, void* buf #define TSDB_MAX_SAVED_SQL_LEN TSDB_MAX_COLUMNS * 64 #define TSDB_MAX_SQL_LEN TSDB_PAYLOAD_SIZE #define TSDB_MAX_SQL_SHOW_LEN 512 -#define TSDB_MAX_ALLOWED_SQL_LEN (8*1024*1024U) // sql length should be less than 8mb +#define TSDB_MAX_ALLOWED_SQL_LEN (1*1024*1024U) // sql length should be less than 1mb #define TSDB_APPNAME_LEN TSDB_UNI_LEN From eebed5c2100a81077c99d17e7b924eb2a6ae4373 Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Mon, 30 Nov 2020 08:16:58 +0000 Subject: [PATCH 08/13] fix TD-2279 --- src/tsdb/src/tsdbRWHelper.c | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/src/tsdb/src/tsdbRWHelper.c b/src/tsdb/src/tsdbRWHelper.c index 5b65b2185a..8e57066d27 100644 --- a/src/tsdb/src/tsdbRWHelper.c +++ b/src/tsdb/src/tsdbRWHelper.c @@ -917,6 +917,8 @@ static int tsdbInsertSuperBlock(SRWHelper *pHelper, SCompBlock *pCompBlock, int ASSERT(pHelper->pCompInfo->blocks[0].keyLast < pHelper->pCompInfo->blocks[1].keyFirst); } + ASSERT((blkIdx == pIdx->numOfBlocks -1) || (!pCompBlock->last)); + tsdbDebug("vgId:%d tid:%d a super block is inserted at index %d", REPO_ID(pHelper->pRepo), pHelper->tableInfo.tid, blkIdx); @@ -1042,6 +1044,8 @@ static int tsdbUpdateSuperBlock(SRWHelper *pHelper, SCompBlock *pCompBlock, int pIdx->maxKey = blockAtIdx(pHelper, pIdx->numOfBlocks - 1)->keyLast; pIdx->hasLast = (uint32_t)blockAtIdx(pHelper, pIdx->numOfBlocks - 1)->last; + ASSERT((blkIdx == pIdx->numOfBlocks-1) || (!pCompBlock->last)); + tsdbDebug("vgId:%d tid:%d a super block is updated at index %d", REPO_ID(pHelper->pRepo), pHelper->tableInfo.tid, blkIdx); @@ -1622,11 +1626,7 @@ static int tsdbProcessMergeCommit(SRWHelper *pHelper, SCommitIter *pCommitIter, pCfg->update); if (pDataCols->numOfRows == 0) break; - if (tblkIdx == pIdx->numOfBlocks - 1) { - if (tsdbWriteBlockToProperFile(pHelper, pDataCols, &compBlock) < 0) return -1; - } else { - if (tsdbWriteBlockToFile(pHelper, helperDataF(pHelper), pDataCols, &compBlock, false, true) < 0) return -1; - } + if (tsdbWriteBlockToFile(pHelper, helperDataF(pHelper), pDataCols, &compBlock, false, true) < 0) return -1; if (round == 0) { if (oBlock.last && pHelper->hasOldLastBlock) pHelper->hasOldLastBlock = false; From 2b11f6ab4e094c20bc525811a257c724b0a6e323 Mon Sep 17 00:00:00 2001 From: Bomin Zhang Date: Mon, 30 Nov 2020 08:42:42 +0000 Subject: [PATCH 09/13] [TD-2140]: fix invalid write --- src/client/src/tscUtil.c | 6 ------ 1 file changed, 6 deletions(-) diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index 893c3f4c39..af17f9b457 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -836,12 +836,6 @@ int32_t tscMergeTableDataBlocks(SSqlObj* pSql, SArray* pTableDataBlockList) { dataBuf->size += (finalLen + sizeof(SSubmitBlk)); assert(dataBuf->size <= dataBuf->nAllocSize); - // free unnecessary memory resource ASAP. - char* p = realloc(dataBuf->pData, dataBuf->size); - if (p != NULL) { - dataBuf->pData = p; - } - // the length does not include the SSubmitBlk structure pBlocks->dataLen = htonl(finalLen); dataBuf->numOfTables += 1; From 2e69b78fcd054ec8f8570148b4a798fc93a96dcf Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Mon, 30 Nov 2020 10:32:12 +0000 Subject: [PATCH 10/13] TD-2283 --- src/cq/src/cqMain.c | 8 +++----- src/cq/test/cqtest.c | 2 +- src/inc/tcq.h | 2 +- src/vnode/src/vnodeMain.c | 2 +- 4 files changed, 6 insertions(+), 8 deletions(-) diff --git a/src/cq/src/cqMain.c b/src/cq/src/cqMain.c index 84b2c297d0..b1977fd5d9 100644 --- a/src/cq/src/cqMain.c +++ b/src/cq/src/cqMain.c @@ -40,15 +40,14 @@ typedef struct { int32_t vgId; + int32_t master; + int32_t num; // number of continuous streams char user[TSDB_USER_LEN]; char pass[TSDB_PASSWORD_LEN]; char db[TSDB_DB_NAME_LEN]; FCqWrite cqWrite; - void *ahandle; - int32_t num; // number of continuous streams struct SCqObj *pHead; void *dbConn; - int32_t master; void *tmrCtrl; pthread_mutex_t mutex; } SCqContext; @@ -90,7 +89,6 @@ void *cqOpen(void *ahandle, const SCqCfg *pCfg) { tstrncpy(pContext->db, db, sizeof(pContext->db)); pContext->vgId = pCfg->vgId; pContext->cqWrite = pCfg->cqWrite; - pContext->ahandle = ahandle; tscEmbedded = 1; pthread_mutex_init(&pContext->mutex, NULL); @@ -342,7 +340,7 @@ static void cqProcessStreamRes(void *param, TAOS_RES *tres, TAOS_ROW row) { pHead->version = 0; // write into vnode write queue - pContext->cqWrite(pContext->ahandle, pHead, TAOS_QTYPE_CQ, NULL); + pContext->cqWrite(pContext->vgId, pHead, TAOS_QTYPE_CQ, NULL); free(buffer); } diff --git a/src/cq/test/cqtest.c b/src/cq/test/cqtest.c index e1114fc024..41380f0d86 100644 --- a/src/cq/test/cqtest.c +++ b/src/cq/test/cqtest.c @@ -24,7 +24,7 @@ int64_t ver = 0; void *pCq = NULL; -int writeToQueue(void *pVnode, void *data, int type, void *pMsg) { +int writeToQueue(int32_t vgId, void *data, int type, void *pMsg) { return 0; } diff --git a/src/inc/tcq.h b/src/inc/tcq.h index 7a0727f1b8..afa744a9c4 100644 --- a/src/inc/tcq.h +++ b/src/inc/tcq.h @@ -21,7 +21,7 @@ extern "C" { #include "tdataformat.h" -typedef int32_t (*FCqWrite)(void *ahandle, void *pHead, int32_t qtype, void *pMsg); +typedef int32_t (*FCqWrite)(int32_t vgId, void *pHead, int32_t qtype, void *pMsg); typedef struct { int32_t vgId; diff --git a/src/vnode/src/vnodeMain.c b/src/vnode/src/vnodeMain.c index 3d3abde3c7..0b29fcc908 100644 --- a/src/vnode/src/vnodeMain.c +++ b/src/vnode/src/vnodeMain.c @@ -272,7 +272,7 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) { strcpy(cqCfg.pass, tsInternalPass); strcpy(cqCfg.db, pVnode->db); cqCfg.vgId = vnode; - cqCfg.cqWrite = vnodeWriteToWQueue; + cqCfg.cqWrite = vnodeWriteToCache; pVnode->cq = cqOpen(pVnode, &cqCfg); if (pVnode->cq == NULL) { vnodeCleanUp(pVnode); From 892e2599ecd0112413372ac422d7d98a27f0019a Mon Sep 17 00:00:00 2001 From: Bomin Zhang Date: Tue, 1 Dec 2020 11:20:14 +0800 Subject: [PATCH 11/13] Revert "[TD-2268]: cancel all related timers in taosTmrCleanUp" --- src/util/src/ttimer.c | 31 ------------------------------- 1 file changed, 31 deletions(-) diff --git a/src/util/src/ttimer.c b/src/util/src/ttimer.c index baf396f030..0222a6d80a 100644 --- a/src/util/src/ttimer.c +++ b/src/util/src/ttimer.c @@ -560,37 +560,6 @@ void taosTmrCleanUp(void* handle) { tmrDebug("%s timer controller is cleaned up.", ctrl->label); ctrl->label[0] = 0; - // cancel all timers of this controller - for (size_t i = 0; i < timerMap.size; i++) { - timer_list_t* list = timerMap.slots + i; - lockTimerList(list); - - tmr_obj_t* t = list->timers; - tmr_obj_t* prev = NULL; - while (t != NULL) { - tmr_obj_t* next = t->mnext; - if (t->ctrl != ctrl) { - prev = t; - t = next; - continue; - } - - uint8_t state = atomic_val_compare_exchange_8(&t->state, TIMER_STATE_WAITING, TIMER_STATE_CANCELED); - if (state == TIMER_STATE_WAITING) { - removeFromWheel(t); - } - timerDecRef(t); - if (prev == NULL) { - list->timers = next; - } else { - prev->mnext = next; - } - t = next; - } - - unlockTimerList(list); - } - pthread_mutex_lock(&tmrCtrlMutex); ctrl->next = unusedTmrCtrl; numOfTmrCtrl--; From 4e0f1055dce5aaf60d70b122307f5026ba37e2c7 Mon Sep 17 00:00:00 2001 From: Jeff Tao Date: Tue, 1 Dec 2020 03:44:37 +0000 Subject: [PATCH 12/13] remove a potential rpcSendRecv deadlock --- src/rpc/src/rpcMain.c | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/src/rpc/src/rpcMain.c b/src/rpc/src/rpcMain.c index acceaf9d7a..4ccdec2bc7 100644 --- a/src/rpc/src/rpcMain.c +++ b/src/rpc/src/rpcMain.c @@ -630,7 +630,14 @@ static void rpcReleaseConn(SRpcConn *pConn) { } else { // if there is an outgoing message, free it if (pConn->outType && pConn->pReqMsg) { - if (pConn->pContext) pConn->pContext->pConn = NULL; + if (pContext->pRsp) { + // for synchronous API, post semaphore to unblock app + pContext->pRsp->code = TSDB_CODE_RPC_APP_ERROR; + pContext->pRsp->pCont = NULL; + pContext->pRsp->contLen = 0; + tsem_post(pContext->pSem); + } + pConn->pContext->pConn = NULL; taosRemoveRef(tsRpcRefId, pConn->pContext->rid); } } From 7fadd4cb3aec7b30c99572586b3a1179f87a4838 Mon Sep 17 00:00:00 2001 From: Jeff Tao Date: Tue, 1 Dec 2020 03:48:24 +0000 Subject: [PATCH 13/13] remove a potential rpcSendRecv deadlock if rpcClose is called by another thread --- src/rpc/src/rpcMain.c | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/rpc/src/rpcMain.c b/src/rpc/src/rpcMain.c index 4ccdec2bc7..00a97d7bc2 100644 --- a/src/rpc/src/rpcMain.c +++ b/src/rpc/src/rpcMain.c @@ -630,6 +630,7 @@ static void rpcReleaseConn(SRpcConn *pConn) { } else { // if there is an outgoing message, free it if (pConn->outType && pConn->pReqMsg) { + SRpcReqContext *pContext = pConn->pContext; if (pContext->pRsp) { // for synchronous API, post semaphore to unblock app pContext->pRsp->code = TSDB_CODE_RPC_APP_ERROR; @@ -637,8 +638,8 @@ static void rpcReleaseConn(SRpcConn *pConn) { pContext->pRsp->contLen = 0; tsem_post(pContext->pSem); } - pConn->pContext->pConn = NULL; - taosRemoveRef(tsRpcRefId, pConn->pContext->rid); + pContext->pConn = NULL; + taosRemoveRef(tsRpcRefId, pContext->rid); } }