Merge branch 'develop' into feature/mpeer
This commit is contained in:
commit
34d84adf04
|
@ -19,7 +19,6 @@ tests/test/
|
|||
tests/taoshebei/
|
||||
tests/taoscsv/
|
||||
tests/taosdalipu/
|
||||
tests/pytest/
|
||||
tests/jenkins/
|
||||
tests/hdfs/
|
||||
*.iml
|
||||
|
|
46
.travis.yml
46
.travis.yml
|
@ -14,6 +14,15 @@ os:
|
|||
- linux
|
||||
# - osx
|
||||
|
||||
before_install:
|
||||
- |-
|
||||
case $TRAVIS_OS_NAME in
|
||||
linux)
|
||||
sudo apt -y update
|
||||
sudo apt -y install python-pip python3-pip python-setuptools python3-setuptools
|
||||
;;
|
||||
esac
|
||||
|
||||
addons:
|
||||
coverity_scan:
|
||||
|
||||
|
@ -50,6 +59,15 @@ script:
|
|||
- |-
|
||||
case $TRAVIS_OS_NAME in
|
||||
linux)
|
||||
# Color setting
|
||||
RED='\033[0;31m'
|
||||
GREEN='\033[1;32m'
|
||||
GREEN_DARK='\033[0;32m'
|
||||
GREEN_UNDERLINE='\033[4;32m'
|
||||
NC='\033[0m'
|
||||
|
||||
sudo make install
|
||||
|
||||
cd ../tests/script
|
||||
sudo ./test.sh 2>&1 | grep 'success\|failed' | tee out.txt
|
||||
|
||||
|
@ -57,16 +75,32 @@ script:
|
|||
|
||||
if [ "$total_success" -gt "0" ]; then
|
||||
total_success=`expr $total_success - 1`
|
||||
echo -e "${GREEN} ### Total $total_success TSIM case(s) succeed! ### ${NC}"
|
||||
fi
|
||||
|
||||
echo "Total $total_success success"
|
||||
|
||||
total_failed=`grep failed out.txt | wc -l`
|
||||
echo "Total $total_failed failed"
|
||||
|
||||
if [ "$total_failed" -ne "0" ]; then
|
||||
echo -e "${RED} ### Total $total_failed TSIM case(s) failed! ### ${NC}"
|
||||
exit $total_failed
|
||||
fi
|
||||
|
||||
pip install --user ../../src/connector/python/linux/python2/
|
||||
pip3 install --user ../../src/connector/python/linux/python3/
|
||||
|
||||
cd ../pytest
|
||||
sudo ./simpletest.sh 2>&1 | grep 'successfully executed\|failed' | tee pytest-out.txt
|
||||
total_py_success=`grep 'successfully executed' pytest-out.txt | wc -l`
|
||||
|
||||
if [ "$total_py_success" -gt "0" ]; then
|
||||
echo -e "${GREEN} ### Total $total_py_success python case(s) succeed! ### ${NC}"
|
||||
fi
|
||||
|
||||
total_py_failed=`grep 'failed' pytest-out.txt | wc -l`
|
||||
if [ "$total_py_failed" -ne "0" ]; then
|
||||
echo -e "${RED} ### Total $total_py_failed python case(s) failed! ### ${NC}"
|
||||
exit $total_py_failed
|
||||
fi
|
||||
|
||||
;;
|
||||
esac
|
||||
|
||||
|
@ -81,6 +115,10 @@ matrix:
|
|||
- build-essential
|
||||
- cmake
|
||||
- net-tools
|
||||
- python-pip
|
||||
- python-setuptools
|
||||
- python3-pip
|
||||
- python3-setuptools
|
||||
|
||||
# - os: osx
|
||||
# addons:
|
||||
|
|
|
@ -698,6 +698,7 @@ TAOS_ROW taos_fetch_row(TAOS_RES *res) {
|
|||
}
|
||||
|
||||
int taos_fetch_block(TAOS_RES *res, TAOS_ROW *rows) {
|
||||
#if 0
|
||||
SSqlObj *pSql = (SSqlObj *)res;
|
||||
SSqlCmd *pCmd = &pSql->cmd;
|
||||
SSqlRes *pRes = &pSql->res;
|
||||
|
@ -737,6 +738,10 @@ int taos_fetch_block(TAOS_RES *res, TAOS_ROW *rows) {
|
|||
}
|
||||
|
||||
return nRows;
|
||||
#endif
|
||||
|
||||
(*rows) = taos_fetch_row(res);
|
||||
return ((*rows) != NULL)? 1:0;
|
||||
}
|
||||
|
||||
int taos_select_db(TAOS *taos, const char *db) {
|
||||
|
|
|
@ -32,27 +32,51 @@ typedef struct {
|
|||
SRpcMsg rpcMsg;
|
||||
} SReadMsg;
|
||||
|
||||
typedef struct {
|
||||
pthread_t thread; // thread
|
||||
int32_t workerId; // worker ID
|
||||
} SReadWorker;
|
||||
|
||||
typedef struct {
|
||||
int32_t max; // max number of workers
|
||||
int32_t min; // min number of workers
|
||||
int32_t num; // current number of workers
|
||||
SReadWorker *readWorker;
|
||||
} SReadWorkerPool;
|
||||
|
||||
static void *dnodeProcessReadQueue(void *param);
|
||||
static void dnodeHandleIdleReadWorker();
|
||||
static void dnodeHandleIdleReadWorker(SReadWorker *);
|
||||
|
||||
// module global variable
|
||||
static taos_qset readQset;
|
||||
static int32_t threads; // number of query threads
|
||||
static int32_t maxThreads;
|
||||
static int32_t minThreads;
|
||||
static SReadWorkerPool readPool;
|
||||
static taos_qset readQset;
|
||||
|
||||
int32_t dnodeInitRead() {
|
||||
readQset = taosOpenQset();
|
||||
|
||||
minThreads = 3;
|
||||
maxThreads = tsNumOfCores * tsNumOfThreadsPerCore;
|
||||
if (maxThreads <= minThreads * 2) maxThreads = 2 * minThreads;
|
||||
readPool.min = 2;
|
||||
readPool.max = tsNumOfCores * tsNumOfThreadsPerCore;
|
||||
if (readPool.max <= readPool.min * 2) readPool.max = 2 * readPool.min;
|
||||
readPool.readWorker = (SReadWorker *) calloc(sizeof(SReadWorker), readPool.max);
|
||||
|
||||
if (readPool.readWorker == NULL) return -1;
|
||||
for (int i=0; i < readPool.max; ++i) {
|
||||
SReadWorker *pWorker = readPool.readWorker + i;
|
||||
pWorker->workerId = i;
|
||||
}
|
||||
|
||||
dPrint("dnode read is opened");
|
||||
return 0;
|
||||
}
|
||||
|
||||
void dnodeCleanupRead() {
|
||||
|
||||
for (int i=0; i < readPool.max; ++i) {
|
||||
SReadWorker *pWorker = readPool.readWorker + i;
|
||||
if (pWorker->thread)
|
||||
pthread_join(pWorker->thread, NULL);
|
||||
}
|
||||
|
||||
taosCloseQset(readQset);
|
||||
dPrint("dnode read is closed");
|
||||
}
|
||||
|
@ -116,18 +140,25 @@ void *dnodeAllocateRqueue(void *pVnode) {
|
|||
taosAddIntoQset(readQset, queue, pVnode);
|
||||
|
||||
// spawn a thread to process queue
|
||||
if (threads < maxThreads) {
|
||||
pthread_t thread;
|
||||
pthread_attr_t thAttr;
|
||||
pthread_attr_init(&thAttr);
|
||||
pthread_attr_setdetachstate(&thAttr, PTHREAD_CREATE_JOINABLE);
|
||||
if (readPool.num < readPool.max) {
|
||||
do {
|
||||
SReadWorker *pWorker = readPool.readWorker + readPool.num;
|
||||
|
||||
if (pthread_create(&thread, &thAttr, dnodeProcessReadQueue, readQset) != 0) {
|
||||
dError("failed to create thread to process read queue, reason:%s", strerror(errno));
|
||||
}
|
||||
pthread_attr_t thAttr;
|
||||
pthread_attr_init(&thAttr);
|
||||
pthread_attr_setdetachstate(&thAttr, PTHREAD_CREATE_JOINABLE);
|
||||
|
||||
if (pthread_create(&pWorker->thread, &thAttr, dnodeProcessReadQueue, pWorker) != 0) {
|
||||
dError("failed to create thread to process read queue, reason:%s", strerror(errno));
|
||||
}
|
||||
|
||||
pthread_attr_destroy(&thAttr);
|
||||
readPool.num++;
|
||||
dTrace("read worker:%d is launched, total:%d", pWorker->workerId, readPool.num);
|
||||
} while (readPool.num < readPool.min);
|
||||
}
|
||||
|
||||
dTrace("pVnode:%p, queue:%p is allocated", pVnode, queue);
|
||||
dTrace("pVnode:%p, read queue:%p is allocated", pVnode, queue);
|
||||
|
||||
return queue;
|
||||
}
|
||||
|
@ -167,14 +198,14 @@ void dnodeSendRpcReadRsp(void *pVnode, SReadMsg *pRead, int32_t code) {
|
|||
}
|
||||
|
||||
static void *dnodeProcessReadQueue(void *param) {
|
||||
taos_qset qset = (taos_qset)param;
|
||||
SReadMsg *pReadMsg;
|
||||
int type;
|
||||
void *pVnode;
|
||||
SReadWorker *pWorker = param;
|
||||
SReadMsg *pReadMsg;
|
||||
int type;
|
||||
void *pVnode;
|
||||
|
||||
while (1) {
|
||||
if (taosReadQitemFromQset(qset, &type, (void **)&pReadMsg, (void **)&pVnode) == 0) {
|
||||
dnodeHandleIdleReadWorker();
|
||||
if (taosReadQitemFromQset(readQset, &type, (void **)&pReadMsg, &pVnode) == 0) {
|
||||
dnodeHandleIdleReadWorker(pWorker);
|
||||
continue;
|
||||
}
|
||||
|
||||
|
@ -186,11 +217,12 @@ static void *dnodeProcessReadQueue(void *param) {
|
|||
return NULL;
|
||||
}
|
||||
|
||||
static void dnodeHandleIdleReadWorker() {
|
||||
static void dnodeHandleIdleReadWorker(SReadWorker *pWorker) {
|
||||
int32_t num = taosGetQueueNumber(readQset);
|
||||
|
||||
if (num == 0 || (num <= minThreads && threads > minThreads)) {
|
||||
threads--;
|
||||
if (num == 0 || (num <= readPool.min && readPool.num > readPool.min)) {
|
||||
readPool.num--;
|
||||
dTrace("read worker:%d is released, total:%d", pWorker->workerId, readPool.num);
|
||||
pthread_exit(NULL);
|
||||
} else {
|
||||
usleep(100);
|
||||
|
|
|
@ -28,6 +28,7 @@
|
|||
#include "vnode.h"
|
||||
|
||||
typedef struct {
|
||||
taos_qall qall;
|
||||
taos_qset qset; // queue set
|
||||
pthread_t thread; // thread
|
||||
int32_t workerId; // worker ID
|
||||
|
@ -65,6 +66,14 @@ int32_t dnodeInitWrite() {
|
|||
}
|
||||
|
||||
void dnodeCleanupWrite() {
|
||||
|
||||
for (int32_t i = 0; i < wWorkerPool.max; ++i) {
|
||||
SWriteWorker *pWorker = wWorkerPool.writeWorker + i;
|
||||
if (pWorker->thread) {
|
||||
pthread_join(pWorker->thread, NULL);
|
||||
}
|
||||
}
|
||||
|
||||
free(wWorkerPool.writeWorker);
|
||||
dPrint("dnode write is closed");
|
||||
}
|
||||
|
@ -113,6 +122,7 @@ void *dnodeAllocateWqueue(void *pVnode) {
|
|||
if (pWorker->qset == NULL) return NULL;
|
||||
|
||||
taosAddIntoQset(pWorker->qset, queue, pVnode);
|
||||
pWorker->qall = taosAllocateQall();
|
||||
wWorkerPool.nextId = (wWorkerPool.nextId + 1) % wWorkerPool.max;
|
||||
|
||||
pthread_attr_t thAttr;
|
||||
|
@ -122,13 +132,17 @@ void *dnodeAllocateWqueue(void *pVnode) {
|
|||
if (pthread_create(&pWorker->thread, &thAttr, dnodeProcessWriteQueue, pWorker) != 0) {
|
||||
dError("failed to create thread to process read queue, reason:%s", strerror(errno));
|
||||
taosCloseQset(pWorker->qset);
|
||||
} else {
|
||||
dTrace("write worker:%d is launched", pWorker->workerId);
|
||||
}
|
||||
|
||||
pthread_attr_destroy(&thAttr);
|
||||
} else {
|
||||
taosAddIntoQset(pWorker->qset, queue, pVnode);
|
||||
wWorkerPool.nextId = (wWorkerPool.nextId + 1) % wWorkerPool.max;
|
||||
}
|
||||
|
||||
dTrace("pVnode:%p, queue:%p is allocated", pVnode, queue);
|
||||
dTrace("pVnode:%p, write queue:%p is allocated", pVnode, queue);
|
||||
|
||||
return queue;
|
||||
}
|
||||
|
@ -160,17 +174,14 @@ void dnodeSendRpcWriteRsp(void *pVnode, void *param, int32_t code) {
|
|||
|
||||
static void *dnodeProcessWriteQueue(void *param) {
|
||||
SWriteWorker *pWorker = (SWriteWorker *)param;
|
||||
taos_qall qall;
|
||||
SWriteMsg *pWrite;
|
||||
SWalHead *pHead;
|
||||
int32_t numOfMsgs;
|
||||
int type;
|
||||
void *pVnode, *item;
|
||||
|
||||
qall = taosAllocateQall();
|
||||
|
||||
while (1) {
|
||||
numOfMsgs = taosReadAllQitemsFromQset(pWorker->qset, qall, &pVnode);
|
||||
numOfMsgs = taosReadAllQitemsFromQset(pWorker->qset, pWorker->qall, &pVnode);
|
||||
if (numOfMsgs <=0) {
|
||||
dnodeHandleIdleWorker(pWorker); // thread exit if no queues anymore
|
||||
continue;
|
||||
|
@ -178,7 +189,7 @@ static void *dnodeProcessWriteQueue(void *param) {
|
|||
|
||||
for (int32_t i = 0; i < numOfMsgs; ++i) {
|
||||
pWrite = NULL;
|
||||
taosGetQitem(qall, &type, &item);
|
||||
taosGetQitem(pWorker->qall, &type, &item);
|
||||
if (type == TAOS_QTYPE_RPC) {
|
||||
pWrite = (SWriteMsg *)item;
|
||||
pHead = (SWalHead *)(pWrite->pCont - sizeof(SWalHead));
|
||||
|
@ -196,9 +207,9 @@ static void *dnodeProcessWriteQueue(void *param) {
|
|||
walFsync(vnodeGetWal(pVnode));
|
||||
|
||||
// browse all items, and process them one by one
|
||||
taosResetQitems(qall);
|
||||
taosResetQitems(pWorker->qall);
|
||||
for (int32_t i = 0; i < numOfMsgs; ++i) {
|
||||
taosGetQitem(qall, &type, &item);
|
||||
taosGetQitem(pWorker->qall, &type, &item);
|
||||
if (type == TAOS_QTYPE_RPC) {
|
||||
pWrite = (SWriteMsg *)item;
|
||||
dnodeSendRpcWriteRsp(pVnode, item, pWrite->rpcMsg.code);
|
||||
|
@ -209,8 +220,6 @@ static void *dnodeProcessWriteQueue(void *param) {
|
|||
}
|
||||
}
|
||||
|
||||
taosFreeQall(qall);
|
||||
|
||||
return NULL;
|
||||
}
|
||||
|
||||
|
@ -221,8 +230,10 @@ static void dnodeHandleIdleWorker(SWriteWorker *pWorker) {
|
|||
usleep(1000);
|
||||
sched_yield();
|
||||
} else {
|
||||
taosFreeQall(pWorker->qall);
|
||||
taosCloseQset(pWorker->qset);
|
||||
pWorker->qset = NULL;
|
||||
dTrace("write worker:%d is released", pWorker->workerId);
|
||||
pthread_exit(NULL);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -284,7 +284,7 @@ void shellRunCommandOnServer(TAOS *con, char command[]) {
|
|||
/* Function to do regular expression check */
|
||||
int regex_match(const char *s, const char *reg, int cflags) {
|
||||
regex_t regex;
|
||||
char msgbuf[100];
|
||||
char msgbuf[100] = {0};
|
||||
|
||||
/* Compile regular expression */
|
||||
if (regcomp(®ex, reg, cflags) != 0) {
|
||||
|
|
|
@ -97,6 +97,8 @@ int main(int argc, char* argv[]) {
|
|||
|
||||
/* Interupt handler. */
|
||||
struct sigaction act;
|
||||
memset(&act, 0, sizeof(struct sigaction));
|
||||
|
||||
act.sa_handler = interruptHandler;
|
||||
sigaction(SIGTERM, &act, NULL);
|
||||
sigaction(SIGINT, &act, NULL);
|
||||
|
|
|
@ -103,7 +103,8 @@ void rpcCloseConnCache(void *handle) {
|
|||
if (pCache->connHashMemPool) taosMemPoolCleanUp(pCache->connHashMemPool);
|
||||
|
||||
tfree(pCache->connHashList);
|
||||
tfree(pCache->count)
|
||||
tfree(pCache->count);
|
||||
tfree(pCache->lockedBy);
|
||||
|
||||
pthread_mutex_unlock(&pCache->mutex);
|
||||
|
||||
|
|
|
@ -84,7 +84,9 @@ void *taosInitTcpClient(char *ip, uint16_t port, char *label, int num, void *fp,
|
|||
|
||||
pthread_attr_init(&thattr);
|
||||
pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE);
|
||||
if (pthread_create(&(pTcp->thread), &thattr, taosReadTcpData, (void *)(pTcp)) != 0) {
|
||||
int code = pthread_create(&(pTcp->thread), &thattr, taosReadTcpData, (void *)(pTcp));
|
||||
pthread_attr_destroy(&thattr);
|
||||
if (code != 0) {
|
||||
tError("%s failed to create TCP read data thread, reason:%s", label, strerror(errno));
|
||||
return NULL;
|
||||
}
|
||||
|
|
|
@ -83,6 +83,9 @@ void *taosInitTcpServer(char *ip, uint16_t port, char *label, int numOfThreads,
|
|||
}
|
||||
memset(pServerObj->pThreadObj, 0, sizeof(SThreadObj) * (size_t)numOfThreads);
|
||||
|
||||
pthread_attr_init(&thattr);
|
||||
pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE);
|
||||
|
||||
pThreadObj = pServerObj->pThreadObj;
|
||||
for (i = 0; i < numOfThreads; ++i) {
|
||||
pThreadObj->processData = fp;
|
||||
|
@ -105,8 +108,6 @@ void *taosInitTcpServer(char *ip, uint16_t port, char *label, int numOfThreads,
|
|||
return NULL;
|
||||
}
|
||||
|
||||
pthread_attr_init(&thattr);
|
||||
pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE);
|
||||
if (pthread_create(&(pThreadObj->thread), &thattr, (void *)taosProcessTcpData, (void *)(pThreadObj)) != 0) {
|
||||
tError("%s failed to create TCP process data thread, reason:%s", label, strerror(errno));
|
||||
return NULL;
|
||||
|
@ -116,8 +117,6 @@ void *taosInitTcpServer(char *ip, uint16_t port, char *label, int numOfThreads,
|
|||
pThreadObj++;
|
||||
}
|
||||
|
||||
pthread_attr_init(&thattr);
|
||||
pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE);
|
||||
if (pthread_create(&(pServerObj->thread), &thattr, (void *)taosAcceptTcpConnection, (void *)(pServerObj)) != 0) {
|
||||
tError("%s failed to create TCP accept thread, reason:%s", label, strerror(errno));
|
||||
return NULL;
|
||||
|
|
|
@ -146,10 +146,12 @@ void *taosInitUdpConnection(char *ip, uint16_t port, char *label, int threads, v
|
|||
pConn->tmrCtrl = pSet->tmrCtrl;
|
||||
}
|
||||
|
||||
if (pthread_create(&pConn->thread, &thAttr, taosRecvUdpData, pConn) != 0) {
|
||||
int code = pthread_create(&pConn->thread, &thAttr, taosRecvUdpData, pConn);
|
||||
if (code != 0) {
|
||||
tError("%s failed to create thread to process UDP data, reason:%s", label, strerror(errno));
|
||||
taosCloseSocket(pConn->fd);
|
||||
taosCleanUpUdpConnection(pSet);
|
||||
pthread_attr_destroy(&thAttr);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
|
|
|
@ -298,7 +298,7 @@ SHashObj *taosHashInit(size_t capacity, _hash_fn_t fn, bool threadsafe) {
|
|||
*/
|
||||
static SHashNode *doCreateHashNode(const char *key, size_t keyLen, const char *pData, size_t dataSize,
|
||||
uint32_t hashVal) {
|
||||
size_t totalSize = dataSize + sizeof(SHashNode) + keyLen;
|
||||
size_t totalSize = dataSize + sizeof(SHashNode) + keyLen + 1; // one extra byte for null
|
||||
|
||||
SHashNode *pNewNode = calloc(1, totalSize);
|
||||
if (pNewNode == NULL) {
|
||||
|
|
|
@ -189,7 +189,6 @@ void taosCleanUpIntHash(void *handle) {
|
|||
free(pObj->hashList);
|
||||
}
|
||||
|
||||
memset(pObj, 0, sizeof(IHashObj));
|
||||
free(pObj->lockedBy);
|
||||
free(pObj);
|
||||
}
|
||||
|
|
|
@ -117,7 +117,7 @@ int taosWriteQitem(taos_queue param, int type, void *item) {
|
|||
queue->numOfItems++;
|
||||
if (queue->qset) atomic_add_fetch_32(&queue->qset->numOfItems, 1);
|
||||
|
||||
pTrace("item:%p is put into queue, type:%d items:%d", item, type, queue->numOfItems);
|
||||
pTrace("item:%p is put into queue:%p, type:%d items:%d", item, queue, type, queue->numOfItems);
|
||||
|
||||
pthread_mutex_unlock(&queue->mutex);
|
||||
|
||||
|
@ -297,14 +297,16 @@ int taosReadQitemFromQset(taos_qset param, int *type, void **pitem, void **phand
|
|||
STaosQset *qset = (STaosQset *)param;
|
||||
STaosQnode *pNode = NULL;
|
||||
int code = 0;
|
||||
|
||||
pthread_mutex_lock(&qset->mutex);
|
||||
|
||||
for(int i=0; i<qset->numOfQueues; ++i) {
|
||||
pthread_mutex_lock(&qset->mutex);
|
||||
//pthread_mutex_lock(&qset->mutex);
|
||||
if (qset->current == NULL)
|
||||
qset->current = qset->head;
|
||||
STaosQueue *queue = qset->current;
|
||||
if (queue) qset->current = queue->next;
|
||||
pthread_mutex_unlock(&qset->mutex);
|
||||
//pthread_mutex_unlock(&qset->mutex);
|
||||
if (queue == NULL) break;
|
||||
|
||||
pthread_mutex_lock(&queue->mutex);
|
||||
|
@ -326,6 +328,8 @@ int taosReadQitemFromQset(taos_qset param, int *type, void **pitem, void **phand
|
|||
if (pNode) break;
|
||||
}
|
||||
|
||||
pthread_mutex_unlock(&qset->mutex);
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
|
@ -335,13 +339,15 @@ int taosReadAllQitemsFromQset(taos_qset param, taos_qall p2, void **phandle) {
|
|||
STaosQall *qall = (STaosQall *)p2;
|
||||
int code = 0;
|
||||
|
||||
pthread_mutex_lock(&qset->mutex);
|
||||
|
||||
for(int i=0; i<qset->numOfQueues; ++i) {
|
||||
pthread_mutex_lock(&qset->mutex);
|
||||
// pthread_mutex_lock(&qset->mutex);
|
||||
if (qset->current == NULL)
|
||||
qset->current = qset->head;
|
||||
queue = qset->current;
|
||||
if (queue) qset->current = queue->next;
|
||||
pthread_mutex_unlock(&qset->mutex);
|
||||
// pthread_mutex_unlock(&qset->mutex);
|
||||
if (queue == NULL) break;
|
||||
|
||||
pthread_mutex_lock(&queue->mutex);
|
||||
|
@ -365,6 +371,7 @@ int taosReadAllQitemsFromQset(taos_qset param, taos_qall p2, void **phandle) {
|
|||
if (code != 0) break;
|
||||
}
|
||||
|
||||
pthread_mutex_unlock(&qset->mutex);
|
||||
return code;
|
||||
}
|
||||
|
||||
|
|
|
@ -40,66 +40,68 @@ static void *taosProcessSchedQueue(void *param);
|
|||
static void taosDumpSchedulerStatus(void *qhandle, void *tmrId);
|
||||
|
||||
void *taosInitScheduler(int queueSize, int numOfThreads, const char *label) {
|
||||
pthread_attr_t attr;
|
||||
SSchedQueue * pSched = (SSchedQueue *)malloc(sizeof(SSchedQueue));
|
||||
SSchedQueue *pSched = (SSchedQueue *)calloc(sizeof(SSchedQueue), 1);
|
||||
if (pSched == NULL) {
|
||||
pError("%s: no enough memory for pSched, reason: %s", label, strerror(errno));
|
||||
goto _error;
|
||||
pError("%s: no enough memory for pSched", label);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
pSched->queue = (SSchedMsg *)calloc(sizeof(SSchedMsg), queueSize);
|
||||
if (pSched->queue == NULL) {
|
||||
pError("%s: no enough memory for queue", label);
|
||||
taosCleanUpScheduler(pSched);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
pSched->qthread = calloc(sizeof(pthread_t), numOfThreads);
|
||||
if (pSched->qthread == NULL) {
|
||||
pError("%s: no enough memory for qthread", label);
|
||||
taosCleanUpScheduler(pSched);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
memset(pSched, 0, sizeof(SSchedQueue));
|
||||
pSched->queueSize = queueSize;
|
||||
strncpy(pSched->label, label, sizeof(pSched->label)); // fix buffer overflow
|
||||
pSched->label[sizeof(pSched->label)-1] = '\0';
|
||||
|
||||
if (pthread_mutex_init(&pSched->queueMutex, NULL) < 0) {
|
||||
pError("init %s:queueMutex failed, reason:%s", pSched->label, strerror(errno));
|
||||
goto _error;
|
||||
}
|
||||
|
||||
if (tsem_init(&pSched->emptySem, 0, (unsigned int)pSched->queueSize) != 0) {
|
||||
pError("init %s:empty semaphore failed, reason:%s", pSched->label, strerror(errno));
|
||||
goto _error;
|
||||
}
|
||||
|
||||
if (tsem_init(&pSched->fullSem, 0, 0) != 0) {
|
||||
pError("init %s:full semaphore failed, reason:%s", pSched->label, strerror(errno));
|
||||
goto _error;
|
||||
}
|
||||
|
||||
if ((pSched->queue = (SSchedMsg *)malloc((size_t)pSched->queueSize * sizeof(SSchedMsg))) == NULL) {
|
||||
pError("%s: no enough memory for queue, reason:%s", pSched->label, strerror(errno));
|
||||
goto _error;
|
||||
}
|
||||
|
||||
memset(pSched->queue, 0, (size_t)pSched->queueSize * sizeof(SSchedMsg));
|
||||
pSched->fullSlot = 0;
|
||||
pSched->emptySlot = 0;
|
||||
|
||||
pSched->qthread = malloc(sizeof(pthread_t) * (size_t)numOfThreads);
|
||||
if (pSched->qthread == NULL) {
|
||||
pError("%s: no enough memory for qthread, reason: %s", pSched->label, strerror(errno));
|
||||
goto _error;
|
||||
if (pthread_mutex_init(&pSched->queueMutex, NULL) < 0) {
|
||||
pError("init %s:queueMutex failed(%s)", label, strerror(errno));
|
||||
taosCleanUpScheduler(pSched);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
pthread_attr_init(&attr);
|
||||
pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
|
||||
if (tsem_init(&pSched->emptySem, 0, (unsigned int)pSched->queueSize) != 0) {
|
||||
pError("init %s:empty semaphore failed(%s)", label, strerror(errno));
|
||||
taosCleanUpScheduler(pSched);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
if (tsem_init(&pSched->fullSem, 0, 0) != 0) {
|
||||
pError("init %s:full semaphore failed(%s)", label, strerror(errno));
|
||||
taosCleanUpScheduler(pSched);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
for (int i = 0; i < numOfThreads; ++i) {
|
||||
if (pthread_create(pSched->qthread + i, &attr, taosProcessSchedQueue, (void *)pSched) != 0) {
|
||||
pError("%s: failed to create rpc thread, reason:%s", pSched->label, strerror(errno));
|
||||
goto _error;
|
||||
pthread_attr_t attr;
|
||||
pthread_attr_init(&attr);
|
||||
pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
|
||||
int code = pthread_create(pSched->qthread + i, &attr, taosProcessSchedQueue, (void *)pSched);
|
||||
pthread_attr_destroy(&attr);
|
||||
if (code != 0) {
|
||||
pError("%s: failed to create rpc thread(%s)", label, strerror(errno));
|
||||
taosCleanUpScheduler(pSched);
|
||||
return NULL;
|
||||
}
|
||||
++pSched->numOfThreads;
|
||||
}
|
||||
|
||||
pTrace("%s scheduler is initialized, numOfThreads:%d", pSched->label, pSched->numOfThreads);
|
||||
pTrace("%s scheduler is initialized, numOfThreads:%d", label, pSched->numOfThreads);
|
||||
|
||||
return (void *)pSched;
|
||||
|
||||
_error:
|
||||
taosCleanUpScheduler(pSched);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
void *taosInitSchedulerWithInfo(int queueSize, int numOfThreads, const char *label, void *tmrCtrl) {
|
||||
|
@ -124,21 +126,21 @@ void *taosProcessSchedQueue(void *param) {
|
|||
pTrace("wait %s fullSem was interrupted", pSched->label);
|
||||
continue;
|
||||
}
|
||||
pError("wait %s fullSem failed, errno:%d, reason:%s", pSched->label, errno, strerror(errno));
|
||||
pError("wait %s fullSem failed(%s)", pSched->label, strerror(errno));
|
||||
}
|
||||
|
||||
if (pthread_mutex_lock(&pSched->queueMutex) != 0)
|
||||
pError("lock %s queueMutex failed, reason:%s", pSched->label, strerror(errno));
|
||||
pError("lock %s queueMutex failed(%s)", pSched->label, strerror(errno));
|
||||
|
||||
msg = pSched->queue[pSched->fullSlot];
|
||||
memset(pSched->queue + pSched->fullSlot, 0, sizeof(SSchedMsg));
|
||||
pSched->fullSlot = (pSched->fullSlot + 1) % pSched->queueSize;
|
||||
|
||||
if (pthread_mutex_unlock(&pSched->queueMutex) != 0)
|
||||
pError("unlock %s queueMutex failed, reason:%s\n", pSched->label, strerror(errno));
|
||||
pError("unlock %s queueMutex failed(%s)", pSched->label, strerror(errno));
|
||||
|
||||
if (tsem_post(&pSched->emptySem) != 0)
|
||||
pError("post %s emptySem failed, reason:%s\n", pSched->label, strerror(errno));
|
||||
pError("post %s emptySem failed(%s)", pSched->label, strerror(errno));
|
||||
|
||||
if (msg.fp)
|
||||
(*(msg.fp))(&msg);
|
||||
|
@ -158,22 +160,23 @@ int taosScheduleTask(void *qhandle, SSchedMsg *pMsg) {
|
|||
|
||||
while (tsem_wait(&pSched->emptySem) != 0) {
|
||||
if (errno != EINTR) {
|
||||
pError("wait %s emptySem failed, reason:%s", pSched->label, strerror(errno));
|
||||
pError("wait %s emptySem failed(%s)", pSched->label, strerror(errno));
|
||||
break;
|
||||
}
|
||||
pTrace("wait %s emptySem was interrupted", pSched->label);
|
||||
}
|
||||
|
||||
if (pthread_mutex_lock(&pSched->queueMutex) != 0)
|
||||
pError("lock %s queueMutex failed, reason:%s", pSched->label, strerror(errno));
|
||||
pError("lock %s queueMutex failed(%s)", pSched->label, strerror(errno));
|
||||
|
||||
pSched->queue[pSched->emptySlot] = *pMsg;
|
||||
pSched->emptySlot = (pSched->emptySlot + 1) % pSched->queueSize;
|
||||
|
||||
if (pthread_mutex_unlock(&pSched->queueMutex) != 0)
|
||||
pError("unlock %s queueMutex failed, reason:%s", pSched->label, strerror(errno));
|
||||
pError("unlock %s queueMutex failed(%s)", pSched->label, strerror(errno));
|
||||
|
||||
if (tsem_post(&pSched->fullSem) != 0) pError("post %s fullSem failed, reason:%s", pSched->label, strerror(errno));
|
||||
if (tsem_post(&pSched->fullSem) != 0)
|
||||
pError("post %s fullSem failed(%s)", pSched->label, strerror(errno));
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
@ -183,10 +186,12 @@ void taosCleanUpScheduler(void *param) {
|
|||
if (pSched == NULL) return;
|
||||
|
||||
for (int i = 0; i < pSched->numOfThreads; ++i) {
|
||||
pthread_cancel(pSched->qthread[i]);
|
||||
if (pSched->qthread[i])
|
||||
pthread_cancel(pSched->qthread[i]);
|
||||
}
|
||||
for (int i = 0; i < pSched->numOfThreads; ++i) {
|
||||
pthread_join(pSched->qthread[i], NULL);
|
||||
if (pSched->qthread[i])
|
||||
pthread_join(pSched->qthread[i], NULL);
|
||||
}
|
||||
|
||||
tsem_destroy(&pSched->emptySem);
|
||||
|
@ -197,8 +202,8 @@ void taosCleanUpScheduler(void *param) {
|
|||
taosTmrStopA(&pSched->pTimer);
|
||||
}
|
||||
|
||||
free(pSched->queue);
|
||||
free(pSched->qthread);
|
||||
if (pSched->queue) free(pSched->queue);
|
||||
if (pSched->qthread) free(pSched->qthread);
|
||||
free(pSched); // fix memory leak
|
||||
}
|
||||
|
||||
|
|
|
@ -224,10 +224,11 @@ void vnodeRelease(void *pVnodeRaw) {
|
|||
// remove the whole directory
|
||||
}
|
||||
|
||||
dTrace("pVnode:%p vgId:%d, vnode is released", pVnode, pVnode->vgId);
|
||||
free(pVnode);
|
||||
|
||||
int32_t count = atomic_sub_fetch_32(&tsOpennedVnodes, 1);
|
||||
dTrace("pVnode:%p vgId:%d, vnode is released, vnodes:%d", pVnode, vgId, count);
|
||||
|
||||
if (count <= 0) {
|
||||
taosCleanUpIntHash(tsDnodeVnodesHash);
|
||||
vnodeModuleInit = PTHREAD_ONCE_INIT;
|
||||
|
|
|
@ -222,8 +222,9 @@ int tsdbOpenFile(SFile *pFile, int oflag);
|
|||
int tsdbCloseFile(SFile *pFile); SFileGroup *tsdbOpenFilesForCommit(STsdbFileH *pFileH, int fid);
|
||||
int tsdbRemoveFileGroup(STsdbFileH *pFile, int fid);
|
||||
|
||||
#define TSDB_FGROUP_ITER_FORWARD 0
|
||||
#define TSDB_FGROUP_ITER_BACKWARD 1
|
||||
#define TSDB_FGROUP_ITER_FORWARD TSDB_ORDER_ASC
|
||||
#define TSDB_FGROUP_ITER_BACKWARD TSDB_ORDER_DESC
|
||||
|
||||
typedef struct {
|
||||
int numOfFGroups;
|
||||
SFileGroup *base;
|
||||
|
|
|
@ -154,7 +154,7 @@ void tsdbSeekFileGroupIter(SFileGroupIter *pIter, int fid) {
|
|||
}
|
||||
|
||||
int flags = (pIter->direction == TSDB_FGROUP_ITER_FORWARD) ? TD_GE : TD_LE;
|
||||
void *ptr = taosbsearch(&fid, pIter->base, sizeof(SFileGroup), pIter->numOfFGroups, compFGroupKey, flags);
|
||||
void *ptr = taosbsearch(&fid, pIter->base, pIter->numOfFGroups, sizeof(SFileGroup), compFGroupKey, flags);
|
||||
if (ptr == NULL) {
|
||||
pIter->pFileGroup = NULL;
|
||||
} else {
|
||||
|
@ -173,7 +173,7 @@ SFileGroup *tsdbGetFileGroupNext(SFileGroupIter *pIter) {
|
|||
pIter->pFileGroup += 1;
|
||||
}
|
||||
} else {
|
||||
if (pIter->pFileGroup - 1 == pIter->base) {
|
||||
if (pIter->pFileGroup == pIter->base) {
|
||||
pIter->pFileGroup = NULL;
|
||||
} else {
|
||||
pIter->pFileGroup -= 1;
|
||||
|
|
|
@ -27,7 +27,7 @@
|
|||
#define EXTRA_BYTES 2
|
||||
#define PRIMARY_TSCOL_REQUIRED(c) (((SColumnInfoData*)taosArrayGet(c, 0))->info.colId == PRIMARYKEY_TIMESTAMP_COL_INDEX)
|
||||
#define QUERY_IS_ASC_QUERY(o) (o == TSDB_ORDER_ASC)
|
||||
#define QH_GET_NUM_OF_COLS(handle) (taosArrayGetSize((handle)->pColumns))
|
||||
#define QH_GET_NUM_OF_COLS(handle) ((size_t)(taosArrayGetSize((handle)->pColumns)))
|
||||
|
||||
enum {
|
||||
QUERY_RANGE_LESS_EQUAL = 0,
|
||||
|
@ -87,14 +87,8 @@ typedef struct STableBlockInfo {
|
|||
int32_t groupIdx; /* number of group is less than the total number of tables */
|
||||
} STableBlockInfo;
|
||||
|
||||
enum {
|
||||
SINGLE_TABLE_MODEL = 1,
|
||||
MULTI_TABLE_MODEL = 2,
|
||||
};
|
||||
|
||||
typedef struct STsdbQueryHandle {
|
||||
STsdbRepo* pTsdb;
|
||||
int8_t model; // access model, single table model or multi-table model
|
||||
SQueryFilePos cur; // current position
|
||||
SQueryFilePos start; // the start position, used for secondary/third iteration
|
||||
|
||||
|
@ -128,21 +122,6 @@ typedef struct STsdbQueryHandle {
|
|||
SCompIdx* compIndex;
|
||||
} STsdbQueryHandle;
|
||||
|
||||
int32_t doAllocateBuf(STsdbQueryHandle* pQueryHandle, int32_t rowsPerFileBlock) {
|
||||
// record the maximum column width among columns of this meter/metric
|
||||
SColumnInfoData* pColumn = taosArrayGet(pQueryHandle->pColumns, 0);
|
||||
|
||||
int32_t maxColWidth = pColumn->info.bytes;
|
||||
for (int32_t i = 1; i < QH_GET_NUM_OF_COLS(pQueryHandle); ++i) {
|
||||
int32_t bytes = pColumn[i].info.bytes;
|
||||
if (bytes > maxColWidth) {
|
||||
maxColWidth = bytes;
|
||||
}
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static void tsdbInitDataBlockLoadInfo(SDataBlockLoadInfo* pBlockLoadInfo) {
|
||||
pBlockLoadInfo->slot = -1;
|
||||
pBlockLoadInfo->sid = -1;
|
||||
|
@ -161,9 +140,9 @@ tsdb_query_handle_t* tsdbQueryByTableId(tsdb_repo_t* tsdb, STsdbQueryCond* pCond
|
|||
// todo 2. add the reference count for each table that is involved in query
|
||||
|
||||
STsdbQueryHandle* pQueryHandle = calloc(1, sizeof(STsdbQueryHandle));
|
||||
pQueryHandle->order = pCond->order;
|
||||
pQueryHandle->order = pCond->order;
|
||||
pQueryHandle->window = pCond->twindow;
|
||||
pQueryHandle->pTsdb = tsdb;
|
||||
pQueryHandle->pTsdb = tsdb;
|
||||
pQueryHandle->compIndex = calloc(10000, sizeof(SCompIdx)),
|
||||
|
||||
pQueryHandle->loadDataAfterSeek = false;
|
||||
|
@ -174,25 +153,33 @@ tsdb_query_handle_t* tsdbQueryByTableId(tsdb_repo_t* tsdb, STsdbQueryCond* pCond
|
|||
|
||||
pQueryHandle->pTableCheckInfo = taosArrayInit(size, sizeof(STableCheckInfo));
|
||||
for (int32_t i = 0; i < size; ++i) {
|
||||
STableId id = *(STableId*)taosArrayGet(idList, i);
|
||||
|
||||
STableId id = *(STableId*) taosArrayGet(idList, i);
|
||||
|
||||
STable* pTable = tsdbGetTableByUid(tsdbGetMeta(tsdb), id.uid);
|
||||
if (pTable == NULL) {
|
||||
dError("%p failed to get table, error uid:%" PRIu64, pQueryHandle, id.uid);
|
||||
continue;
|
||||
}
|
||||
|
||||
STableCheckInfo info = {
|
||||
.lastKey = pQueryHandle->window.skey,
|
||||
.tableId = id,
|
||||
.pTableObj = tsdbGetTableByUid(tsdbGetMeta(tsdb), id.uid), // todo this may be failed
|
||||
.pCompInfo = NULL,
|
||||
.lastKey = pQueryHandle->window.skey,
|
||||
.tableId = id,
|
||||
.pTableObj = pTable,
|
||||
};
|
||||
|
||||
assert(info.pTableObj != NULL);
|
||||
taosArrayPush(pQueryHandle->pTableCheckInfo, &info);
|
||||
}
|
||||
|
||||
pQueryHandle->model = (size > 1) ? MULTI_TABLE_MODEL : SINGLE_TABLE_MODEL;
|
||||
pQueryHandle->checkFiles = 1;
|
||||
|
||||
dTrace("%p total numOfTable:%d in query", pQueryHandle, taosArrayGetSize(pQueryHandle->pTableCheckInfo));
|
||||
|
||||
/*
|
||||
* For ascending timestamp order query, query starts from data files. In contrast, buffer will be checked in the first place
|
||||
* in case of descending timestamp order query.
|
||||
*/
|
||||
pQueryHandle->checkFiles = QUERY_IS_ASC_QUERY(pQueryHandle->order);
|
||||
pQueryHandle->activeIndex = 0;
|
||||
|
||||
// malloc buffer in order to load data from file
|
||||
// allocate buffer in order to load data blocks from file
|
||||
int32_t numOfCols = taosArrayGetSize(pColumnInfo);
|
||||
size_t bufferCapacity = 4096;
|
||||
|
||||
|
@ -206,10 +193,6 @@ tsdb_query_handle_t* tsdbQueryByTableId(tsdb_repo_t* tsdb, STsdbQueryCond* pCond
|
|||
taosArrayPush(pQueryHandle->pColumns, &pDest);
|
||||
}
|
||||
|
||||
if (doAllocateBuf(pQueryHandle, bufferCapacity) != TSDB_CODE_SUCCESS) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
tsdbInitDataBlockLoadInfo(&pQueryHandle->dataBlockLoadInfo);
|
||||
tsdbInitCompBlockLoadInfo(&pQueryHandle->compBlockLoadInfo);
|
||||
|
||||
|
@ -239,7 +222,12 @@ static bool hasMoreDataInCache(STsdbQueryHandle* pHandle) {
|
|||
|
||||
// todo dynamic get the daysperfile
|
||||
static int32_t getFileIdFromKey(TSKEY key) {
|
||||
return (int32_t)(key / 10); // set the starting fileId
|
||||
int64_t fid = (int64_t)(key / 10); // set the starting fileId
|
||||
if (fid > INT32_MAX) {
|
||||
fid = INT32_MAX;
|
||||
}
|
||||
|
||||
return fid;
|
||||
}
|
||||
|
||||
static int32_t binarySearchForBlockImpl(SCompBlock* pBlock, int32_t numOfBlocks, TSKEY skey, int32_t order);
|
||||
|
@ -247,8 +235,12 @@ static int32_t binarySearchForBlockImpl(SCompBlock* pBlock, int32_t numOfBlocks,
|
|||
static int32_t getFileCompInfo(STsdbQueryHandle* pQueryHandle, int32_t* numOfBlocks, int32_t type) {
|
||||
// todo check open file failed
|
||||
SFileGroup* fileGroup = pQueryHandle->pFileGroup;
|
||||
|
||||
assert(fileGroup->files[TSDB_FILE_TYPE_HEAD].fname > 0);
|
||||
if (fileGroup->files[TSDB_FILE_TYPE_HEAD].fd == FD_INITIALIZER) {
|
||||
fileGroup->files[TSDB_FILE_TYPE_HEAD].fd = open(fileGroup->files[TSDB_FILE_TYPE_HEAD].fname, O_RDONLY);
|
||||
} else {
|
||||
assert(FD_VALID(fileGroup->files[TSDB_FILE_TYPE_HEAD].fd));
|
||||
}
|
||||
|
||||
// load all the comp offset value for all tables in this file
|
||||
|
@ -262,7 +254,7 @@ static int32_t getFileCompInfo(STsdbQueryHandle* pQueryHandle, int32_t* numOfBlo
|
|||
|
||||
SCompIdx* compIndex = &pQueryHandle->compIndex[pCheckInfo->tableId.tid];
|
||||
if (compIndex->len == 0 || compIndex->numOfSuperBlocks == 0) { // no data block in this file, try next file
|
||||
|
||||
assert(0);
|
||||
} else {
|
||||
if (pCheckInfo->compSize < compIndex->len) {
|
||||
assert(compIndex->len > 0);
|
||||
|
@ -271,46 +263,35 @@ static int32_t getFileCompInfo(STsdbQueryHandle* pQueryHandle, int32_t* numOfBlo
|
|||
assert(t != NULL);
|
||||
|
||||
pCheckInfo->pCompInfo = (SCompInfo*) t;
|
||||
pCheckInfo->compSize = compIndex->len;
|
||||
}
|
||||
|
||||
tsdbLoadCompBlocks(fileGroup, compIndex, pCheckInfo->pCompInfo);
|
||||
|
||||
int32_t index = 0;
|
||||
|
||||
SCompInfo* pCompInfo = pCheckInfo->pCompInfo;
|
||||
|
||||
TSKEY s = MIN(pCheckInfo->lastKey, pQueryHandle->window.ekey);
|
||||
TSKEY e = MAX(pCheckInfo->lastKey, pQueryHandle->window.ekey);
|
||||
|
||||
// discard the unqualified data block based on the query time window
|
||||
int32_t start = binarySearchForBlockImpl(pCheckInfo->pCompInfo->blocks, compIndex->numOfSuperBlocks,
|
||||
pQueryHandle->order, pCheckInfo->lastKey);
|
||||
|
||||
if (type == QUERY_RANGE_GREATER_EQUAL) {
|
||||
if (pCheckInfo->lastKey <= pCheckInfo->pCompInfo->blocks[start].keyLast) {
|
||||
// break;
|
||||
} else {
|
||||
index = -1;
|
||||
}
|
||||
} else {
|
||||
if (pCheckInfo->lastKey >= pCheckInfo->pCompInfo->blocks[start].keyFirst) {
|
||||
// break;
|
||||
} else {
|
||||
index = -1;
|
||||
}
|
||||
}
|
||||
|
||||
// not found in data blocks in current file
|
||||
if (index == -1) {
|
||||
int32_t start = binarySearchForBlockImpl(pCompInfo->blocks, compIndex->numOfSuperBlocks, s, TSDB_ORDER_ASC);
|
||||
int32_t end = start;
|
||||
|
||||
if (s > pCompInfo->blocks[start].keyLast) {
|
||||
continue;
|
||||
}
|
||||
|
||||
// todo speedup the procedure of locating end block
|
||||
int32_t e = start;
|
||||
while (e < compIndex->numOfSuperBlocks &&
|
||||
(pCheckInfo->pCompInfo->blocks[e].keyFirst <= pQueryHandle->window.ekey)) {
|
||||
e += 1;
|
||||
// todo speedup the procedure of located end block
|
||||
while (end < compIndex->numOfSuperBlocks && (pCompInfo->blocks[end].keyFirst <= e)) {
|
||||
end += 1;
|
||||
}
|
||||
|
||||
pCheckInfo->numOfBlocks = (end - start);
|
||||
|
||||
if (start > 0) {
|
||||
memmove(pCheckInfo->pCompInfo->blocks, &pCheckInfo->pCompInfo->blocks[start], (e - start) * sizeof(SCompBlock));
|
||||
memmove(pCompInfo->blocks, &pCompInfo->blocks[start], pCheckInfo->numOfBlocks * sizeof(SCompBlock));
|
||||
}
|
||||
|
||||
pCheckInfo->numOfBlocks = (e - start);
|
||||
(*numOfBlocks) += pCheckInfo->numOfBlocks;
|
||||
}
|
||||
}
|
||||
|
@ -413,7 +394,6 @@ static bool loadFileDataBlock(STsdbQueryHandle* pQueryHandle, SCompBlock* pBlock
|
|||
}
|
||||
|
||||
SDataCols* pDataCols = pCheckInfo->pDataCols;
|
||||
|
||||
if (pCheckInfo->lastKey > pBlock->keyFirst) {
|
||||
cur->pos =
|
||||
binarySearchForKey(pDataCols->cols[0].pData, pBlock->numOfPoints, pCheckInfo->lastKey, pQueryHandle->order);
|
||||
|
@ -425,8 +405,24 @@ static bool loadFileDataBlock(STsdbQueryHandle* pQueryHandle, SCompBlock* pBlock
|
|||
} else { // the whole block is loaded in to buffer
|
||||
pQueryHandle->realNumOfRows = pBlock->numOfPoints;
|
||||
}
|
||||
} else { // todo desc query
|
||||
} else {
|
||||
// query ended in current block
|
||||
if (pQueryHandle->window.ekey > pBlock->keyFirst) {
|
||||
if (!doLoadFileDataBlock(pQueryHandle, pBlock, pCheckInfo)) {
|
||||
return false;
|
||||
}
|
||||
|
||||
SDataCols* pDataCols = pCheckInfo->pDataCols;
|
||||
if (pCheckInfo->lastKey < pBlock->keyLast) {
|
||||
cur->pos =
|
||||
binarySearchForKey(pDataCols->cols[0].pData, pBlock->numOfPoints, pCheckInfo->lastKey, pQueryHandle->order);
|
||||
} else {
|
||||
cur->pos = pBlock->numOfPoints - 1;
|
||||
}
|
||||
|
||||
filterDataInDataBlock(pQueryHandle, pCheckInfo, pBlock, sa);
|
||||
} else {
|
||||
pQueryHandle->realNumOfRows = pBlock->numOfPoints;
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -619,7 +615,9 @@ static void filterDataInDataBlock(STsdbQueryHandle* pQueryHandle, STableCheckInf
|
|||
}
|
||||
|
||||
static SArray* getColumnIdList(STsdbQueryHandle* pQueryHandle) {
|
||||
int32_t numOfCols = QH_GET_NUM_OF_COLS(pQueryHandle);
|
||||
size_t numOfCols = QH_GET_NUM_OF_COLS(pQueryHandle);
|
||||
assert(numOfCols <= TSDB_MAX_COLUMNS);
|
||||
|
||||
SArray* pIdList = taosArrayInit(numOfCols, sizeof(int16_t));
|
||||
for (int32_t i = 0; i < numOfCols; ++i) {
|
||||
SColumnInfoData* pCol = taosArrayGet(pQueryHandle->pColumns, i);
|
||||
|
@ -923,8 +921,8 @@ static bool getDataBlocksInFiles(STsdbQueryHandle* pQueryHandle) {
|
|||
pQueryHandle->locateStart = true;
|
||||
|
||||
int32_t fid = getFileIdFromKey(pQueryHandle->window.skey);
|
||||
|
||||
tsdbInitFileGroupIter(pFileHandle, &pQueryHandle->fileIter, TSDB_FGROUP_ITER_FORWARD);
|
||||
|
||||
tsdbInitFileGroupIter(pFileHandle, &pQueryHandle->fileIter, pQueryHandle->order);
|
||||
tsdbSeekFileGroupIter(&pQueryHandle->fileIter, fid);
|
||||
|
||||
int32_t numOfBlocks = -1;
|
||||
|
@ -934,13 +932,14 @@ static bool getDataBlocksInFiles(STsdbQueryHandle* pQueryHandle) {
|
|||
int32_t numOfTables = taosArrayGetSize(pQueryHandle->pTableCheckInfo);
|
||||
|
||||
while ((pQueryHandle->pFileGroup = tsdbGetFileGroupNext(&pQueryHandle->fileIter)) != NULL) {
|
||||
if (getFileCompInfo(pQueryHandle, &numOfBlocks, 1) != TSDB_CODE_SUCCESS) {
|
||||
int32_t type = QUERY_IS_ASC_QUERY(pQueryHandle->order)? QUERY_RANGE_GREATER_EQUAL:QUERY_RANGE_LESS_EQUAL;
|
||||
if (getFileCompInfo(pQueryHandle, &numOfBlocks, type) != TSDB_CODE_SUCCESS) {
|
||||
break;
|
||||
}
|
||||
|
||||
assert(numOfBlocks >= 0);
|
||||
dTrace("%p %d blocks found in file for %d table(s), fid:%d", pQueryHandle, numOfBlocks,
|
||||
pQueryHandle->pFileGroup->fileId, numOfTables);
|
||||
numOfTables, pQueryHandle->pFileGroup->fileId);
|
||||
|
||||
// todo return error code to query engine
|
||||
if (createDataBlocksInfo(pQueryHandle, numOfBlocks, &pQueryHandle->numOfBlocks) != TSDB_CODE_SUCCESS) {
|
||||
|
@ -961,7 +960,7 @@ static bool getDataBlocksInFiles(STsdbQueryHandle* pQueryHandle) {
|
|||
return false;
|
||||
}
|
||||
|
||||
cur->slot = 0;
|
||||
cur->slot = QUERY_IS_ASC_QUERY(pQueryHandle->order)? 0:pQueryHandle->numOfBlocks-1;
|
||||
cur->fid = pQueryHandle->pFileGroup->fileId;
|
||||
|
||||
STableBlockInfo* pBlockInfo = &pQueryHandle->pDataBlockInfo[cur->slot];
|
||||
|
@ -970,8 +969,10 @@ static bool getDataBlocksInFiles(STsdbQueryHandle* pQueryHandle) {
|
|||
|
||||
return loadFileDataBlock(pQueryHandle, pBlock, pCheckInfo);
|
||||
} else {
|
||||
if (cur->slot == pQueryHandle->numOfBlocks - 1) { // all blocks
|
||||
if ((cur->slot == pQueryHandle->numOfBlocks - 1 && QUERY_IS_ASC_QUERY(pQueryHandle->order)) ||
|
||||
(cur->slot == 0 && !QUERY_IS_ASC_QUERY(pQueryHandle->order))) { // all blocks
|
||||
int32_t numOfBlocks = -1;
|
||||
|
||||
int32_t numOfTables = taosArrayGetSize(pQueryHandle->pTableCheckInfo);
|
||||
pQueryHandle->numOfBlocks = 0;
|
||||
|
||||
|
@ -1001,67 +1002,84 @@ static bool getDataBlocksInFiles(STsdbQueryHandle* pQueryHandle) {
|
|||
cur->fid = -1;
|
||||
return false;
|
||||
}
|
||||
|
||||
cur->slot = 0;
|
||||
|
||||
cur->slot = QUERY_IS_ASC_QUERY(pQueryHandle->order)? 0:pQueryHandle->numOfBlocks-1;
|
||||
cur->fid = pQueryHandle->pFileGroup->fileId;
|
||||
|
||||
STableBlockInfo* pBlockInfo = &pQueryHandle->pDataBlockInfo[cur->slot];
|
||||
|
||||
STableCheckInfo* pCheckInfo = pBlockInfo->pTableCheckInfo;
|
||||
SCompBlock* pBlock = pBlockInfo->pBlock.compBlock;
|
||||
|
||||
return loadFileDataBlock(pQueryHandle, pBlock, pCheckInfo);
|
||||
} else { // next block of the same file
|
||||
cur->slot += 1;
|
||||
cur->pos = 0;
|
||||
|
||||
int32_t step = QUERY_IS_ASC_QUERY(pQueryHandle->order)? 1:-1;
|
||||
cur->slot += step;
|
||||
|
||||
STableBlockInfo* pBlockInfo = &pQueryHandle->pDataBlockInfo[cur->slot];
|
||||
if (QUERY_IS_ASC_QUERY(pQueryHandle->order)) {
|
||||
cur->pos = 0;
|
||||
} else {
|
||||
cur->pos = pBlockInfo->pBlock.compBlock->numOfPoints - 1;
|
||||
}
|
||||
|
||||
return loadFileDataBlock(pQueryHandle, pBlockInfo->pBlock.compBlock, pBlockInfo->pTableCheckInfo);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// handle data in cache situation
|
||||
bool tsdbNextDataBlock(tsdb_query_handle_t* pQueryHandle) {
|
||||
STsdbQueryHandle* pHandle = (STsdbQueryHandle*)pQueryHandle;
|
||||
|
||||
size_t numOfTables = taosArrayGetSize(pHandle->pTableCheckInfo);
|
||||
|
||||
static bool doHasDataInBuffer(STsdbQueryHandle* pQueryHandle) {
|
||||
size_t numOfTables = taosArrayGetSize(pQueryHandle->pTableCheckInfo);
|
||||
// todo add assert, the value of numOfTables should be less than the maximum value for each vnode capacity
|
||||
assert(numOfTables > 0);
|
||||
|
||||
if (pHandle->checkFiles) {
|
||||
if (getDataBlocksInFiles(pHandle)) {
|
||||
while (pQueryHandle->activeIndex < numOfTables) {
|
||||
if (hasMoreDataInCache(pQueryHandle)) {
|
||||
return true;
|
||||
}
|
||||
|
||||
pHandle->activeIndex = 0;
|
||||
pHandle->checkFiles = 0;
|
||||
|
||||
while (pHandle->activeIndex < numOfTables) {
|
||||
if (hasMoreDataInCache(pHandle)) {
|
||||
return true;
|
||||
}
|
||||
|
||||
pHandle->activeIndex += 1;
|
||||
}
|
||||
|
||||
return false;
|
||||
} else {
|
||||
while (pHandle->activeIndex < numOfTables) {
|
||||
if (hasMoreDataInCache(pHandle)) {
|
||||
return true;
|
||||
}
|
||||
|
||||
pHandle->activeIndex += 1;
|
||||
}
|
||||
|
||||
return false;
|
||||
pQueryHandle->activeIndex += 1;
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
// handle data in cache situation
|
||||
bool tsdbNextDataBlock(tsdb_query_handle_t* pqHandle) {
|
||||
STsdbQueryHandle* pQueryHandle = (STsdbQueryHandle*) pqHandle;
|
||||
|
||||
size_t numOfTables = taosArrayGetSize(pQueryHandle->pTableCheckInfo);
|
||||
assert(numOfTables > 0);
|
||||
|
||||
if (QUERY_IS_ASC_QUERY(pQueryHandle->order)) {
|
||||
if (pQueryHandle->checkFiles) {
|
||||
if (getDataBlocksInFiles(pQueryHandle)) {
|
||||
return true;
|
||||
}
|
||||
|
||||
pQueryHandle->activeIndex = 0;
|
||||
pQueryHandle->checkFiles = false;
|
||||
}
|
||||
|
||||
return doHasDataInBuffer(pQueryHandle);
|
||||
} else { // starts from the buffer in case of descending timestamp order check data blocks
|
||||
if (!pQueryHandle->checkFiles) {
|
||||
if (doHasDataInBuffer(pQueryHandle)) {
|
||||
return true;
|
||||
}
|
||||
|
||||
pQueryHandle->checkFiles = true;
|
||||
}
|
||||
|
||||
return getDataBlocksInFiles(pQueryHandle);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
static int tsdbReadRowsFromCache(SSkipListIterator* pIter, TSKEY maxKey, int maxRowsToRead, TSKEY* skey, TSKEY* ekey,
|
||||
STsdbQueryHandle* pHandle) {
|
||||
STsdbQueryHandle* pQueryHandle) {
|
||||
int numOfRows = 0;
|
||||
int32_t numOfCols = taosArrayGetSize(pHandle->pColumns);
|
||||
int32_t numOfCols = taosArrayGetSize(pQueryHandle->pColumns);
|
||||
*skey = INT64_MIN;
|
||||
|
||||
while (tSkipListIterNext(pIter)) {
|
||||
|
@ -1079,7 +1097,7 @@ static int tsdbReadRowsFromCache(SSkipListIterator* pIter, TSKEY maxKey, int max
|
|||
|
||||
int32_t offset = 0;
|
||||
for (int32_t i = 0; i < numOfCols; ++i) {
|
||||
SColumnInfoData* pColInfo = taosArrayGet(pHandle->pColumns, i);
|
||||
SColumnInfoData* pColInfo = taosArrayGet(pQueryHandle->pColumns, i);
|
||||
memcpy(pColInfo->pData + numOfRows * pColInfo->info.bytes, dataRowTuple(row) + offset, pColInfo->info.bytes);
|
||||
offset += pColInfo->info.bytes;
|
||||
}
|
||||
|
|
|
@ -0,0 +1,53 @@
|
|||
###################################################################
|
||||
# Copyright (c) 2016 by TAOS Technologies, Inc.
|
||||
# All rights reserved.
|
||||
#
|
||||
# This file is proprietary and confidential to TAOS Technologies.
|
||||
# No part of this file may be reproduced, stored, transmitted,
|
||||
# disclosed or used in any form or by any means other than as
|
||||
# expressly provided by the written permission from Jianhui Tao
|
||||
#
|
||||
###################################################################
|
||||
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
import sys
|
||||
import taos
|
||||
from util.log import *
|
||||
from util.cases import *
|
||||
from util.sql import *
|
||||
|
||||
class TDTestCase:
|
||||
def init(self, conn):
|
||||
tdLog.debug("start to execute %s" % __file__)
|
||||
tdSql.init(conn.cursor())
|
||||
|
||||
def run(self):
|
||||
tdSql.prepare()
|
||||
tdSql.execute('show databases')
|
||||
tdSql.execute('drop database if exists db')
|
||||
tdSql.execute('create database db')
|
||||
tdSql.execute('use db')
|
||||
tdSql.execute('create table tb (ts timestamp, speed int)')
|
||||
|
||||
insertRows = 10
|
||||
tdLog.info("insert %d rows" % (insertRows))
|
||||
for i in range(0, insertRows):
|
||||
tdSql.execute('insert into tb values (now + %dm, %d)' % (i, i))
|
||||
|
||||
# tdLog.info("insert earlier data")
|
||||
# tdSql.execute('insert into tb values (now - 5m , 10)')
|
||||
# tdSql.execute('insert into tb values (now - 6m , 10)')
|
||||
# tdSql.execute('insert into tb values (now - 7m , 10)')
|
||||
# tdSql.execute('insert into tb values (now - 8m , 10)')
|
||||
|
||||
# tdSql.query("select * from tb")
|
||||
# tdSql.checkRows(insertRows)
|
||||
|
||||
def stop(self):
|
||||
tdSql.close()
|
||||
tdLog.success("%s successfully executed" % __file__)
|
||||
|
||||
|
||||
tdCases.addWindows(__file__, TDTestCase())
|
||||
tdCases.addLinux(__file__, TDTestCase())
|
|
@ -0,0 +1 @@
|
|||
sudo python2 ./test.py -f insert/basic.py
|
|
@ -0,0 +1,87 @@
|
|||
#!/usr/bin/python
|
||||
###################################################################
|
||||
# Copyright (c) 2016 by TAOS Technologies, Inc.
|
||||
# All rights reserved.
|
||||
#
|
||||
# This file is proprietary and confidential to TAOS Technologies.
|
||||
# No part of this file may be reproduced, stored, transmitted,
|
||||
# disclosed or used in any form or by any means other than as
|
||||
# expressly provided by the written permission from Jianhui Tao
|
||||
#
|
||||
###################################################################
|
||||
# install pip
|
||||
# pip install src/connector/python/linux/python2/
|
||||
|
||||
# -*- coding: utf-8 -*-
|
||||
import sys
|
||||
import getopt
|
||||
from util.log import *
|
||||
from util.dnodes import *
|
||||
from util.cases import *
|
||||
|
||||
import taos
|
||||
|
||||
# add testcase here:
|
||||
from insert.basic import *
|
||||
|
||||
if __name__ == "__main__":
|
||||
fileName = "all"
|
||||
deployPath = ""
|
||||
masterIp = ""
|
||||
testCluster = False
|
||||
opts, args = getopt.getopt(sys.argv[1:], 'f:p:m:sch', [
|
||||
'file=', 'path=', 'master', 'stop', 'cluster', 'help'])
|
||||
for key, value in opts:
|
||||
if key in ['-h', '--help']:
|
||||
tdLog.printNoPrefix(
|
||||
'A collection of test cases written using Python')
|
||||
tdLog.printNoPrefix('-f Name of test case file written by Python')
|
||||
tdLog.printNoPrefix('-p Deploy Path for Simulator')
|
||||
tdLog.printNoPrefix('-m Master Ip for Simulator')
|
||||
tdLog.printNoPrefix('-c Test Cluster Flag')
|
||||
tdLog.printNoPrefix('-s stop All dnodes')
|
||||
sys.exit(0)
|
||||
if key in ['-f', '--file']:
|
||||
fileName = value
|
||||
if key in ['-p', '--path']:
|
||||
deployPath = value
|
||||
if key in ['-m', '--master']:
|
||||
masterIp = value
|
||||
if key in ['-c', '--cluster']:
|
||||
testCluster = True
|
||||
if key in ['-s', '--stop']:
|
||||
cmd = "ps -ef|grep -w taosd | grep 'taosd' | grep -v grep | awk '{print $2}' && pkill -9 taosd"
|
||||
os.system(cmd)
|
||||
tdLog.exit('stop All dnodes')
|
||||
|
||||
if masterIp == "":
|
||||
tdDnodes.init(deployPath)
|
||||
if testCluster:
|
||||
tdLog.notice("Procedures for testing cluster")
|
||||
if fileName == "all":
|
||||
tdCases.runAllCluster()
|
||||
else:
|
||||
tdCases.runOneCluster(fileName)
|
||||
else:
|
||||
tdLog.notice("Procedures for testing self-deployment")
|
||||
tdDnodes.stopAll()
|
||||
tdDnodes.deploy(1)
|
||||
tdDnodes.start(1)
|
||||
conn = taos.connect(
|
||||
host='192.168.0.1',
|
||||
config=tdDnodes.getSimCfgPath())
|
||||
if fileName == "all":
|
||||
tdCases.runAllLinux(conn)
|
||||
else:
|
||||
tdLog.info("CBD LN78: %s" % (fileName))
|
||||
tdCases.runOneLinux(conn, fileName)
|
||||
conn.close()
|
||||
else:
|
||||
tdLog.notice("Procedures for tdengine deployed in %s" % (masterIp))
|
||||
conn = taos.connect(host=masterIp, config=tdDnodes.getSimCfgPath())
|
||||
if fileName == "all":
|
||||
tdCases.runAllWindows(conn)
|
||||
else:
|
||||
tdCases.runOneWindows(conn, fileName)
|
||||
conn.close()
|
||||
|
|
@ -0,0 +1,103 @@
|
|||
###################################################################
|
||||
# Copyright (c) 2016 by TAOS Technologies, Inc.
|
||||
# All rights reserved.
|
||||
#
|
||||
# This file is proprietary and confidential to TAOS Technologies.
|
||||
# No part of this file may be reproduced, stored, transmitted,
|
||||
# disclosed or used in any form or by any means other than as
|
||||
# expressly provided by the written permission from Jianhui Tao
|
||||
#
|
||||
###################################################################
|
||||
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
import sys
|
||||
import os
|
||||
import time
|
||||
import datetime
|
||||
from util.log import *
|
||||
|
||||
|
||||
class TDCase:
|
||||
def __init__(self, name, case):
|
||||
self.name = name
|
||||
self.case = case
|
||||
|
||||
|
||||
class TDCases:
|
||||
def __init__(self):
|
||||
self.linuxCases = []
|
||||
self.windowsCases = []
|
||||
self.clusterCases = []
|
||||
|
||||
def addWindows(self, name, case):
|
||||
self.windowsCases.append(TDCase(name, case))
|
||||
|
||||
def addLinux(self, name, case):
|
||||
self.linuxCases.append(TDCase(name, case))
|
||||
|
||||
def addCluster(self, name, case):
|
||||
self.clusterCases.append(TDCase(name, case))
|
||||
|
||||
def runAllLinux(self, conn):
|
||||
tdLog.notice("run total %d cases" % (len(self.linuxCases)))
|
||||
for case in self.linuxCases:
|
||||
case.case.init(conn)
|
||||
case.case.run()
|
||||
case.case.stop()
|
||||
tdLog.notice("total %d cases executed" % (len(self.linuxCases)))
|
||||
|
||||
def runOneLinux(self, conn, fileName):
|
||||
tdLog.notice("run cases like %s" % (fileName))
|
||||
runNum = 0
|
||||
for case in self.linuxCases:
|
||||
if case.name.find(fileName) != -1:
|
||||
case.case.init(conn)
|
||||
case.case.run()
|
||||
case.case.stop()
|
||||
time.sleep(5)
|
||||
runNum += 1
|
||||
tdLog.notice("total %d cases executed" % (runNum))
|
||||
|
||||
def runAllWindows(self, conn):
|
||||
tdLog.notice("run total %d cases" % (len(self.windowsCases)))
|
||||
for case in self.windowsCases:
|
||||
case.case.init(conn)
|
||||
case.case.run()
|
||||
case.case.stop()
|
||||
tdLog.notice("total %d cases executed" % (len(self.windowsCases)))
|
||||
|
||||
def runOneWindows(self, conn, fileName):
|
||||
tdLog.notice("run cases like %s" % (fileName))
|
||||
runNum = 0
|
||||
for case in self.windowsCases:
|
||||
if case.name.find(fileName) != -1:
|
||||
case.case.init(conn)
|
||||
case.case.run()
|
||||
case.case.stop()
|
||||
time.sleep(2)
|
||||
runNum += 1
|
||||
tdLog.notice("total %d cases executed" % (runNum))
|
||||
|
||||
def runAllCluster(self):
|
||||
tdLog.notice("run total %d cases" % (len(self.clusterCases)))
|
||||
for case in self.clusterCases:
|
||||
case.case.init()
|
||||
case.case.run()
|
||||
case.case.stop()
|
||||
tdLog.notice("total %d cases executed" % (len(self.clusterCases)))
|
||||
|
||||
def runOneCluster(self, fileName):
|
||||
tdLog.notice("run cases like %s" % (fileName))
|
||||
runNum = 0
|
||||
for case in self.clusterCases:
|
||||
if case.name.find(fileName) != -1:
|
||||
case.case.init()
|
||||
case.case.run()
|
||||
case.case.stop()
|
||||
time.sleep(2)
|
||||
runNum += 1
|
||||
tdLog.notice("total %d cases executed" % (runNum))
|
||||
|
||||
|
||||
tdCases = TDCases()
|
|
@ -0,0 +1,332 @@
|
|||
###################################################################
|
||||
# Copyright (c) 2016 by TAOS Technologies, Inc.
|
||||
# All rights reserved.
|
||||
#
|
||||
# This file is proprietary and confidential to TAOS Technologies.
|
||||
# No part of this file may be reproduced, stored, transmitted,
|
||||
# disclosed or used in any form or by any means other than as
|
||||
# expressly provided by the written permission from Jianhui Tao
|
||||
#
|
||||
###################################################################
|
||||
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
import sys
|
||||
import os
|
||||
import os.path
|
||||
from util.log import *
|
||||
|
||||
|
||||
class TDSimClient:
|
||||
def init(self, path):
|
||||
self.path = path
|
||||
|
||||
def getCfgDir(self):
|
||||
return self.cfgDir
|
||||
|
||||
def cfg(self, option, value):
|
||||
cmd = "echo '%s %s' >> %s" % (option, value, self.cfgPath)
|
||||
if os.system(cmd) != 0:
|
||||
tdLog.exit(cmd)
|
||||
|
||||
def deploy(self):
|
||||
self.logDir = "%s/sim/psim/log" % (self.path,)
|
||||
self.cfgDir = "%s/sim/psim/cfg" % (self.path)
|
||||
self.cfgPath = "%s/sim/psim/cfg/taos.cfg" % (self.path)
|
||||
|
||||
cmd = "rm -rf " + self.logDir
|
||||
if os.system(cmd) != 0:
|
||||
tdLog.exit(cmd)
|
||||
|
||||
cmd = "rm -rf " + self.cfgDir
|
||||
if os.system(cmd) != 0:
|
||||
tdLog.exit(cmd)
|
||||
|
||||
cmd = "mkdir -p " + self.logDir
|
||||
if os.system(cmd) != 0:
|
||||
tdLog.exit(cmd)
|
||||
|
||||
cmd = "mkdir -p " + self.cfgDir
|
||||
if os.system(cmd) != 0:
|
||||
tdLog.exit(cmd)
|
||||
|
||||
cmd = "touch " + self.cfgPath
|
||||
if os.system(cmd) != 0:
|
||||
tdLog.exit(cmd)
|
||||
|
||||
self.cfg("masterIp", "192.168.0.1")
|
||||
self.cfg("secondIp", "192.168.0.2")
|
||||
self.cfg("logDir", self.logDir)
|
||||
self.cfg("numOfLogLines", "100000000")
|
||||
self.cfg("numOfThreadsPerCore", "2.0")
|
||||
self.cfg("locale", "en_US.UTF-8")
|
||||
self.cfg("charset", "GBK")
|
||||
self.cfg("asyncLog", "0")
|
||||
self.cfg("anyIp", "0")
|
||||
self.cfg("sdbDebugFlag", "135")
|
||||
self.cfg("rpcDebugFlag", "135")
|
||||
self.cfg("tmrDebugFlag", "131")
|
||||
self.cfg("cDebugFlag", "135")
|
||||
self.cfg("udebugFlag", "135")
|
||||
self.cfg("jnidebugFlag", "135")
|
||||
self.cfg("qdebugFlag", "135")
|
||||
tdLog.debug("psim is deployed and configured by %s" % (self.cfgPath))
|
||||
|
||||
|
||||
class TDDnode:
|
||||
def __init__(self, index):
|
||||
self.index = index
|
||||
self.running = 0
|
||||
self.deployed = 0
|
||||
|
||||
def init(self, path):
|
||||
self.path = path
|
||||
|
||||
def deploy(self):
|
||||
self.logDir = "%s/sim/dnode%d/log" % (self.path, self.index)
|
||||
self.dataDir = "%s/sim/dnode%d/data" % (self.path, self.index)
|
||||
self.cfgDir = "%s/sim/dnode%d/cfg" % (self.path, self.index)
|
||||
self.cfgPath = "%s/sim/dnode%d/cfg/taos.cfg" % (self.path, self.index)
|
||||
|
||||
cmd = "rm -rf " + self.dataDir
|
||||
if os.system(cmd) != 0:
|
||||
tdLog.exit(cmd)
|
||||
|
||||
cmd = "rm -rf " + self.logDir
|
||||
if os.system(cmd) != 0:
|
||||
tdLog.exit(cmd)
|
||||
|
||||
cmd = "rm -rf " + self.cfgDir
|
||||
if os.system(cmd) != 0:
|
||||
tdLog.exit(cmd)
|
||||
|
||||
cmd = "mkdir -p " + self.dataDir
|
||||
if os.system(cmd) != 0:
|
||||
tdLog.exit(cmd)
|
||||
|
||||
cmd = "mkdir -p " + self.logDir
|
||||
if os.system(cmd) != 0:
|
||||
tdLog.exit(cmd)
|
||||
|
||||
cmd = "mkdir -p " + self.cfgDir
|
||||
if os.system(cmd) != 0:
|
||||
tdLog.exit(cmd)
|
||||
|
||||
cmd = "touch " + self.cfgPath
|
||||
if os.system(cmd) != 0:
|
||||
tdLog.exit(cmd)
|
||||
|
||||
self.startIP()
|
||||
self.cfg("masterIp", "192.168.0.1")
|
||||
self.cfg("secondIp", "192.168.0.2")
|
||||
self.cfg("publicIp", "192.168.0.%d" % (self.index))
|
||||
self.cfg("internalIp", "192.168.0.%d" % (self.index))
|
||||
self.cfg("privateIp", "192.168.0.%d" % (self.index))
|
||||
self.cfg("dataDir", self.dataDir)
|
||||
self.cfg("logDir", self.logDir)
|
||||
self.cfg("numOfLogLines", "100000000")
|
||||
self.cfg("mgmtEqualVnodeNum", "0")
|
||||
self.cfg("clog", "1")
|
||||
self.cfg("statusInterval", "1")
|
||||
self.cfg("numOfTotalVnodes", "64")
|
||||
self.cfg("numOfMPeers", "3")
|
||||
self.cfg("numOfThreadsPerCore", "2.0")
|
||||
self.cfg("monitor", "0")
|
||||
self.cfg("maxVnodeConnections", "30000")
|
||||
self.cfg("maxMgmtConnections", "30000")
|
||||
self.cfg("maxMeterConnections", "30000")
|
||||
self.cfg("maxShellConns", "30000")
|
||||
self.cfg("locale", "en_US.UTF-8")
|
||||
self.cfg("charset", "UTF-8")
|
||||
self.cfg("asyncLog", "0")
|
||||
self.cfg("anyIp", "0")
|
||||
self.cfg("dDebugFlag", "135")
|
||||
self.cfg("mDebugFlag", "135")
|
||||
self.cfg("sdbDebugFlag", "135")
|
||||
self.cfg("rpcDebugFlag", "135")
|
||||
self.cfg("tmrDebugFlag", "131")
|
||||
self.cfg("cDebugFlag", "135")
|
||||
self.cfg("httpDebugFlag", "135")
|
||||
self.cfg("monitorDebugFlag", "135")
|
||||
self.cfg("udebugFlag", "135")
|
||||
self.cfg("jnidebugFlag", "135")
|
||||
self.cfg("qdebugFlag", "135")
|
||||
self.deployed = 1
|
||||
tdLog.debug(
|
||||
"dnode:%d is deployed and configured by %s" %
|
||||
(self.index, self.cfgPath))
|
||||
|
||||
def start(self):
|
||||
binPath = os.path.dirname(os.path.realpath(__file__))
|
||||
binPath = binPath + "/../../../debug/"
|
||||
binPath = os.path.realpath(binPath)
|
||||
binPath += "/build/bin/"
|
||||
|
||||
if self.deployed == 0:
|
||||
tdLog.exit("dnode:%d is not deployed" % (self.index))
|
||||
cmd = "nohup %staosd -c %s > /dev/null 2>&1 & " % (
|
||||
binPath, self.cfgDir)
|
||||
print(cmd)
|
||||
if os.system(cmd) != 0:
|
||||
tdLog.exit(cmd)
|
||||
self.running = 1
|
||||
tdLog.debug("dnode:%d is running with %s " % (self.index, cmd))
|
||||
|
||||
tdLog.debug("wait 2 seconds for the dnode:%d to start." % (self.index))
|
||||
time.sleep(2)
|
||||
|
||||
def stop(self):
|
||||
if self.running != 0:
|
||||
cmd = "ps -ef|grep -w taosd | grep '%s' | grep -v grep | awk '{print $2}' && pkill -sigint taosd" % (
|
||||
self.cfgDir)
|
||||
if os.system(cmd) != 0:
|
||||
tdLog.exit(cmd)
|
||||
tdLog.debug("dnode:%d is stopped by kill -SIGINT" % (self.index))
|
||||
tdLog.debug(
|
||||
"wait 2 seconds for the dnode:%d to stop." %
|
||||
(self.index))
|
||||
time.sleep(2)
|
||||
|
||||
def forcestop(self):
|
||||
if self.running != 0:
|
||||
cmd = "ps -ef|grep -w taosd | grep '%s' | grep -v grep | awk '{print $2}' && pkill -sigkill taosd" % (
|
||||
self.cfgDir)
|
||||
if os.system(cmd) != 0:
|
||||
tdLog.exit(cmd)
|
||||
tdLog.debug("dnode:%d is stopped by kill -9" % (self.index))
|
||||
tdLog.debug(
|
||||
"wait 2 seconds for the dnode:%d to stop." %
|
||||
(self.index))
|
||||
time.sleep(2)
|
||||
|
||||
def startIP(self):
|
||||
cmd = "sudo ifconfig lo:%d 192.168.0.%d up" % (self.index, self.index)
|
||||
if os.system(cmd) != 0:
|
||||
tdLog.exit(cmd)
|
||||
|
||||
def stopIP(self):
|
||||
cmd = "sudo ifconfig lo:%d 192.168.0.%d down" % (
|
||||
self.index, self.index)
|
||||
if os.system(cmd) != 0:
|
||||
tdLog.exit(cmd)
|
||||
|
||||
def cfg(self, option, value):
|
||||
cmd = "echo '%s %s' >> %s" % (option, value, self.cfgPath)
|
||||
if os.system(cmd) != 0:
|
||||
tdLog.exit(cmd)
|
||||
|
||||
def getDnodeRootDir(self, index):
|
||||
dnodeRootDir = "%s/sim/psim/dnode%d" % (self.path, index)
|
||||
return dnodeRootDir
|
||||
|
||||
def getDnodesRootDir(self):
|
||||
dnodesRootDir = "%s/sim/psim" % (self.path)
|
||||
return dnodesRootDir
|
||||
|
||||
|
||||
class TDDnodes:
|
||||
def __init__(self):
|
||||
self.dnodes = []
|
||||
self.dnodes.append(TDDnode(1))
|
||||
self.dnodes.append(TDDnode(2))
|
||||
self.dnodes.append(TDDnode(3))
|
||||
self.dnodes.append(TDDnode(4))
|
||||
self.dnodes.append(TDDnode(5))
|
||||
self.dnodes.append(TDDnode(6))
|
||||
self.dnodes.append(TDDnode(7))
|
||||
self.dnodes.append(TDDnode(8))
|
||||
self.dnodes.append(TDDnode(9))
|
||||
self.dnodes.append(TDDnode(10))
|
||||
|
||||
def init(self, path):
|
||||
cmd = "ps -ef|grep -w taosd | grep 'taosd' | grep -v grep | awk '{print $2}' && pkill -sigkill taosd"
|
||||
os.system(cmd)
|
||||
|
||||
binPath = os.path.dirname(os.path.realpath(__file__))
|
||||
binPath = binPath + "/../../../debug/"
|
||||
tdLog.debug("binPath %s" % (binPath))
|
||||
binPath = os.path.realpath(binPath)
|
||||
tdLog.debug("binPath real path %s" % (binPath))
|
||||
|
||||
# cmd = "sudo cp %s/build/lib/libtaos.so /usr/local/lib/taos/" % (binPath)
|
||||
# tdLog.debug(cmd)
|
||||
# os.system(cmd)
|
||||
|
||||
# cmd = "sudo cp %s/build/bin/taos /usr/local/bin/taos/" % (binPath)
|
||||
# if os.system(cmd) != 0 :
|
||||
# tdLog.exit(cmd)
|
||||
# tdLog.debug("execute %s" % (cmd))
|
||||
|
||||
# cmd = "sudo cp %s/build/bin/taosd /usr/local/bin/taos/" % (binPath)
|
||||
# if os.system(cmd) != 0 :
|
||||
# tdLog.exit(cmd)
|
||||
# tdLog.debug("execute %s" % (cmd))
|
||||
|
||||
if path == "":
|
||||
# self.path = os.path.expanduser('~')
|
||||
self.path = os.path.abspath(binPath + "../../")
|
||||
else:
|
||||
self.path = os.path.realpath(path)
|
||||
|
||||
for i in range(len(self.dnodes)):
|
||||
self.dnodes[i].init(self.path)
|
||||
|
||||
self.sim = TDSimClient()
|
||||
self.sim.init(self.path)
|
||||
self.sim.deploy()
|
||||
|
||||
def deploy(self, index):
|
||||
self.check(index)
|
||||
self.dnodes[index - 1].deploy()
|
||||
|
||||
def cfg(self, index, option, value):
|
||||
self.check(index)
|
||||
self.dnodes[index - 1].cfg(option, value)
|
||||
|
||||
def start(self, index):
|
||||
self.check(index)
|
||||
self.dnodes[index - 1].start()
|
||||
|
||||
def stop(self, index):
|
||||
self.check(index)
|
||||
self.dnodes[index - 1].stop()
|
||||
|
||||
def forcestop(self, index):
|
||||
self.check(index)
|
||||
self.dnodes[index - 1].forcestop()
|
||||
|
||||
def startIP(self, index):
|
||||
self.check(index)
|
||||
self.dnodes[index - 1].startIP()
|
||||
|
||||
def stopIP(self, index):
    """Take down the network address of the dnode at 1-based ``index``."""
    self.check(index)
    node = self.dnodes[index - 1]
    node.stopIP()
|
||||
|
||||
def check(self, index):
    """Validate that ``index`` is within [1, 10]; abort the run otherwise."""
    if not 1 <= index <= 10:
        tdLog.exit("index:%d should on a scale of [1, 10]" % (index))
|
||||
|
||||
def stopAll(self):
    """Stop every managed dnode, then make sure no stray taosd survives."""
    tdLog.debug("stop all dnodes")
    for node in self.dnodes:
        node.stop()

    # Also stop a system-service instance, if one is running.
    os.system("sudo systemctl stop taosd")

    # Finally kill any remaining taosd processes started from a dnode dir.
    os.system(
        "ps -ef | grep -w taosd | grep 'dnode' | grep -v grep | awk '{print $2}' && sudo pkill -sigkill taosd")
|
||||
|
||||
def getDnodesRootDir(self):
    """Return the ``sim`` directory under the configured root path."""
    return "%s/sim" % (self.path)
|
||||
|
||||
def getSimCfgPath(self):
    """Return the sim client's config directory."""
    return self.sim.getCfgDir()
|
||||
|
||||
|
||||
# Module-level singleton: test scripts share one dnode-manager instance.
tdDnodes = TDDnodes()
|
|
@ -0,0 +1,48 @@
|
|||
###################################################################
|
||||
# Copyright (c) 2016 by TAOS Technologies, Inc.
|
||||
# All rights reserved.
|
||||
#
|
||||
# This file is proprietary and confidential to TAOS Technologies.
|
||||
# No part of this file may be reproduced, stored, transmitted,
|
||||
# disclosed or used in any form or by any means other than as
|
||||
# expressly provided by the written permission from Jianhui Tao
|
||||
#
|
||||
###################################################################
|
||||
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
import sys
|
||||
import os
|
||||
import time
|
||||
import datetime
|
||||
|
||||
|
||||
class TDLog:
    """Coloured console logger shared by the test framework.

    Fix: the original used bare Python-2 ``print`` statements, which are a
    syntax error under Python 3.  All messages now go through ``print(...)``
    calls, which produce identical output on both Python 2 and Python 3
    (each call passes a single pre-formatted string).
    """

    def __init__(self):
        # Reserved for a future log-file location; unused by the methods below.
        self.path = ""

    def info(self, info):
        """Plain timestamped informational message."""
        print("%s %s" % (datetime.datetime.now(), info))

    def sleep(self, sec):
        """Log the pause, then sleep for ``sec`` seconds."""
        print("%s sleep %d seconds" % (datetime.datetime.now(), sec))
        time.sleep(sec)

    def debug(self, err):
        """Cyan debug message."""
        print("\033[1;36m%s %s\033[0m" % (datetime.datetime.now(), err))

    def success(self, info):
        """Green success message."""
        print("\033[1;32m%s %s\033[0m" % (datetime.datetime.now(), info))

    def notice(self, err):
        """Yellow notice message."""
        print("\033[1;33m%s %s\033[0m" % (datetime.datetime.now(), err))

    def exit(self, err):
        """Red fatal message, then terminate the process with status 1."""
        print("\033[1;31m%s %s\033[0m" % (datetime.datetime.now(), err))
        sys.exit(1)

    def printNoPrefix(self, info):
        """Cyan message without the timestamp prefix."""
        print("\033[1;36m%s\033[0m" % (info))
|
||||
|
||||
|
||||
# Module-level singleton logger shared across the test framework.
tdLog = TDLog()
|
|
@ -0,0 +1,135 @@
|
|||
###################################################################
|
||||
# Copyright (c) 2016 by TAOS Technologies, Inc.
|
||||
# All rights reserved.
|
||||
#
|
||||
# This file is proprietary and confidential to TAOS Technologies.
|
||||
# No part of this file may be reproduced, stored, transmitted,
|
||||
# disclosed or used in any form or by any means other than as
|
||||
# expressly provided by the written permission from Jianhui Tao
|
||||
#
|
||||
###################################################################
|
||||
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
import sys
|
||||
import os
|
||||
import time
|
||||
import datetime
|
||||
from util.log import *
|
||||
|
||||
|
||||
class TDSql:
    """Thin wrapper around a DB-API cursor with assertion helpers.

    Caches the last executed SQL plus the resulting row/column counts so
    test cases can verify them via checkRows/checkData/checkAffectedRows.

    Fixes vs. the original:
    - ``checkData`` logged the expected value with ``%d``, raising
      ``TypeError`` whenever the expected value was not an integer;
      it now uses ``%s``.
    - The out-of-range column message said "larger than queryRows" while
      comparing against ``queryCols``; the message now names queryCols.
    - The duplicated bounds checks in ``checkData``/``getData`` are
      extracted into a shared ``checkRowCol`` helper.
    """

    def __init__(self):
        self.queryRows = 0      # row count of the last query()
        self.queryCols = 0      # column count of the last query()
        self.affectedRows = 0   # affected-row count of the last execute()

    def init(self, cursor):
        """Attach the DB-API cursor used by all subsequent calls."""
        self.cursor = cursor

    def close(self):
        """Close the underlying cursor."""
        self.cursor.close()

    def prepare(self):
        """Recreate a clean database named ``db`` and switch to it."""
        tdLog.info("prepare database:db")
        self.cursor.execute('reset query cache')
        self.cursor.execute('drop database if exists db')
        self.cursor.execute('create database db')
        self.cursor.execute('use db')

    def error(self, sql):
        """Execute ``sql`` expecting failure; abort the test if it succeeds."""
        expectErrNotOccured = True
        try:
            self.cursor.execute(sql)
        except BaseException:
            expectErrNotOccured = False
        if expectErrNotOccured:
            tdLog.exit("sql:%.40s, expect error not occured" % (sql))
        else:
            tdLog.info("sql:%.40s, expect error occured" % (sql))

    def query(self, sql):
        """Run a query, cache its full result set, and return the row count."""
        self.sql = sql
        self.cursor.execute(sql)
        self.queryResult = self.cursor.fetchall()
        self.queryRows = len(self.queryResult)
        self.queryCols = len(self.cursor.description)
        return self.queryRows

    def checkRows(self, expectRows):
        """Abort unless the last query returned exactly ``expectRows`` rows."""
        if self.queryRows != expectRows:
            tdLog.exit(
                "sql:%.40s, queryRows:%d != expect:%d" %
                (self.sql, self.queryRows, expectRows))
        tdLog.info("sql:%.40s, queryRows:%d == expect:%d" %
                   (self.sql, self.queryRows, expectRows))

    def checkRowCol(self, row, col):
        """Abort unless (row, col) addresses a cell of the cached result."""
        if row < 0:
            tdLog.exit(
                "sql:%.40s, row:%d is smaller than zero" %
                (self.sql, row))
        if col < 0:
            tdLog.exit(
                "sql:%.40s, col:%d is smaller than zero" %
                (self.sql, col))
        if row >= self.queryRows:
            tdLog.exit(
                "sql:%.40s, row:%d is larger than queryRows:%d" %
                (self.sql, row, self.queryRows))
        if col >= self.queryCols:
            # Fixed message: this compares against queryCols, not queryRows.
            tdLog.exit(
                "sql:%.40s, col:%d is larger than queryCols:%d" %
                (self.sql, col, self.queryCols))

    def checkData(self, row, col, data):
        """Abort unless the cached cell (row, col) equals ``data``."""
        self.checkRowCol(row, col)
        if self.queryResult[row][col] != data:
            tdLog.exit(
                "sql:%.40s row:%d col:%d data:%s != expect:%s" %
                (self.sql, row, col, self.queryResult[row][col], data))
        # %s (was %d): the expected value is not necessarily an integer.
        tdLog.info("sql:%.40s, row:%d col:%d data:%s == expect:%s" %
                   (self.sql, row, col, self.queryResult[row][col], data))

    def getData(self, row, col):
        """Return the cached cell (row, col); abort on out-of-range indices."""
        self.checkRowCol(row, col)
        return self.queryResult[row][col]

    def executeTimes(self, sql, times):
        """Retry ``sql`` up to ``times`` times, sleeping 1s between attempts.

        Returns None when every attempt fails (deliberate best-effort).
        """
        for i in range(times):
            try:
                return self.cursor.execute(sql)
            except BaseException:
                time.sleep(1)
                continue

    def execute(self, sql):
        """Run a statement and return/cache its affected-row count."""
        self.sql = sql
        self.affectedRows = self.cursor.execute(sql)
        return self.affectedRows

    def checkAffectedRows(self, expectAffectedRows):
        """Abort unless the last execute() affected exactly the expected rows."""
        if self.affectedRows != expectAffectedRows:
            tdLog.exit("sql:%.40s, affectedRows:%d != expect:%d" %
                       (self.sql, self.affectedRows, expectAffectedRows))
        tdLog.info("sql:%.40s, affectedRows:%d == expect:%d" %
                   (self.sql, self.affectedRows, expectAffectedRows))
|
||||
|
||||
|
||||
# Module-level singleton: shared SQL helper for test cases.
tdSql = TDSql()
|
Loading…
Reference in New Issue