diff --git a/.gitignore b/.gitignore index f2c1cb75b3..e91d6739a5 100644 --- a/.gitignore +++ b/.gitignore @@ -19,7 +19,6 @@ tests/test/ tests/taoshebei/ tests/taoscsv/ tests/taosdalipu/ -tests/pytest/ tests/jenkins/ tests/hdfs/ *.iml diff --git a/.travis.yml b/.travis.yml index 63c62d1a8a..221a1f1a30 100644 --- a/.travis.yml +++ b/.travis.yml @@ -14,6 +14,15 @@ os: - linux # - osx +before_install: + - |- + case $TRAVIS_OS_NAME in + linux) + sudo apt -y update + sudo apt -y install python-pip python3-pip python-setuptools python3-setuptools + ;; + esac + addons: coverity_scan: @@ -50,6 +59,15 @@ script: - |- case $TRAVIS_OS_NAME in linux) + # Color setting + RED='\033[0;31m' + GREEN='\033[1;32m' + GREEN_DARK='\033[0;32m' + GREEN_UNDERLINE='\033[4;32m' + NC='\033[0m' + + sudo make install + cd ../tests/script sudo ./test.sh 2>&1 | grep 'success\|failed' | tee out.txt @@ -57,16 +75,32 @@ script: if [ "$total_success" -gt "0" ]; then total_success=`expr $total_success - 1` + echo -e "${GREEN} ### Total $total_success TSIM case(s) succeed! ### ${NC}" fi - echo "Total $total_success success" - total_failed=`grep failed out.txt | wc -l` - echo "Total $total_failed failed" - if [ "$total_failed" -ne "0" ]; then + echo -e "${RED} ### Total $total_failed TSIM case(s) failed! ### ${NC}" exit $total_failed fi + + pip install --user ../../src/connector/python/linux/python2/ + pip3 install --user ../../src/connector/python/linux/python3/ + + cd ../pytest + sudo ./simpletest.sh 2>&1 | grep 'successfully executed\|failed' | tee pytest-out.txt + total_py_success=`grep 'successfully executed' pytest-out.txt | wc -l` + + if [ "$total_py_success" -gt "0" ]; then + echo -e "${GREEN} ### Total $total_py_success python case(s) succeed! ### ${NC}" + fi + + total_py_failed=`grep 'failed' pytest-out.txt | wc -l` + if [ "$total_py_failed" -ne "0" ]; then + echo -e "${RED} ### Total $total_py_failed python case(s) failed! ### ${NC}" + exit $total_py_failed + fi + ;; esac @@ -81,6 +115,10 @@ matrix: - build-essential - cmake - net-tools + - python-pip + - python-setuptools + - python3-pip + - python3-setuptools # - os: osx # addons: diff --git a/src/client/src/tscSql.c b/src/client/src/tscSql.c index 1eee5d5e7f..41cfb77c5d 100644 --- a/src/client/src/tscSql.c +++ b/src/client/src/tscSql.c @@ -698,6 +698,7 @@ TAOS_ROW taos_fetch_row(TAOS_RES *res) { } int taos_fetch_block(TAOS_RES *res, TAOS_ROW *rows) { +#if 0 SSqlObj *pSql = (SSqlObj *)res; SSqlCmd *pCmd = &pSql->cmd; SSqlRes *pRes = &pSql->res; @@ -737,6 +738,10 @@ int taos_fetch_block(TAOS_RES *res, TAOS_ROW *rows) { } return nRows; +#endif + + (*rows) = taos_fetch_row(res); + return ((*rows) != NULL)? 1:0; } int taos_select_db(TAOS *taos, const char *db) { diff --git a/src/dnode/src/dnodeRead.c b/src/dnode/src/dnodeRead.c index d4365dae10..f13c18f22a 100644 --- a/src/dnode/src/dnodeRead.c +++ b/src/dnode/src/dnodeRead.c @@ -32,27 +32,51 @@ typedef struct { SRpcMsg rpcMsg; } SReadMsg; +typedef struct { + pthread_t thread; // thread + int32_t workerId; // worker ID +} SReadWorker; + +typedef struct { + int32_t max; // max number of workers + int32_t min; // min number of workers + int32_t num; // current number of workers + SReadWorker *readWorker; +} SReadWorkerPool; + static void *dnodeProcessReadQueue(void *param); -static void dnodeHandleIdleReadWorker(); +static void dnodeHandleIdleReadWorker(SReadWorker *); // module global variable -static taos_qset readQset; -static int32_t threads; // number of query threads -static int32_t maxThreads; -static int32_t minThreads; +static SReadWorkerPool readPool; +static taos_qset readQset; int32_t dnodeInitRead() { readQset = taosOpenQset(); - minThreads = 3; - maxThreads = tsNumOfCores * tsNumOfThreadsPerCore; - if (maxThreads <= minThreads * 2) maxThreads = 2 * minThreads; + readPool.min = 2; + readPool.max = tsNumOfCores * tsNumOfThreadsPerCore; + if (readPool.max <= readPool.min * 2) readPool.max = 2 * readPool.min; + readPool.readWorker = (SReadWorker *) calloc(sizeof(SReadWorker), readPool.max); + + if (readPool.readWorker == NULL) return -1; + for (int i=0; i < readPool.max; ++i) { + SReadWorker *pWorker = readPool.readWorker + i; + pWorker->workerId = i; + } dPrint("dnode read is opened"); return 0; } void dnodeCleanupRead() { + + for (int i=0; i < readPool.max; ++i) { + SReadWorker *pWorker = readPool.readWorker + i; + if (pWorker->thread) + pthread_join(pWorker->thread, NULL); + } + taosCloseQset(readQset); dPrint("dnode read is closed"); } @@ -116,18 +140,25 @@ void *dnodeAllocateRqueue(void *pVnode) { taosAddIntoQset(readQset, queue, pVnode); // spawn a thread to process queue - if (threads < maxThreads) { - pthread_t thread; - pthread_attr_t thAttr; - pthread_attr_init(&thAttr); - pthread_attr_setdetachstate(&thAttr, PTHREAD_CREATE_JOINABLE); + if (readPool.num < readPool.max) { + do { + SReadWorker *pWorker = readPool.readWorker + readPool.num; - if (pthread_create(&thread, &thAttr, dnodeProcessReadQueue, readQset) != 0) { - dError("failed to create thread to process read queue, reason:%s", strerror(errno)); - } + pthread_attr_t thAttr; + pthread_attr_init(&thAttr); + pthread_attr_setdetachstate(&thAttr, PTHREAD_CREATE_JOINABLE); + + if (pthread_create(&pWorker->thread, &thAttr, dnodeProcessReadQueue, pWorker) != 0) { + dError("failed to create thread to process read queue, reason:%s", strerror(errno)); + } + + pthread_attr_destroy(&thAttr); + readPool.num++; + dTrace("read worker:%d is launched, total:%d", pWorker->workerId, readPool.num); + } while (readPool.num < readPool.min); } - dTrace("pVnode:%p, queue:%p is allocated", pVnode, queue); + dTrace("pVnode:%p, read queue:%p is allocated", pVnode, queue); return queue; } @@ -167,14 +198,14 @@ void dnodeSendRpcReadRsp(void *pVnode, SReadMsg *pRead, int32_t code) { } static void *dnodeProcessReadQueue(void *param) { - taos_qset qset = (taos_qset)param; - SReadMsg *pReadMsg; - int type; - void *pVnode; + SReadWorker *pWorker = param; + SReadMsg *pReadMsg; + int type; + void *pVnode; while (1) { - if (taosReadQitemFromQset(qset, &type, (void **)&pReadMsg, (void **)&pVnode) == 0) { - dnodeHandleIdleReadWorker(); + if (taosReadQitemFromQset(readQset, &type, (void **)&pReadMsg, &pVnode) == 0) { + dnodeHandleIdleReadWorker(pWorker); continue; } @@ -186,11 +217,12 @@ static void *dnodeProcessReadQueue(void *param) { return NULL; } -static void dnodeHandleIdleReadWorker() { +static void dnodeHandleIdleReadWorker(SReadWorker *pWorker) { int32_t num = taosGetQueueNumber(readQset); - if (num == 0 || (num <= minThreads && threads > minThreads)) { - threads--; + if (num == 0 || (num <= readPool.min && readPool.num > readPool.min)) { + readPool.num--; + dTrace("read worker:%d is released, total:%d", pWorker->workerId, readPool.num); pthread_exit(NULL); } else { usleep(100); diff --git a/src/dnode/src/dnodeWrite.c b/src/dnode/src/dnodeWrite.c index 3c598ca360..b56e0d8ad7 100644 --- a/src/dnode/src/dnodeWrite.c +++ b/src/dnode/src/dnodeWrite.c @@ -28,6 +28,7 @@ #include "vnode.h" typedef struct { + taos_qall qall; taos_qset qset; // queue set pthread_t thread; // thread int32_t workerId; // worker ID @@ -65,6 +66,14 @@ int32_t dnodeInitWrite() { } void dnodeCleanupWrite() { + + for (int32_t i = 0; i < wWorkerPool.max; ++i) { + SWriteWorker *pWorker = wWorkerPool.writeWorker + i; + if (pWorker->thread) { + pthread_join(pWorker->thread, NULL); + } + } + free(wWorkerPool.writeWorker); dPrint("dnode write is closed"); } @@ -113,6 +122,7 @@ void *dnodeAllocateWqueue(void *pVnode) { if (pWorker->qset == NULL) return NULL; taosAddIntoQset(pWorker->qset, queue, pVnode); + pWorker->qall = taosAllocateQall(); wWorkerPool.nextId = (wWorkerPool.nextId + 1) % wWorkerPool.max; pthread_attr_t thAttr; @@ -122,13 +132,17 @@ void *dnodeAllocateWqueue(void *pVnode) { if (pthread_create(&pWorker->thread, &thAttr, dnodeProcessWriteQueue, pWorker) != 0) { dError("failed to create thread to process read queue, reason:%s", strerror(errno)); taosCloseQset(pWorker->qset); + } else { + dTrace("write worker:%d is launched", pWorker->workerId); } + + pthread_attr_destroy(&thAttr); } else { taosAddIntoQset(pWorker->qset, queue, pVnode); wWorkerPool.nextId = (wWorkerPool.nextId + 1) % wWorkerPool.max; } - dTrace("pVnode:%p, queue:%p is allocated", pVnode, queue); + dTrace("pVnode:%p, write queue:%p is allocated", pVnode, queue); return queue; } @@ -160,17 +174,14 @@ void dnodeSendRpcWriteRsp(void *pVnode, void *param, int32_t code) { static void *dnodeProcessWriteQueue(void *param) { SWriteWorker *pWorker = (SWriteWorker *)param; - taos_qall qall; SWriteMsg *pWrite; SWalHead *pHead; int32_t numOfMsgs; int type; void *pVnode, *item; - qall = taosAllocateQall(); - while (1) { - numOfMsgs = taosReadAllQitemsFromQset(pWorker->qset, qall, &pVnode); + numOfMsgs = taosReadAllQitemsFromQset(pWorker->qset, pWorker->qall, &pVnode); if (numOfMsgs <=0) { dnodeHandleIdleWorker(pWorker); // thread exit if no queues anymore continue; @@ -178,7 +189,7 @@ static void *dnodeProcessWriteQueue(void *param) { for (int32_t i = 0; i < numOfMsgs; ++i) { pWrite = NULL; - taosGetQitem(qall, &type, &item); + taosGetQitem(pWorker->qall, &type, &item); if (type == TAOS_QTYPE_RPC) { pWrite = (SWriteMsg *)item; pHead = (SWalHead *)(pWrite->pCont - sizeof(SWalHead)); @@ -196,9 +207,9 @@ static void *dnodeProcessWriteQueue(void *param) { walFsync(vnodeGetWal(pVnode)); // browse all items, and process them one by one - taosResetQitems(qall); + taosResetQitems(pWorker->qall); for (int32_t i = 0; i < numOfMsgs; ++i) { - taosGetQitem(qall, &type, &item); + taosGetQitem(pWorker->qall, &type, &item); if (type == TAOS_QTYPE_RPC) { pWrite = (SWriteMsg *)item; dnodeSendRpcWriteRsp(pVnode, item, pWrite->rpcMsg.code); @@ -209,8 +220,6 @@ static void *dnodeProcessWriteQueue(void *param) { } } - taosFreeQall(qall); - return NULL; } @@ -221,8 +230,10 @@ static void dnodeHandleIdleWorker(SWriteWorker *pWorker) { usleep(1000); sched_yield(); } else { + taosFreeQall(pWorker->qall); taosCloseQset(pWorker->qset); pWorker->qset = NULL; + dTrace("write worker:%d is released", pWorker->workerId); pthread_exit(NULL); } } diff --git a/src/kit/shell/src/shellEngine.c b/src/kit/shell/src/shellEngine.c index 0325e7f641..e291fdc551 100644 --- a/src/kit/shell/src/shellEngine.c +++ b/src/kit/shell/src/shellEngine.c @@ -284,7 +284,7 @@ void shellRunCommandOnServer(TAOS *con, char command[]) { /* Function to do regular expression check */ int regex_match(const char *s, const char *reg, int cflags) { regex_t regex; - char msgbuf[100]; + char msgbuf[100] = {0}; /* Compile regular expression */ if (regcomp(®ex, reg, cflags) != 0) { diff --git a/src/kit/shell/src/shellMain.c b/src/kit/shell/src/shellMain.c index a7b7e8383b..81a41453e8 100644 --- a/src/kit/shell/src/shellMain.c +++ b/src/kit/shell/src/shellMain.c @@ -97,6 +97,8 @@ int main(int argc, char* argv[]) { /* Interupt handler. */ struct sigaction act; + memset(&act, 0, sizeof(struct sigaction)); + act.sa_handler = interruptHandler; sigaction(SIGTERM, &act, NULL); sigaction(SIGINT, &act, NULL); diff --git a/src/rpc/src/rpcCache.c b/src/rpc/src/rpcCache.c index a397f6f845..a4863ef61d 100644 --- a/src/rpc/src/rpcCache.c +++ b/src/rpc/src/rpcCache.c @@ -103,7 +103,8 @@ void rpcCloseConnCache(void *handle) { if (pCache->connHashMemPool) taosMemPoolCleanUp(pCache->connHashMemPool); tfree(pCache->connHashList); - tfree(pCache->count) + tfree(pCache->count); + tfree(pCache->lockedBy); pthread_mutex_unlock(&pCache->mutex); diff --git a/src/rpc/src/rpcClient.c b/src/rpc/src/rpcClient.c index b362b1ba44..264449bbb0 100644 --- a/src/rpc/src/rpcClient.c +++ b/src/rpc/src/rpcClient.c @@ -84,7 +84,9 @@ void *taosInitTcpClient(char *ip, uint16_t port, char *label, int num, void *fp, pthread_attr_init(&thattr); pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE); - if (pthread_create(&(pTcp->thread), &thattr, taosReadTcpData, (void *)(pTcp)) != 0) { + int code = pthread_create(&(pTcp->thread), &thattr, taosReadTcpData, (void *)(pTcp)); + pthread_attr_destroy(&thattr); + if (code != 0) { tError("%s failed to create TCP read data thread, reason:%s", label, strerror(errno)); return NULL; } diff --git a/src/rpc/src/rpcServer.c b/src/rpc/src/rpcServer.c index 1aadabc5f7..538b3059e3 100644 --- a/src/rpc/src/rpcServer.c +++ b/src/rpc/src/rpcServer.c @@ -83,6 +83,9 @@ void *taosInitTcpServer(char *ip, uint16_t port, char *label, int numOfThreads, } memset(pServerObj->pThreadObj, 0, sizeof(SThreadObj) * (size_t)numOfThreads); + pthread_attr_init(&thattr); + pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE); + pThreadObj = pServerObj->pThreadObj; for (i = 0; i < numOfThreads; ++i) { pThreadObj->processData = fp; @@ -105,8 +108,6 @@ void *taosInitTcpServer(char *ip, uint16_t port, char *label, int numOfThreads, return NULL; } - pthread_attr_init(&thattr); - pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE); if (pthread_create(&(pThreadObj->thread), &thattr, (void *)taosProcessTcpData, (void *)(pThreadObj)) != 0) { tError("%s failed to create TCP process data thread, reason:%s", label, strerror(errno)); return NULL; @@ -116,8 +117,6 @@ void *taosInitTcpServer(char *ip, uint16_t port, char *label, int numOfThreads, pThreadObj++; } - pthread_attr_init(&thattr); - pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE); if (pthread_create(&(pServerObj->thread), &thattr, (void *)taosAcceptTcpConnection, (void *)(pServerObj)) != 0) { tError("%s failed to create TCP accept thread, reason:%s", label, strerror(errno)); return NULL; diff --git a/src/rpc/src/rpcUdp.c b/src/rpc/src/rpcUdp.c index 64a4df0e73..785288f5b9 100644 --- a/src/rpc/src/rpcUdp.c +++ b/src/rpc/src/rpcUdp.c @@ -146,10 +146,12 @@ void *taosInitUdpConnection(char *ip, uint16_t port, char *label, int threads, v pConn->tmrCtrl = pSet->tmrCtrl; } - if (pthread_create(&pConn->thread, &thAttr, taosRecvUdpData, pConn) != 0) { + int code = pthread_create(&pConn->thread, &thAttr, taosRecvUdpData, pConn); + if (code != 0) { tError("%s failed to create thread to process UDP data, reason:%s", label, strerror(errno)); taosCloseSocket(pConn->fd); taosCleanUpUdpConnection(pSet); + pthread_attr_destroy(&thAttr); return NULL; } diff --git a/src/util/src/hash.c b/src/util/src/hash.c index 9cad14e8c7..03e155845e 100644 --- a/src/util/src/hash.c +++ b/src/util/src/hash.c @@ -298,7 +298,7 @@ SHashObj *taosHashInit(size_t capacity, _hash_fn_t fn, bool threadsafe) { */ static SHashNode *doCreateHashNode(const char *key, size_t keyLen, const char *pData, size_t dataSize, uint32_t hashVal) { - size_t totalSize = dataSize + sizeof(SHashNode) + keyLen; + size_t totalSize = dataSize + sizeof(SHashNode) + keyLen + 1; // one extra byte for null SHashNode *pNewNode = calloc(1, totalSize); if (pNewNode == NULL) { diff --git a/src/util/src/ihash.c b/src/util/src/ihash.c index 30773ae8d9..2cfadad964 100644 --- a/src/util/src/ihash.c +++ b/src/util/src/ihash.c @@ -189,7 +189,6 @@ void taosCleanUpIntHash(void *handle) { free(pObj->hashList); } - memset(pObj, 0, sizeof(IHashObj)); free(pObj->lockedBy); free(pObj); } diff --git a/src/util/src/tqueue.c b/src/util/src/tqueue.c index 2cf94267f8..b9f1141d35 100644 --- a/src/util/src/tqueue.c +++ b/src/util/src/tqueue.c @@ -117,7 +117,7 @@ int taosWriteQitem(taos_queue param, int type, void *item) { queue->numOfItems++; if (queue->qset) atomic_add_fetch_32(&queue->qset->numOfItems, 1); - pTrace("item:%p is put into queue, type:%d items:%d", item, type, queue->numOfItems); + pTrace("item:%p is put into queue:%p, type:%d items:%d", item, queue, type, queue->numOfItems); pthread_mutex_unlock(&queue->mutex); @@ -297,14 +297,16 @@ int taosReadQitemFromQset(taos_qset param, int *type, void **pitem, void **phand STaosQset *qset = (STaosQset *)param; STaosQnode *pNode = NULL; int code = 0; + + pthread_mutex_lock(&qset->mutex); for(int i=0; inumOfQueues; ++i) { - pthread_mutex_lock(&qset->mutex); + //pthread_mutex_lock(&qset->mutex); if (qset->current == NULL) qset->current = qset->head; STaosQueue *queue = qset->current; if (queue) qset->current = queue->next; - pthread_mutex_unlock(&qset->mutex); + //pthread_mutex_unlock(&qset->mutex); if (queue == NULL) break; pthread_mutex_lock(&queue->mutex); @@ -326,6 +328,8 @@ int taosReadQitemFromQset(taos_qset param, int *type, void **pitem, void **phand if (pNode) break; } + pthread_mutex_unlock(&qset->mutex); + return code; } @@ -335,13 +339,15 @@ int taosReadAllQitemsFromQset(taos_qset param, taos_qall p2, void **phandle) { STaosQall *qall = (STaosQall *)p2; int code = 0; + pthread_mutex_lock(&qset->mutex); + for(int i=0; inumOfQueues; ++i) { - pthread_mutex_lock(&qset->mutex); + // pthread_mutex_lock(&qset->mutex); if (qset->current == NULL) qset->current = qset->head; queue = qset->current; if (queue) qset->current = queue->next; - pthread_mutex_unlock(&qset->mutex); + // pthread_mutex_unlock(&qset->mutex); if (queue == NULL) break; pthread_mutex_lock(&queue->mutex); @@ -365,6 +371,7 @@ int taosReadAllQitemsFromQset(taos_qset param, taos_qall p2, void **phandle) { if (code != 0) break; } + pthread_mutex_unlock(&qset->mutex); return code; } diff --git a/src/util/src/tsched.c b/src/util/src/tsched.c index 56d16eeb71..8608b64057 100644 --- a/src/util/src/tsched.c +++ b/src/util/src/tsched.c @@ -40,66 +40,68 @@ static void *taosProcessSchedQueue(void *param); static void taosDumpSchedulerStatus(void *qhandle, void *tmrId); void *taosInitScheduler(int queueSize, int numOfThreads, const char *label) { - pthread_attr_t attr; - SSchedQueue * pSched = (SSchedQueue *)malloc(sizeof(SSchedQueue)); + SSchedQueue *pSched = (SSchedQueue *)calloc(sizeof(SSchedQueue), 1); if (pSched == NULL) { - pError("%s: no enough memory for pSched, reason: %s", label, strerror(errno)); - goto _error; + pError("%s: no enough memory for pSched", label); + return NULL; + } + + pSched->queue = (SSchedMsg *)calloc(sizeof(SSchedMsg), queueSize); + if (pSched->queue == NULL) { + pError("%s: no enough memory for queue", label); + taosCleanUpScheduler(pSched); + return NULL; + } + + pSched->qthread = calloc(sizeof(pthread_t), numOfThreads); + if (pSched->qthread == NULL) { + pError("%s: no enough memory for qthread", label); + taosCleanUpScheduler(pSched); + return NULL; } - memset(pSched, 0, sizeof(SSchedQueue)); pSched->queueSize = queueSize; strncpy(pSched->label, label, sizeof(pSched->label)); // fix buffer overflow pSched->label[sizeof(pSched->label)-1] = '\0'; - if (pthread_mutex_init(&pSched->queueMutex, NULL) < 0) { - pError("init %s:queueMutex failed, reason:%s", pSched->label, strerror(errno)); - goto _error; - } - - if (tsem_init(&pSched->emptySem, 0, (unsigned int)pSched->queueSize) != 0) { - pError("init %s:empty semaphore failed, reason:%s", pSched->label, strerror(errno)); - goto _error; - } - - if (tsem_init(&pSched->fullSem, 0, 0) != 0) { - pError("init %s:full semaphore failed, reason:%s", pSched->label, strerror(errno)); - goto _error; - } - - if ((pSched->queue = (SSchedMsg *)malloc((size_t)pSched->queueSize * sizeof(SSchedMsg))) == NULL) { - pError("%s: no enough memory for queue, reason:%s", pSched->label, strerror(errno)); - goto _error; - } - - memset(pSched->queue, 0, (size_t)pSched->queueSize * sizeof(SSchedMsg)); pSched->fullSlot = 0; pSched->emptySlot = 0; - pSched->qthread = malloc(sizeof(pthread_t) * (size_t)numOfThreads); - if (pSched->qthread == NULL) { - pError("%s: no enough memory for qthread, reason: %s", pSched->label, strerror(errno)); - goto _error; + if (pthread_mutex_init(&pSched->queueMutex, NULL) < 0) { + pError("init %s:queueMutex failed(%s)", label, strerror(errno)); + taosCleanUpScheduler(pSched); + return NULL; } - pthread_attr_init(&attr); - pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE); + if (tsem_init(&pSched->emptySem, 0, (unsigned int)pSched->queueSize) != 0) { + pError("init %s:empty semaphore failed(%s)", label, strerror(errno)); + taosCleanUpScheduler(pSched); + return NULL; + } + + if (tsem_init(&pSched->fullSem, 0, 0) != 0) { + pError("init %s:full semaphore failed(%s)", label, strerror(errno)); + taosCleanUpScheduler(pSched); + return NULL; + } for (int i = 0; i < numOfThreads; ++i) { - if (pthread_create(pSched->qthread + i, &attr, taosProcessSchedQueue, (void *)pSched) != 0) { - pError("%s: failed to create rpc thread, reason:%s", pSched->label, strerror(errno)); - goto _error; + pthread_attr_t attr; + pthread_attr_init(&attr); + pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE); + int code = pthread_create(pSched->qthread + i, &attr, taosProcessSchedQueue, (void *)pSched); + pthread_attr_destroy(&attr); + if (code != 0) { + pError("%s: failed to create rpc thread(%s)", label, strerror(errno)); + taosCleanUpScheduler(pSched); + return NULL; } ++pSched->numOfThreads; } - pTrace("%s scheduler is initialized, numOfThreads:%d", pSched->label, pSched->numOfThreads); + pTrace("%s scheduler is initialized, numOfThreads:%d", label, pSched->numOfThreads); return (void *)pSched; - -_error: - taosCleanUpScheduler(pSched); - return NULL; } void *taosInitSchedulerWithInfo(int queueSize, int numOfThreads, const char *label, void *tmrCtrl) { @@ -124,21 +126,21 @@ void *taosProcessSchedQueue(void *param) { pTrace("wait %s fullSem was interrupted", pSched->label); continue; } - pError("wait %s fullSem failed, errno:%d, reason:%s", pSched->label, errno, strerror(errno)); + pError("wait %s fullSem failed(%s)", pSched->label, strerror(errno)); } if (pthread_mutex_lock(&pSched->queueMutex) != 0) - pError("lock %s queueMutex failed, reason:%s", pSched->label, strerror(errno)); + pError("lock %s queueMutex failed(%s)", pSched->label, strerror(errno)); msg = pSched->queue[pSched->fullSlot]; memset(pSched->queue + pSched->fullSlot, 0, sizeof(SSchedMsg)); pSched->fullSlot = (pSched->fullSlot + 1) % pSched->queueSize; if (pthread_mutex_unlock(&pSched->queueMutex) != 0) - pError("unlock %s queueMutex failed, reason:%s\n", pSched->label, strerror(errno)); + pError("unlock %s queueMutex failed(%s)", pSched->label, strerror(errno)); if (tsem_post(&pSched->emptySem) != 0) - pError("post %s emptySem failed, reason:%s\n", pSched->label, strerror(errno)); + pError("post %s emptySem failed(%s)", pSched->label, strerror(errno)); if (msg.fp) (*(msg.fp))(&msg); @@ -158,22 +160,23 @@ int taosScheduleTask(void *qhandle, SSchedMsg *pMsg) { while (tsem_wait(&pSched->emptySem) != 0) { if (errno != EINTR) { - pError("wait %s emptySem failed, reason:%s", pSched->label, strerror(errno)); + pError("wait %s emptySem failed(%s)", pSched->label, strerror(errno)); break; } pTrace("wait %s emptySem was interrupted", pSched->label); } if (pthread_mutex_lock(&pSched->queueMutex) != 0) - pError("lock %s queueMutex failed, reason:%s", pSched->label, strerror(errno)); + pError("lock %s queueMutex failed(%s)", pSched->label, strerror(errno)); pSched->queue[pSched->emptySlot] = *pMsg; pSched->emptySlot = (pSched->emptySlot + 1) % pSched->queueSize; if (pthread_mutex_unlock(&pSched->queueMutex) != 0) - pError("unlock %s queueMutex failed, reason:%s", pSched->label, strerror(errno)); + pError("unlock %s queueMutex failed(%s)", pSched->label, strerror(errno)); - if (tsem_post(&pSched->fullSem) != 0) pError("post %s fullSem failed, reason:%s", pSched->label, strerror(errno)); + if (tsem_post(&pSched->fullSem) != 0) + pError("post %s fullSem failed(%s)", pSched->label, strerror(errno)); return 0; } @@ -183,10 +186,12 @@ void taosCleanUpScheduler(void *param) { if (pSched == NULL) return; for (int i = 0; i < pSched->numOfThreads; ++i) { - pthread_cancel(pSched->qthread[i]); + if (pSched->qthread[i]) + pthread_cancel(pSched->qthread[i]); } for (int i = 0; i < pSched->numOfThreads; ++i) { - pthread_join(pSched->qthread[i], NULL); + if (pSched->qthread[i]) + pthread_join(pSched->qthread[i], NULL); } tsem_destroy(&pSched->emptySem); @@ -197,8 +202,8 @@ void taosCleanUpScheduler(void *param) { taosTmrStopA(&pSched->pTimer); } - free(pSched->queue); - free(pSched->qthread); + if (pSched->queue) free(pSched->queue); + if (pSched->qthread) free(pSched->qthread); free(pSched); // fix memory leak } diff --git a/src/vnode/main/src/vnodeMain.c b/src/vnode/main/src/vnodeMain.c index ea7a003d3d..4d77e007ad 100644 --- a/src/vnode/main/src/vnodeMain.c +++ b/src/vnode/main/src/vnodeMain.c @@ -224,10 +224,11 @@ void vnodeRelease(void *pVnodeRaw) { // remove the whole directory } - dTrace("pVnode:%p vgId:%d, vnode is released", pVnode, pVnode->vgId); free(pVnode); int32_t count = atomic_sub_fetch_32(&tsOpennedVnodes, 1); + dTrace("pVnode:%p vgId:%d, vnode is released, vnodes:%d", pVnode, vgId, count); + if (count <= 0) { taosCleanUpIntHash(tsDnodeVnodesHash); vnodeModuleInit = PTHREAD_ONCE_INIT; diff --git a/src/vnode/tsdb/inc/tsdbMain.h b/src/vnode/tsdb/inc/tsdbMain.h index 06f62ea6f7..077bdf45c3 100644 --- a/src/vnode/tsdb/inc/tsdbMain.h +++ b/src/vnode/tsdb/inc/tsdbMain.h @@ -222,8 +222,9 @@ int tsdbOpenFile(SFile *pFile, int oflag); int tsdbCloseFile(SFile *pFile); SFileGroup *tsdbOpenFilesForCommit(STsdbFileH *pFileH, int fid); int tsdbRemoveFileGroup(STsdbFileH *pFile, int fid); -#define TSDB_FGROUP_ITER_FORWARD 0 -#define TSDB_FGROUP_ITER_BACKWARD 1 +#define TSDB_FGROUP_ITER_FORWARD TSDB_ORDER_ASC +#define TSDB_FGROUP_ITER_BACKWARD TSDB_ORDER_DESC + typedef struct { int numOfFGroups; SFileGroup *base; diff --git a/src/vnode/tsdb/src/tsdbFile.c b/src/vnode/tsdb/src/tsdbFile.c index 8bdfe63002..d025144ba9 100644 --- a/src/vnode/tsdb/src/tsdbFile.c +++ b/src/vnode/tsdb/src/tsdbFile.c @@ -154,7 +154,7 @@ void tsdbSeekFileGroupIter(SFileGroupIter *pIter, int fid) { } int flags = (pIter->direction == TSDB_FGROUP_ITER_FORWARD) ? TD_GE : TD_LE; - void *ptr = taosbsearch(&fid, pIter->base, sizeof(SFileGroup), pIter->numOfFGroups, compFGroupKey, flags); + void *ptr = taosbsearch(&fid, pIter->base, pIter->numOfFGroups, sizeof(SFileGroup), compFGroupKey, flags); if (ptr == NULL) { pIter->pFileGroup = NULL; } else { @@ -173,7 +173,7 @@ SFileGroup *tsdbGetFileGroupNext(SFileGroupIter *pIter) { pIter->pFileGroup += 1; } } else { - if (pIter->pFileGroup - 1 == pIter->base) { + if (pIter->pFileGroup == pIter->base) { pIter->pFileGroup = NULL; } else { pIter->pFileGroup -= 1; diff --git a/src/vnode/tsdb/src/tsdbRead.c b/src/vnode/tsdb/src/tsdbRead.c index e5f0aed05e..2fce73e547 100644 --- a/src/vnode/tsdb/src/tsdbRead.c +++ b/src/vnode/tsdb/src/tsdbRead.c @@ -27,7 +27,7 @@ #define EXTRA_BYTES 2 #define PRIMARY_TSCOL_REQUIRED(c) (((SColumnInfoData*)taosArrayGet(c, 0))->info.colId == PRIMARYKEY_TIMESTAMP_COL_INDEX) #define QUERY_IS_ASC_QUERY(o) (o == TSDB_ORDER_ASC) -#define QH_GET_NUM_OF_COLS(handle) (taosArrayGetSize((handle)->pColumns)) +#define QH_GET_NUM_OF_COLS(handle) ((size_t)(taosArrayGetSize((handle)->pColumns))) enum { QUERY_RANGE_LESS_EQUAL = 0, @@ -87,14 +87,8 @@ typedef struct STableBlockInfo { int32_t groupIdx; /* number of group is less than the total number of tables */ } STableBlockInfo; -enum { - SINGLE_TABLE_MODEL = 1, - MULTI_TABLE_MODEL = 2, -}; - typedef struct STsdbQueryHandle { STsdbRepo* pTsdb; - int8_t model; // access model, single table model or multi-table model SQueryFilePos cur; // current position SQueryFilePos start; // the start position, used for secondary/third iteration @@ -128,21 +122,6 @@ typedef struct STsdbQueryHandle { SCompIdx* compIndex; } STsdbQueryHandle; -int32_t doAllocateBuf(STsdbQueryHandle* pQueryHandle, int32_t rowsPerFileBlock) { - // record the maximum column width among columns of this meter/metric - SColumnInfoData* pColumn = taosArrayGet(pQueryHandle->pColumns, 0); - - int32_t maxColWidth = pColumn->info.bytes; - for (int32_t i = 1; i < QH_GET_NUM_OF_COLS(pQueryHandle); ++i) { - int32_t bytes = pColumn[i].info.bytes; - if (bytes > maxColWidth) { - maxColWidth = bytes; - } - } - - return TSDB_CODE_SUCCESS; -} - static void tsdbInitDataBlockLoadInfo(SDataBlockLoadInfo* pBlockLoadInfo) { pBlockLoadInfo->slot = -1; pBlockLoadInfo->sid = -1; @@ -161,9 +140,9 @@ tsdb_query_handle_t* tsdbQueryByTableId(tsdb_repo_t* tsdb, STsdbQueryCond* pCond // todo 2. add the reference count for each table that is involved in query STsdbQueryHandle* pQueryHandle = calloc(1, sizeof(STsdbQueryHandle)); - pQueryHandle->order = pCond->order; + pQueryHandle->order = pCond->order; pQueryHandle->window = pCond->twindow; - pQueryHandle->pTsdb = tsdb; + pQueryHandle->pTsdb = tsdb; pQueryHandle->compIndex = calloc(10000, sizeof(SCompIdx)), pQueryHandle->loadDataAfterSeek = false; @@ -174,25 +153,33 @@ tsdb_query_handle_t* tsdbQueryByTableId(tsdb_repo_t* tsdb, STsdbQueryCond* pCond pQueryHandle->pTableCheckInfo = taosArrayInit(size, sizeof(STableCheckInfo)); for (int32_t i = 0; i < size; ++i) { - STableId id = *(STableId*)taosArrayGet(idList, i); - + STableId id = *(STableId*) taosArrayGet(idList, i); + + STable* pTable = tsdbGetTableByUid(tsdbGetMeta(tsdb), id.uid); + if (pTable == NULL) { + dError("%p failed to get table, error uid:%" PRIu64, pQueryHandle, id.uid); + continue; + } + STableCheckInfo info = { - .lastKey = pQueryHandle->window.skey, - .tableId = id, - .pTableObj = tsdbGetTableByUid(tsdbGetMeta(tsdb), id.uid), // todo this may be failed - .pCompInfo = NULL, + .lastKey = pQueryHandle->window.skey, + .tableId = id, + .pTableObj = pTable, }; - assert(info.pTableObj != NULL); taosArrayPush(pQueryHandle->pTableCheckInfo, &info); } - pQueryHandle->model = (size > 1) ? MULTI_TABLE_MODEL : SINGLE_TABLE_MODEL; - pQueryHandle->checkFiles = 1; - + dTrace("%p total numOfTable:%d in query", pQueryHandle, taosArrayGetSize(pQueryHandle->pTableCheckInfo)); + + /* + * For ascending timestamp order query, query starts from data files. In contrast, buffer will be checked in the first place + * in case of descending timestamp order query. + */ + pQueryHandle->checkFiles = QUERY_IS_ASC_QUERY(pQueryHandle->order); pQueryHandle->activeIndex = 0; - // malloc buffer in order to load data from file + // allocate buffer in order to load data blocks from file int32_t numOfCols = taosArrayGetSize(pColumnInfo); size_t bufferCapacity = 4096; @@ -206,10 +193,6 @@ tsdb_query_handle_t* tsdbQueryByTableId(tsdb_repo_t* tsdb, STsdbQueryCond* pCond taosArrayPush(pQueryHandle->pColumns, &pDest); } - if (doAllocateBuf(pQueryHandle, bufferCapacity) != TSDB_CODE_SUCCESS) { - return NULL; - } - tsdbInitDataBlockLoadInfo(&pQueryHandle->dataBlockLoadInfo); tsdbInitCompBlockLoadInfo(&pQueryHandle->compBlockLoadInfo); @@ -239,7 +222,12 @@ static bool hasMoreDataInCache(STsdbQueryHandle* pHandle) { // todo dynamic get the daysperfile static int32_t getFileIdFromKey(TSKEY key) { - return (int32_t)(key / 10); // set the starting fileId + int64_t fid = (int64_t)(key / 10); // set the starting fileId + if (fid > INT32_MAX) { + fid = INT32_MAX; + } + + return fid; } static int32_t binarySearchForBlockImpl(SCompBlock* pBlock, int32_t numOfBlocks, TSKEY skey, int32_t order); @@ -247,8 +235,12 @@ static int32_t binarySearchForBlockImpl(SCompBlock* pBlock, int32_t numOfBlocks, static int32_t getFileCompInfo(STsdbQueryHandle* pQueryHandle, int32_t* numOfBlocks, int32_t type) { // todo check open file failed SFileGroup* fileGroup = pQueryHandle->pFileGroup; + + assert(fileGroup->files[TSDB_FILE_TYPE_HEAD].fname > 0); if (fileGroup->files[TSDB_FILE_TYPE_HEAD].fd == FD_INITIALIZER) { fileGroup->files[TSDB_FILE_TYPE_HEAD].fd = open(fileGroup->files[TSDB_FILE_TYPE_HEAD].fname, O_RDONLY); + } else { + assert(FD_VALID(fileGroup->files[TSDB_FILE_TYPE_HEAD].fd)); } // load all the comp offset value for all tables in this file @@ -262,7 +254,7 @@ static int32_t getFileCompInfo(STsdbQueryHandle* pQueryHandle, int32_t* numOfBlo SCompIdx* compIndex = &pQueryHandle->compIndex[pCheckInfo->tableId.tid]; if (compIndex->len == 0 || compIndex->numOfSuperBlocks == 0) { // no data block in this file, try next file - + assert(0); } else { if (pCheckInfo->compSize < compIndex->len) { assert(compIndex->len > 0); @@ -271,46 +263,35 @@ static int32_t getFileCompInfo(STsdbQueryHandle* pQueryHandle, int32_t* numOfBlo assert(t != NULL); pCheckInfo->pCompInfo = (SCompInfo*) t; + pCheckInfo->compSize = compIndex->len; } tsdbLoadCompBlocks(fileGroup, compIndex, pCheckInfo->pCompInfo); - - int32_t index = 0; + + SCompInfo* pCompInfo = pCheckInfo->pCompInfo; + + TSKEY s = MIN(pCheckInfo->lastKey, pQueryHandle->window.ekey); + TSKEY e = MAX(pCheckInfo->lastKey, pQueryHandle->window.ekey); + // discard the unqualified data block based on the query time window - int32_t start = binarySearchForBlockImpl(pCheckInfo->pCompInfo->blocks, compIndex->numOfSuperBlocks, - pQueryHandle->order, pCheckInfo->lastKey); - - if (type == QUERY_RANGE_GREATER_EQUAL) { - if (pCheckInfo->lastKey <= pCheckInfo->pCompInfo->blocks[start].keyLast) { - // break; - } else { - index = -1; - } - } else { - if (pCheckInfo->lastKey >= pCheckInfo->pCompInfo->blocks[start].keyFirst) { - // break; - } else { - index = -1; - } - } - - // not found in data blocks in current file - if (index == -1) { + int32_t start = binarySearchForBlockImpl(pCompInfo->blocks, compIndex->numOfSuperBlocks, s, TSDB_ORDER_ASC); + int32_t end = start; + + if (s > pCompInfo->blocks[start].keyLast) { continue; } - // todo speedup the procedure of locating end block - int32_t e = start; - while (e < compIndex->numOfSuperBlocks && - (pCheckInfo->pCompInfo->blocks[e].keyFirst <= pQueryHandle->window.ekey)) { - e += 1; + // todo speedup the procedure of located end block + while (end < compIndex->numOfSuperBlocks && (pCompInfo->blocks[end].keyFirst <= e)) { + end += 1; } + pCheckInfo->numOfBlocks = (end - start); + if (start > 0) { - memmove(pCheckInfo->pCompInfo->blocks, &pCheckInfo->pCompInfo->blocks[start], (e - start) * sizeof(SCompBlock)); + memmove(pCompInfo->blocks, &pCompInfo->blocks[start], pCheckInfo->numOfBlocks * sizeof(SCompBlock)); } - pCheckInfo->numOfBlocks = (e - start); (*numOfBlocks) += pCheckInfo->numOfBlocks; } } @@ -413,7 +394,6 @@ static bool loadFileDataBlock(STsdbQueryHandle* pQueryHandle, SCompBlock* pBlock } SDataCols* pDataCols = pCheckInfo->pDataCols; - if (pCheckInfo->lastKey > pBlock->keyFirst) { cur->pos = binarySearchForKey(pDataCols->cols[0].pData, pBlock->numOfPoints, pCheckInfo->lastKey, pQueryHandle->order); @@ -425,8 +405,24 @@ static bool loadFileDataBlock(STsdbQueryHandle* pQueryHandle, SCompBlock* pBlock } else { // the whole block is loaded in to buffer pQueryHandle->realNumOfRows = pBlock->numOfPoints; } - } else { // todo desc query + } else { + // query ended in current block if (pQueryHandle->window.ekey > pBlock->keyFirst) { + if (!doLoadFileDataBlock(pQueryHandle, pBlock, pCheckInfo)) { + return false; + } + + SDataCols* pDataCols = pCheckInfo->pDataCols; + if (pCheckInfo->lastKey < pBlock->keyLast) { + cur->pos = + binarySearchForKey(pDataCols->cols[0].pData, pBlock->numOfPoints, pCheckInfo->lastKey, pQueryHandle->order); + } else { + cur->pos = pBlock->numOfPoints - 1; + } + + filterDataInDataBlock(pQueryHandle, pCheckInfo, pBlock, sa); + } else { + pQueryHandle->realNumOfRows = pBlock->numOfPoints; } } @@ -619,7 +615,9 @@ static void filterDataInDataBlock(STsdbQueryHandle* pQueryHandle, STableCheckInf } static SArray* getColumnIdList(STsdbQueryHandle* pQueryHandle) { - int32_t numOfCols = QH_GET_NUM_OF_COLS(pQueryHandle); + size_t numOfCols = QH_GET_NUM_OF_COLS(pQueryHandle); + assert(numOfCols <= TSDB_MAX_COLUMNS); + SArray* pIdList = taosArrayInit(numOfCols, sizeof(int16_t)); for (int32_t i = 0; i < numOfCols; ++i) { SColumnInfoData* pCol = taosArrayGet(pQueryHandle->pColumns, i); @@ -923,8 +921,8 @@ static bool getDataBlocksInFiles(STsdbQueryHandle* pQueryHandle) { pQueryHandle->locateStart = true; int32_t fid = getFileIdFromKey(pQueryHandle->window.skey); - - tsdbInitFileGroupIter(pFileHandle, &pQueryHandle->fileIter, TSDB_FGROUP_ITER_FORWARD); + + tsdbInitFileGroupIter(pFileHandle, &pQueryHandle->fileIter, pQueryHandle->order); tsdbSeekFileGroupIter(&pQueryHandle->fileIter, fid); int32_t numOfBlocks = -1; @@ -934,13 +932,14 @@ static bool getDataBlocksInFiles(STsdbQueryHandle* pQueryHandle) { int32_t numOfTables = taosArrayGetSize(pQueryHandle->pTableCheckInfo); while ((pQueryHandle->pFileGroup = tsdbGetFileGroupNext(&pQueryHandle->fileIter)) != NULL) { - if (getFileCompInfo(pQueryHandle, &numOfBlocks, 1) != TSDB_CODE_SUCCESS) { + int32_t type = QUERY_IS_ASC_QUERY(pQueryHandle->order)? QUERY_RANGE_GREATER_EQUAL:QUERY_RANGE_LESS_EQUAL; + if (getFileCompInfo(pQueryHandle, &numOfBlocks, type) != TSDB_CODE_SUCCESS) { break; } assert(numOfBlocks >= 0); dTrace("%p %d blocks found in file for %d table(s), fid:%d", pQueryHandle, numOfBlocks, - pQueryHandle->pFileGroup->fileId, numOfTables); + numOfTables, pQueryHandle->pFileGroup->fileId); // todo return error code to query engine if (createDataBlocksInfo(pQueryHandle, numOfBlocks, &pQueryHandle->numOfBlocks) != TSDB_CODE_SUCCESS) { @@ -961,7 +960,7 @@ static bool getDataBlocksInFiles(STsdbQueryHandle* pQueryHandle) { return false; } - cur->slot = 0; + cur->slot = QUERY_IS_ASC_QUERY(pQueryHandle->order)? 0:pQueryHandle->numOfBlocks-1; cur->fid = pQueryHandle->pFileGroup->fileId; STableBlockInfo* pBlockInfo = &pQueryHandle->pDataBlockInfo[cur->slot]; @@ -970,8 +969,10 @@ static bool getDataBlocksInFiles(STsdbQueryHandle* pQueryHandle) { return loadFileDataBlock(pQueryHandle, pBlock, pCheckInfo); } else { - if (cur->slot == pQueryHandle->numOfBlocks - 1) { // all blocks + if ((cur->slot == pQueryHandle->numOfBlocks - 1 && QUERY_IS_ASC_QUERY(pQueryHandle->order)) || + (cur->slot == 0 && !QUERY_IS_ASC_QUERY(pQueryHandle->order))) { // all blocks int32_t numOfBlocks = -1; + int32_t numOfTables = taosArrayGetSize(pQueryHandle->pTableCheckInfo); pQueryHandle->numOfBlocks = 0; @@ -1001,67 +1002,84 @@ static bool getDataBlocksInFiles(STsdbQueryHandle* pQueryHandle) { cur->fid = -1; return false; } - - cur->slot = 0; + + cur->slot = QUERY_IS_ASC_QUERY(pQueryHandle->order)? 0:pQueryHandle->numOfBlocks-1; cur->fid = pQueryHandle->pFileGroup->fileId; STableBlockInfo* pBlockInfo = &pQueryHandle->pDataBlockInfo[cur->slot]; - STableCheckInfo* pCheckInfo = pBlockInfo->pTableCheckInfo; SCompBlock* pBlock = pBlockInfo->pBlock.compBlock; return loadFileDataBlock(pQueryHandle, pBlock, pCheckInfo); } else { // next block of the same file - cur->slot += 1; - cur->pos = 0; - + int32_t step = QUERY_IS_ASC_QUERY(pQueryHandle->order)? 1:-1; + cur->slot += step; + STableBlockInfo* pBlockInfo = &pQueryHandle->pDataBlockInfo[cur->slot]; + if (QUERY_IS_ASC_QUERY(pQueryHandle->order)) { + cur->pos = 0; + } else { + cur->pos = pBlockInfo->pBlock.compBlock->numOfPoints - 1; + } + return loadFileDataBlock(pQueryHandle, pBlockInfo->pBlock.compBlock, pBlockInfo->pTableCheckInfo); } } } -// handle data in cache situation -bool tsdbNextDataBlock(tsdb_query_handle_t* pQueryHandle) { - STsdbQueryHandle* pHandle = (STsdbQueryHandle*)pQueryHandle; - - size_t numOfTables = taosArrayGetSize(pHandle->pTableCheckInfo); + +static bool doHasDataInBuffer(STsdbQueryHandle* pQueryHandle) { + size_t numOfTables = taosArrayGetSize(pQueryHandle->pTableCheckInfo); + // todo add assert, the value of numOfTables should be less than the maximum value for each vnode capacity assert(numOfTables > 0); - if (pHandle->checkFiles) { - if (getDataBlocksInFiles(pHandle)) { + while (pQueryHandle->activeIndex < numOfTables) { + if (hasMoreDataInCache(pQueryHandle)) { return true; } - pHandle->activeIndex = 0; - pHandle->checkFiles = 0; - - while (pHandle->activeIndex < numOfTables) { - if (hasMoreDataInCache(pHandle)) { - return true; - } - - pHandle->activeIndex += 1; - } - - return false; - } else { - while (pHandle->activeIndex < numOfTables) { - if (hasMoreDataInCache(pHandle)) { - return true; - } - - pHandle->activeIndex += 1; - } - - return false; + pQueryHandle->activeIndex += 1; } + + return false; +} + +// handle data in cache situation +bool tsdbNextDataBlock(tsdb_query_handle_t* pqHandle) { + STsdbQueryHandle* pQueryHandle = (STsdbQueryHandle*) pqHandle; + + size_t numOfTables = taosArrayGetSize(pQueryHandle->pTableCheckInfo); + assert(numOfTables > 0); + + if (QUERY_IS_ASC_QUERY(pQueryHandle->order)) { + if (pQueryHandle->checkFiles) { + if (getDataBlocksInFiles(pQueryHandle)) { + return true; + } + + pQueryHandle->activeIndex = 0; + pQueryHandle->checkFiles = false; + } + + return doHasDataInBuffer(pQueryHandle); + } else { // starts from the buffer in case of descending timestamp order check data blocks + if (!pQueryHandle->checkFiles) { + if (doHasDataInBuffer(pQueryHandle)) { + return true; + } + + pQueryHandle->checkFiles = true; + } + + return getDataBlocksInFiles(pQueryHandle); + } + } static int tsdbReadRowsFromCache(SSkipListIterator* pIter, TSKEY maxKey, int maxRowsToRead, TSKEY* skey, TSKEY* ekey, - STsdbQueryHandle* pHandle) { + STsdbQueryHandle* pQueryHandle) { int numOfRows = 0; - int32_t numOfCols = taosArrayGetSize(pHandle->pColumns); + int32_t numOfCols = taosArrayGetSize(pQueryHandle->pColumns); *skey = INT64_MIN; while (tSkipListIterNext(pIter)) { @@ -1079,7 +1097,7 @@ static int tsdbReadRowsFromCache(SSkipListIterator* pIter, TSKEY maxKey, int max int32_t offset = 0; for (int32_t i = 0; i < numOfCols; ++i) { - SColumnInfoData* pColInfo = taosArrayGet(pHandle->pColumns, i); + SColumnInfoData* pColInfo = taosArrayGet(pQueryHandle->pColumns, i); memcpy(pColInfo->pData + numOfRows * pColInfo->info.bytes, dataRowTuple(row) + offset, pColInfo->info.bytes); offset += pColInfo->info.bytes; } diff --git a/tests/pytest/insert/__init__.py b/tests/pytest/insert/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/tests/pytest/insert/basic.py b/tests/pytest/insert/basic.py new file mode 100644 index 0000000000..c6dbd76de4 --- /dev/null +++ b/tests/pytest/insert/basic.py @@ -0,0 +1,53 @@ +################################################################### +# Copyright (c) 2016 by TAOS Technologies, Inc. +# All rights reserved. +# +# This file is proprietary and confidential to TAOS Technologies. +# No part of this file may be reproduced, stored, transmitted, +# disclosed or used in any form or by any means other than as +# expressly provided by the written permission from Jianhui Tao +# +################################################################### + +# -*- coding: utf-8 -*- + +import sys +import taos +from util.log import * +from util.cases import * +from util.sql import * + +class TDTestCase: + def init(self, conn): + tdLog.debug("start to execute %s" % __file__) + tdSql.init(conn.cursor()) + + def run(self): + tdSql.prepare() + tdSql.execute('show databases') + tdSql.execute('drop database if exists db') + tdSql.execute('create database db') + tdSql.execute('use db') + tdSql.execute('create table tb (ts timestamp, speed int)') + + insertRows = 10 + tdLog.info("insert %d rows" % (insertRows)) + for i in range(0, insertRows): + tdSql.execute('insert into tb values (now + %dm, %d)' % (i, i)) + +# tdLog.info("insert earlier data") +# tdSql.execute('insert into tb values (now - 5m , 10)') +# tdSql.execute('insert into tb values (now - 6m , 10)') +# tdSql.execute('insert into tb values (now - 7m , 10)') +# tdSql.execute('insert into tb values (now - 8m , 10)') + +# tdSql.query("select * from tb") +# tdSql.checkRows(insertRows) + + def stop(self): + tdSql.close() + tdLog.success("%s successfully executed" % __file__) + + +tdCases.addWindows(__file__, TDTestCase()) +tdCases.addLinux(__file__, TDTestCase()) diff --git a/tests/pytest/simpletest.sh b/tests/pytest/simpletest.sh new file mode 100755 index 0000000000..aab36884f3 --- /dev/null +++ b/tests/pytest/simpletest.sh @@ -0,0 +1 @@ +sudo python2 ./test.py -f insert/basic.py diff --git a/tests/pytest/test.py b/tests/pytest/test.py new file mode 100644 index 0000000000..b88e444665 --- /dev/null +++ b/tests/pytest/test.py @@ -0,0 +1,87 @@ +#!/usr/bin/python +################################################################### +# Copyright (c) 2016 by TAOS Technologies, Inc. +# All rights reserved. +# +# This file is proprietary and confidential to TAOS Technologies. +# No part of this file may be reproduced, stored, transmitted, +# disclosed or used in any form or by any means other than as +# expressly provided by the written permission from Jianhui Tao +# +################################################################### +# install pip +# pip install src/connector/python/linux/python2/ + +# -*- coding: utf-8 -*- +import sys +import getopt +from util.log import * +from util.dnodes import * +from util.cases import * + +import taos + +# add testcase here: +from insert.basic import * + +if __name__ == "__main__": + fileName = "all" + deployPath = "" + masterIp = "" + testCluster = False + opts, args = getopt.getopt(sys.argv[1:], 'f:p:m:sch', [ + 'file=', 'path=', 'master', 'stop', 'cluster', 'help']) + for key, value in opts: + if key in ['-h', '--help']: + tdLog.printNoPrefix( + 'A collection of test cases written using Python') + tdLog.printNoPrefix('-f Name of test case file written by Python') + tdLog.printNoPrefix('-p Deploy Path for Simulator') + tdLog.printNoPrefix('-m Master Ip for Simulator') + tdLog.printNoPrefix('-c Test Cluster Flag') + tdLog.printNoPrefix('-s stop All dnodes') + sys.exit(0) + if key in ['-f', '--file']: + fileName = value + if key in ['-p', '--path']: + deployPath = value + if key in ['-m', '--master']: + masterIp = value + if key in ['-c', '--cluster']: + testCluster = True + if key in ['-s', '--stop']: + cmd = "ps -ef|grep -w taosd | grep 'taosd' | grep -v grep | awk '{print $2}' && pkill -9 taosd" + os.system(cmd) + tdLog.exit('stop All dnodes') + + if masterIp == "": + tdDnodes.init(deployPath) + if testCluster: + tdLog.notice("Procedures for testing cluster") + if fileName == "all": + tdCases.runAllCluster() + else: + tdCases.runOneCluster(fileName) + else: + tdLog.notice("Procedures for testing self-deployment") + tdDnodes.stopAll() + tdDnodes.deploy(1) + tdDnodes.start(1) + conn = taos.connect( + host='192.168.0.1', + config=tdDnodes.getSimCfgPath()) + if fileName == "all": + tdCases.runAllLinux(conn) + else: + tdLog.info("CBD LN78: %s" % (fileName)) + tdCases.runOneLinux(conn, fileName) + conn.close() + else: + tdLog.notice("Procedures for tdengine deployed in %s" % (masterIp)) + conn = taos.connect(host=masterIp, config=tdDnodes.getSimCfgPath()) + if fileName == "all": + tdCases.runAllWindows(conn) + else: + tdCases.runOneWindows(conn, fileName) + conn.close() + diff --git a/tests/pytest/util/__init__.py b/tests/pytest/util/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/tests/pytest/util/cases.py b/tests/pytest/util/cases.py new file mode 100644 index 0000000000..320c9d974f --- /dev/null +++ b/tests/pytest/util/cases.py @@ -0,0 +1,103 @@ +################################################################### +# Copyright (c) 2016 by TAOS Technologies, Inc. +# All rights reserved. +# +# This file is proprietary and confidential to TAOS Technologies. +# No part of this file may be reproduced, stored, transmitted, +# disclosed or used in any form or by any means other than as +# expressly provided by the written permission from Jianhui Tao +# +################################################################### + +# -*- coding: utf-8 -*- + +import sys +import os +import time +import datetime +from util.log import * + + +class TDCase: + def __init__(self, name, case): + self.name = name + self.case = case + + +class TDCases: + def __init__(self): + self.linuxCases = [] + self.windowsCases = [] + self.clusterCases = [] + + def addWindows(self, name, case): + self.windowsCases.append(TDCase(name, case)) + + def addLinux(self, name, case): + self.linuxCases.append(TDCase(name, case)) + + def addCluster(self, name, case): + self.clusterCases.append(TDCase(name, case)) + + def runAllLinux(self, conn): + tdLog.notice("run total %d cases" % (len(self.linuxCases))) + for case in self.linuxCases: + case.case.init(conn) + case.case.run() + case.case.stop() + tdLog.notice("total %d cases executed" % (len(self.linuxCases))) + + def runOneLinux(self, conn, fileName): + tdLog.notice("run cases like %s" % (fileName)) + runNum = 0 + for case in self.linuxCases: + if case.name.find(fileName) != -1: + case.case.init(conn) + case.case.run() + case.case.stop() + time.sleep(5) + runNum += 1 + tdLog.notice("total %d cases executed" % (runNum)) + + def runAllWindows(self, conn): + tdLog.notice("run total %d cases" % (len(self.windowsCases))) + for case in self.windowsCases: + case.case.init(conn) + case.case.run() + case.case.stop() + tdLog.notice("total %d cases executed" % (len(self.windowsCases))) + + def runOneWindows(self, conn, fileName): + tdLog.notice("run cases like %s" % (fileName)) + runNum = 0 + for case in self.windowsCases: + if case.name.find(fileName) != -1: + case.case.init(conn) + case.case.run() + case.case.stop() + time.sleep(2) + runNum += 1 + tdLog.notice("total %d cases executed" % (runNum)) + + def runAllCluster(self): + tdLog.notice("run total %d cases" % (len(self.clusterCases))) + for case in self.clusterCases: + case.case.init() + case.case.run() + case.case.stop() + tdLog.notice("total %d cases executed" % (len(self.clusterCases))) + + def runOneCluster(self, fileName): + tdLog.notice("run cases like %s" % (fileName)) + runNum = 0 + for case in self.clusterCases: + if case.name.find(fileName) != -1: + case.case.init() + case.case.run() + case.case.stop() + time.sleep(2) + runNum += 1 + tdLog.notice("total %d cases executed" % (runNum)) + + +tdCases = TDCases() diff --git a/tests/pytest/util/dnodes.py b/tests/pytest/util/dnodes.py new file mode 100644 index 0000000000..2be4f94802 --- /dev/null +++ b/tests/pytest/util/dnodes.py @@ -0,0 +1,332 @@ +################################################################### +# Copyright (c) 2016 by TAOS Technologies, Inc. +# All rights reserved. +# +# This file is proprietary and confidential to TAOS Technologies. +# No part of this file may be reproduced, stored, transmitted, +# disclosed or used in any form or by any means other than as +# expressly provided by the written permission from Jianhui Tao +# +################################################################### + +# -*- coding: utf-8 -*- + +import sys +import os +import os.path +from util.log import * + + +class TDSimClient: + def init(self, path): + self.path = path + + def getCfgDir(self): + return self.cfgDir + + def cfg(self, option, value): + cmd = "echo '%s %s' >> %s" % (option, value, self.cfgPath) + if os.system(cmd) != 0: + tdLog.exit(cmd) + + def deploy(self): + self.logDir = "%s/sim/psim/log" % (self.path,) + self.cfgDir = "%s/sim/psim/cfg" % (self.path) + self.cfgPath = "%s/sim/psim/cfg/taos.cfg" % (self.path) + + cmd = "rm -rf " + self.logDir + if os.system(cmd) != 0: + tdLog.exit(cmd) + + cmd = "rm -rf " + self.cfgDir + if os.system(cmd) != 0: + tdLog.exit(cmd) + + cmd = "mkdir -p " + self.logDir + if os.system(cmd) != 0: + tdLog.exit(cmd) + + cmd = "mkdir -p " + self.cfgDir + if os.system(cmd) != 0: + tdLog.exit(cmd) + + cmd = "touch " + self.cfgPath + if os.system(cmd) != 0: + tdLog.exit(cmd) + + self.cfg("masterIp", "192.168.0.1") + self.cfg("secondIp", "192.168.0.2") + self.cfg("logDir", self.logDir) + self.cfg("numOfLogLines", "100000000") + self.cfg("numOfThreadsPerCore", "2.0") + self.cfg("locale", "en_US.UTF-8") + self.cfg("charset", "GBK") + self.cfg("asyncLog", "0") + self.cfg("anyIp", "0") + self.cfg("sdbDebugFlag", "135") + self.cfg("rpcDebugFlag", "135") + self.cfg("tmrDebugFlag", "131") + self.cfg("cDebugFlag", "135") + self.cfg("udebugFlag", "135") + self.cfg("jnidebugFlag", "135") + self.cfg("qdebugFlag", "135") + tdLog.debug("psim is deployed and configured by %s" % (self.cfgPath)) + + +class TDDnode: + def __init__(self, index): + self.index = index + self.running = 0 + self.deployed = 0 + + def init(self, path): + self.path = path + + def deploy(self): + self.logDir = "%s/sim/dnode%d/log" % (self.path, self.index) + self.dataDir = "%s/sim/dnode%d/data" % (self.path, self.index) + self.cfgDir = "%s/sim/dnode%d/cfg" % (self.path, self.index) + self.cfgPath = "%s/sim/dnode%d/cfg/taos.cfg" % (self.path, self.index) + + cmd = "rm -rf " + self.dataDir + if os.system(cmd) != 0: + tdLog.exit(cmd) + + cmd = "rm -rf " + self.logDir + if os.system(cmd) != 0: + tdLog.exit(cmd) + + cmd = "rm -rf " + self.cfgDir + if os.system(cmd) != 0: + tdLog.exit(cmd) + + cmd = "mkdir -p " + self.dataDir + if os.system(cmd) != 0: + tdLog.exit(cmd) + + cmd = "mkdir -p " + self.logDir + if os.system(cmd) != 0: + tdLog.exit(cmd) + + cmd = "mkdir -p " + self.cfgDir + if os.system(cmd) != 0: + tdLog.exit(cmd) + + cmd = "touch " + self.cfgPath + if os.system(cmd) != 0: + tdLog.exit(cmd) + + self.startIP() + self.cfg("masterIp", "192.168.0.1") + self.cfg("secondIp", "192.168.0.2") + self.cfg("publicIp", "192.168.0.%d" % (self.index)) + self.cfg("internalIp", "192.168.0.%d" % (self.index)) + self.cfg("privateIp", "192.168.0.%d" % (self.index)) + self.cfg("dataDir", self.dataDir) + self.cfg("logDir", self.logDir) + self.cfg("numOfLogLines", "100000000") + self.cfg("mgmtEqualVnodeNum", "0") + self.cfg("clog", "1") + self.cfg("statusInterval", "1") + self.cfg("numOfTotalVnodes", "64") + self.cfg("numOfMPeers", "3") + self.cfg("numOfThreadsPerCore", "2.0") + self.cfg("monitor", "0") + self.cfg("maxVnodeConnections", "30000") + self.cfg("maxMgmtConnections", "30000") + self.cfg("maxMeterConnections", "30000") + self.cfg("maxShellConns", "30000") + self.cfg("locale", "en_US.UTF-8") + self.cfg("charset", "UTF-8") + self.cfg("asyncLog", "0") + self.cfg("anyIp", "0") + self.cfg("dDebugFlag", "135") + self.cfg("mDebugFlag", "135") + self.cfg("sdbDebugFlag", "135") + self.cfg("rpcDebugFlag", "135") + self.cfg("tmrDebugFlag", "131") + self.cfg("cDebugFlag", "135") + self.cfg("httpDebugFlag", "135") + self.cfg("monitorDebugFlag", "135") + self.cfg("udebugFlag", "135") + self.cfg("jnidebugFlag", "135") + self.cfg("qdebugFlag", "135") + self.deployed = 1 + tdLog.debug( + "dnode:%d is deployed and configured by %s" % + (self.index, self.cfgPath)) + + def start(self): + binPath = os.path.dirname(os.path.realpath(__file__)) + binPath = binPath + "/../../../debug/" + binPath = os.path.realpath(binPath) + binPath += "/build/bin/" + + if self.deployed == 0: + tdLog.exit("dnode:%d is not deployed" % (self.index)) + cmd = "nohup %staosd -c %s > /dev/null 2>&1 & " % ( + binPath, self.cfgDir) + print(cmd) + if os.system(cmd) != 0: + tdLog.exit(cmd) + self.running = 1 + tdLog.debug("dnode:%d is running with %s " % (self.index, cmd)) + + tdLog.debug("wait 2 seconds for the dnode:%d to start." % (self.index)) + time.sleep(2) + + def stop(self): + if self.running != 0: + cmd = "ps -ef|grep -w taosd | grep '%s' | grep -v grep | awk '{print $2}' && pkill -sigint taosd" % ( + self.cfgDir) + if os.system(cmd) != 0: + tdLog.exit(cmd) + tdLog.debug("dnode:%d is stopped by kill -SIGINT" % (self.index)) + tdLog.debug( + "wait 2 seconds for the dnode:%d to stop." % + (self.index)) + time.sleep(2) + + def forcestop(self): + if self.running != 0: + cmd = "ps -ef|grep -w taosd | grep '%s' | grep -v grep | awk '{print $2}' && pkill -sigkill taosd" % ( + self.cfgDir) + if os.system(cmd) != 0: + tdLog.exit(cmd) + tdLog.debug("dnode:%d is stopped by kill -9" % (self.index)) + tdLog.debug( + "wait 2 seconds for the dnode:%d to stop." % + (self.index)) + time.sleep(2) + + def startIP(self): + cmd = "sudo ifconfig lo:%d 192.168.0.%d up" % (self.index, self.index) + if os.system(cmd) != 0: + tdLog.exit(cmd) + + def stopIP(self): + cmd = "sudo ifconfig lo:%d 192.168.0.%d down" % ( + self.index, self.index) + if os.system(cmd) != 0: + tdLog.exit(cmd) + + def cfg(self, option, value): + cmd = "echo '%s %s' >> %s" % (option, value, self.cfgPath) + if os.system(cmd) != 0: + tdLog.exit(cmd) + + def getDnodeRootDir(self, index): + dnodeRootDir = "%s/sim/psim/dnode%d" % (self.path, index) + return dnodeRootDir + + def getDnodesRootDir(self): + dnodesRootDir = "%s/sim/psim" % (self.path) + return dnodesRootDir + + +class TDDnodes: + def __init__(self): + self.dnodes = [] + self.dnodes.append(TDDnode(1)) + self.dnodes.append(TDDnode(2)) + self.dnodes.append(TDDnode(3)) + self.dnodes.append(TDDnode(4)) + self.dnodes.append(TDDnode(5)) + self.dnodes.append(TDDnode(6)) + self.dnodes.append(TDDnode(7)) + self.dnodes.append(TDDnode(8)) + self.dnodes.append(TDDnode(9)) + self.dnodes.append(TDDnode(10)) + + def init(self, path): + cmd = "ps -ef|grep -w taosd | grep 'taosd' | grep -v grep | awk '{print $2}' && pkill -sigkill taosd" + os.system(cmd) + + binPath = os.path.dirname(os.path.realpath(__file__)) + binPath = binPath + "/../../../debug/" + tdLog.debug("binPath %s" % (binPath)) + binPath = os.path.realpath(binPath) + tdLog.debug("binPath real path %s" % (binPath)) + + # cmd = "sudo cp %s/build/lib/libtaos.so /usr/local/lib/taos/" % (binPath) + # tdLog.debug(cmd) + # os.system(cmd) + + # cmd = "sudo cp %s/build/bin/taos /usr/local/bin/taos/" % (binPath) + # if os.system(cmd) != 0 : + # tdLog.exit(cmd) + # tdLog.debug("execute %s" % (cmd)) + + # cmd = "sudo cp %s/build/bin/taosd /usr/local/bin/taos/" % (binPath) + # if os.system(cmd) != 0 : + # tdLog.exit(cmd) + # tdLog.debug("execute %s" % (cmd)) + + if path == "": + # self.path = os.path.expanduser('~') + self.path = os.path.abspath(binPath + "../../") + else: + self.path = os.path.realpath(path) + + for i in range(len(self.dnodes)): + self.dnodes[i].init(self.path) + + self.sim = TDSimClient() + self.sim.init(self.path) + self.sim.deploy() + + def deploy(self, index): + self.check(index) + self.dnodes[index - 1].deploy() + + def cfg(self, index, option, value): + self.check(index) + self.dnodes[index - 1].cfg(option, value) + + def start(self, index): + self.check(index) + self.dnodes[index - 1].start() + + def stop(self, index): + self.check(index) + self.dnodes[index - 1].stop() + + def forcestop(self, index): + self.check(index) + self.dnodes[index - 1].forcestop() + + def startIP(self, index): + self.check(index) + self.dnodes[index - 1].startIP() + + def stopIP(self, index): + self.check(index) + self.dnodes[index - 1].stopIP() + + def check(self, index): + if index < 1 or index > 10: + tdLog.exit("index:%d should on a scale of [1, 10]" % (index)) + + def stopAll(self): + tdLog.debug("stop all dnodes") + for i in range(len(self.dnodes)): + self.dnodes[i].stop() + + cmd = "sudo systemctl stop taosd" + os.system(cmd) + # if os.system(cmd) != 0 : + # tdLog.exit(cmd) + cmd = "ps -ef | grep -w taosd | grep 'dnode' | grep -v grep | awk '{print $2}' && sudo pkill -sigkill taosd" + os.system(cmd) + # if os.system(cmd) != 0 : + # tdLog.exit(cmd) + + def getDnodesRootDir(self): + dnodesRootDir = "%s/sim" % (self.path) + return dnodesRootDir + + def getSimCfgPath(self): + return self.sim.getCfgDir() + + +tdDnodes = TDDnodes() diff --git a/tests/pytest/util/log.py b/tests/pytest/util/log.py new file mode 100644 index 0000000000..926e582448 --- /dev/null +++ b/tests/pytest/util/log.py @@ -0,0 +1,48 @@ +################################################################### +# Copyright (c) 2016 by TAOS Technologies, Inc. +# All rights reserved. +# +# This file is proprietary and confidential to TAOS Technologies. +# No part of this file may be reproduced, stored, transmitted, +# disclosed or used in any form or by any means other than as +# expressly provided by the written permission from Jianhui Tao +# +################################################################### + +# -*- coding: utf-8 -*- + +import sys +import os +import time +import datetime + + +class TDLog: + def __init__(self): + self.path = "" + + def info(self, info): + print "%s %s" % (datetime.datetime.now(), info) + + def sleep(self, sec): + print "%s sleep %d seconds" % (datetime.datetime.now(), sec) + time.sleep(sec) + + def debug(self, err): + print "\033[1;36m%s %s\033[0m" % (datetime.datetime.now(), err) + + def success(self, info): + print "\033[1;32m%s %s\033[0m" % (datetime.datetime.now(), info) + + def notice(self, err): + print "\033[1;33m%s %s\033[0m" % (datetime.datetime.now(), err) + + def exit(self, err): + print "\033[1;31m%s %s\033[0m" % (datetime.datetime.now(), err) + sys.exit(1) + + def printNoPrefix(self, info): + print "\033[1;36m%s\033[0m" % (info) + + +tdLog = TDLog() diff --git a/tests/pytest/util/sql.py b/tests/pytest/util/sql.py new file mode 100644 index 0000000000..b4ac845bc8 --- /dev/null +++ b/tests/pytest/util/sql.py @@ -0,0 +1,135 @@ +################################################################### +# Copyright (c) 2016 by TAOS Technologies, Inc. +# All rights reserved. +# +# This file is proprietary and confidential to TAOS Technologies. +# No part of this file may be reproduced, stored, transmitted, +# disclosed or used in any form or by any means other than as +# expressly provided by the written permission from Jianhui Tao +# +################################################################### + +# -*- coding: utf-8 -*- + +import sys +import os +import time +import datetime +from util.log import * + + +class TDSql: + def __init__(self): + self.queryRows = 0 + self.queryCols = 0 + self.affectedRows = 0 + + def init(self, cursor): + self.cursor = cursor + + def close(self): + self.cursor.close() + + def prepare(self): + tdLog.info("prepare database:db") + self.cursor.execute('reset query cache') + self.cursor.execute('drop database if exists db') + self.cursor.execute('create database db') + self.cursor.execute('use db') + + def error(self, sql): + expectErrNotOccured = True + try: + self.cursor.execute(sql) + except BaseException: + expectErrNotOccured = False + if expectErrNotOccured: + tdLog.exit("sql:%.40s, expect error not occured" % (sql)) + else: + tdLog.info("sql:%.40s, expect error occured" % (sql)) + + def query(self, sql): + self.sql = sql + self.cursor.execute(sql) + self.queryResult = self.cursor.fetchall() + self.queryRows = len(self.queryResult) + self.queryCols = len(self.cursor.description) + # if self.queryRows == 1 and self.queryCols == 1: + # tdLog.info("sql:%s, rows:%d cols:%d data:%s" % (self.sql, self.queryRows, self.queryCols, self.queryResult[0][0])) + # else: + # tdLog.info("sql:%s, rows:%d cols:%d" % (self.sql, self.queryRows, self.queryCols)) + return self.queryRows + + def checkRows(self, expectRows): + if self.queryRows != expectRows: + tdLog.exit( + "sql:%.40s, queryRows:%d != expect:%d" % + (self.sql, self.queryRows, expectRows)) + tdLog.info("sql:%.40s, queryRows:%d == expect:%d" % + (self.sql, self.queryRows, expectRows)) + + def checkData(self, row, col, data): + if row < 0: + tdLog.exit( + "sql:%.40s, row:%d is smaller than zero" % + (self.sql, row)) + if col < 0: + tdLog.exit( + "sql:%.40s, col:%d is smaller than zero" % + (self.sql, col)) + if row >= self.queryRows: + tdLog.exit( + "sql:%.40s, row:%d is larger than queryRows:%d" % + (self.sql, row, self.queryRows)) + if col >= self.queryCols: + tdLog.exit( + "sql:%.40s, col:%d is larger than queryRows:%d" % + (self.sql, col, self.queryCols)) + if self.queryResult[row][col] != data: + tdLog.exit( + "sql:%.40s row:%d col:%d data:%s != expect:%s" % + (self.sql, row, col, self.queryResult[row][col], data)) + tdLog.info("sql:%.40s, row:%d col:%d data:%s == expect:%d" % + (self.sql, row, col, self.queryResult[row][col], data)) + + def getData(self, row, col): + if row < 0: + tdLog.exit( + "sql:%.40s, row:%d is smaller than zero" % + (self.sql, row)) + if col < 0: + tdLog.exit( + "sql:%.40s, col:%d is smaller than zero" % + (self.sql, col)) + if row >= self.queryRows: + tdLog.exit( + "sql:%.40s, row:%d is larger than queryRows:%d" % + (self.sql, row, self.queryRows)) + if col >= self.queryCols: + tdLog.exit( + "sql:%.40s, col:%d is larger than queryRows:%d" % + (self.sql, col, self.queryCols)) + return self.queryResult[row][col] + + def executeTimes(self, sql, times): + for i in range(times): + try: + return self.cursor.execute(sql) + except BaseException: + time.sleep(1) + continue + + def execute(self, sql): + self.sql = sql + self.affectedRows = self.cursor.execute(sql) + return self.affectedRows + + def checkAffectedRows(self, expectAffectedRows): + if self.affectedRows != expectAffectedRows: + tdLog.exit("sql:%.40s, affectedRows:%d != expect:%d" % + (self.sql, self.affectedRows, expectAffectedRows)) + tdLog.info("sql:%.40s, affectedRows:%d == expect:%d" % + (self.sql, self.affectedRows, expectAffectedRows)) + + +tdSql = TDSql()