From 11353abc180d26b6838cf7c4d1a4fac44ce51b1f Mon Sep 17 00:00:00 2001 From: jtao1735 Date: Fri, 1 May 2020 07:05:11 +0000 Subject: [PATCH 01/11] draft --- src/cq/src/cqMain.c | 167 +++++++++++++++++++++++++++++++ src/cq/src/vnodeStream.c | 209 +++++++++++++++++++++++++++++++++++++++ 2 files changed, 376 insertions(+) create mode 100644 src/cq/src/cqMain.c create mode 100644 src/cq/src/vnodeStream.c diff --git a/src/cq/src/cqMain.c b/src/cq/src/cqMain.c new file mode 100644 index 0000000000..e5bbf8353a --- /dev/null +++ b/src/cq/src/cqMain.c @@ -0,0 +1,167 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#define _DEFAULT_SOURCE +#include "taosmsg.h" +#include "vnode.h" + +/* static TAOS *dbConn = NULL; */ +void vnodeCloseStreamCallback(void *param); + +void cqOpen(void *param, void *tmrId) { + SVnodeObj *pVnode = (SVnodeObj *)param; + SMeterObj *pObj; + + if (pVnode->streamRole == TSDB_VN_STREAM_STATUS_STOP) return; + if (pVnode->meterList == NULL) return; + + taosTmrStopA(&pVnode->streamTimer); + pVnode->streamTimer = NULL; + + for (int sid = 0; sid < pVnode->cfg.maxSessions; ++sid) { + pObj = pVnode->meterList[sid]; + if (pObj == NULL || pObj->sqlLen == 0 || vnodeIsMeterState(pObj, TSDB_METER_STATE_DROPPING)) continue; + + dTrace("vid:%d sid:%d id:%s, open stream:%s", pObj->vnode, sid, pObj->meterId, pObj->pSql); + + if (pVnode->dbConn == NULL) { + char db[64] = {0}; + char user[64] = {0}; + vnodeGetDBFromMeterId(pObj, db); + sprintf(user, "_%s", pVnode->cfg.acct); + pVnode->dbConn = taos_connect(NULL, user, tsInternalPass, db, 0); + } + + if (pVnode->dbConn == NULL) { + dError("vid:%d, failed to connect to mgmt node", pVnode->vnode); + taosTmrReset(vnodeOpenStreams, 1000, param, vnodeTmrCtrl, &pVnode->streamTimer); + return; + } + + if (pObj->pStream == NULL) { + pObj->pStream = taos_open_stream(pVnode->dbConn, pObj->pSql, vnodeProcessStreamRes, pObj->lastKey, pObj, + vnodeCloseStreamCallback); + if (pObj->pStream) pVnode->numOfStreams++; + } + } +} + +// Close all streams in a vnode +void cqClose(SVnodeObj *pVnode) { + SMeterObj *pObj; + dPrint("vid:%d, stream is closed, old role %s", pVnode->vnode, taosGetVnodeStreamStatusStr(pVnode->streamRole)); + + // stop stream computing + for (int sid = 0; sid < pVnode->cfg.maxSessions; ++sid) { + pObj = pVnode->meterList[sid]; + if (pObj == NULL) continue; + if (pObj->sqlLen > 0 && pObj->pStream) { + taos_close_stream(pObj->pStream); + pVnode->numOfStreams--; + } + pObj->pStream = NULL; + } +} + +void cqCreate(SMeterObj *pObj) { + if (pObj->sqlLen <= 0) return; + + SVnodeObj *pVnode = vnodeList + pObj->vnode; + + if (pVnode->streamRole == TSDB_VN_STREAM_STATUS_STOP) return; + if (pObj->pStream) return; + + dTrace("vid:%d sid:%d id:%s stream:%s is created", pObj->vnode, pObj->sid, pObj->meterId, pObj->pSql); + if (pVnode->dbConn == NULL) { + if (pVnode->streamTimer == NULL) taosTmrReset(vnodeOpenStreams, 1000, pVnode, vnodeTmrCtrl, &pVnode->streamTimer); + } else { + pObj->pStream = taos_open_stream(pVnode->dbConn, pObj->pSql, vnodeProcessStreamRes, pObj->lastKey, pObj, + vnodeCloseStreamCallback); + if (pObj->pStream) pVnode->numOfStreams++; + } +} + +// Close only one stream +void cqDrop(SMeterObj *pObj) { + SVnodeObj *pVnode = vnodeList + pObj->vnode; + if (pObj->sqlLen <= 0) return; + + if (pObj->pStream) { + taos_close_stream(pObj->pStream); + pVnode->numOfStreams--; + } + + pObj->pStream = NULL; + if (pVnode->numOfStreams == 0) { + taos_close(pVnode->dbConn); + pVnode->dbConn = NULL; + } + + dTrace("vid:%d sid:%d id:%d stream is removed", pObj->vnode, pObj->sid, pObj->meterId); +} + +void cqProcessStreamRes(void *param, TAOS_RES *tres, TAOS_ROW row) { + SMeterObj *pObj = (SMeterObj *)param; + dTrace("vid:%d sid:%d id:%s, stream result is ready", pObj->vnode, pObj->sid, pObj->meterId); + + // construct data + int32_t contLen = pObj->bytesPerPoint; + char * pTemp = calloc(1, sizeof(SSubmitMsg) + pObj->bytesPerPoint + sizeof(SVMsgHeader)); + SSubmitMsg *pMsg = (SSubmitMsg *)(pTemp + sizeof(SVMsgHeader)); + + pMsg->numOfRows = htons(1); + + char ncharBuf[TSDB_MAX_BYTES_PER_ROW] = {0}; + + int32_t offset = 0; + for (int32_t i = 0; i < pObj->numOfColumns; ++i) { + char *dst = row[i]; + if (dst == NULL) { + setNull(pMsg->payLoad + offset, pObj->schema[i].type, pObj->schema[i].bytes); + } else { + // here, we need to transfer nchar(utf8) to unicode(ucs-4) + if (pObj->schema[i].type == TSDB_DATA_TYPE_NCHAR) { + taosMbsToUcs4(row[i], pObj->schema[i].bytes, ncharBuf, TSDB_MAX_BYTES_PER_ROW); + dst = ncharBuf; + } + + memcpy(pMsg->payLoad + offset, dst, pObj->schema[i].bytes); + } + + offset += pObj->schema[i].bytes; + } + + contLen += sizeof(SSubmitMsg); + + int32_t numOfPoints = 0; + int32_t code = vnodeInsertPoints(pObj, (char *)pMsg, contLen, TSDB_DATA_SOURCE_SHELL, NULL, pObj->sversion, + &numOfPoints, taosGetTimestamp(vnodeList[pObj->vnode].cfg.precision)); + + if (code != TSDB_CODE_SUCCESS) { + dError("vid:%d sid:%d id:%s, failed to insert continuous query results", pObj->vnode, pObj->sid, pObj->meterId); + } + + assert(numOfPoints >= 0 && numOfPoints <= 1); + tfree(pTemp); +} + +static void vnodeGetDBFromMeterId(SMeterObj *pObj, char *db) { + char *st = strstr(pObj->meterId, "."); + char *end = strstr(st + 1, "."); + + memcpy(db, st + 1, end - (st + 1)); +} + + diff --git a/src/cq/src/vnodeStream.c b/src/cq/src/vnodeStream.c new file mode 100644 index 0000000000..1a8611fdab --- /dev/null +++ b/src/cq/src/vnodeStream.c @@ -0,0 +1,209 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#define _DEFAULT_SOURCE +#include "taosmsg.h" +#include "vnode.h" +#include "vnodeUtil.h" +#include "vnodeStatus.h" + +/* static TAOS *dbConn = NULL; */ +void vnodeCloseStreamCallback(void *param); + +void vnodeProcessStreamRes(void *param, TAOS_RES *tres, TAOS_ROW row) { + SMeterObj *pObj = (SMeterObj *)param; + dTrace("vid:%d sid:%d id:%s, stream result is ready", pObj->vnode, pObj->sid, pObj->meterId); + + // construct data + int32_t contLen = pObj->bytesPerPoint; + char * pTemp = calloc(1, sizeof(SSubmitMsg) + pObj->bytesPerPoint + sizeof(SVMsgHeader)); + SSubmitMsg *pMsg = (SSubmitMsg *)(pTemp + sizeof(SVMsgHeader)); + + pMsg->numOfRows = htons(1); + + char ncharBuf[TSDB_MAX_BYTES_PER_ROW] = {0}; + + int32_t offset = 0; + for (int32_t i = 0; i < pObj->numOfColumns; ++i) { + char *dst = row[i]; + if (dst == NULL) { + setNull(pMsg->payLoad + offset, pObj->schema[i].type, pObj->schema[i].bytes); + } else { + // here, we need to transfer nchar(utf8) to unicode(ucs-4) + if (pObj->schema[i].type == TSDB_DATA_TYPE_NCHAR) { + taosMbsToUcs4(row[i], pObj->schema[i].bytes, ncharBuf, TSDB_MAX_BYTES_PER_ROW); + dst = ncharBuf; + } + + memcpy(pMsg->payLoad + offset, dst, pObj->schema[i].bytes); + } + + offset += pObj->schema[i].bytes; + } + + contLen += sizeof(SSubmitMsg); + + int32_t numOfPoints = 0; + int32_t code = vnodeInsertPoints(pObj, (char *)pMsg, contLen, TSDB_DATA_SOURCE_SHELL, NULL, pObj->sversion, + &numOfPoints, taosGetTimestamp(vnodeList[pObj->vnode].cfg.precision)); + + if (code != TSDB_CODE_SUCCESS) { + dError("vid:%d sid:%d id:%s, failed to insert continuous query results", pObj->vnode, pObj->sid, pObj->meterId); + } + + assert(numOfPoints >= 0 && numOfPoints <= 1); + tfree(pTemp); +} + +static void vnodeGetDBFromMeterId(SMeterObj *pObj, char *db) { + char *st = strstr(pObj->meterId, "."); + char *end = strstr(st + 1, "."); + + memcpy(db, st + 1, end - (st + 1)); +} + +void vnodeOpenStreams(void *param, void *tmrId) { + SVnodeObj *pVnode = (SVnodeObj *)param; + SMeterObj *pObj; + + if (pVnode->streamRole == TSDB_VN_STREAM_STATUS_STOP) return; + if (pVnode->meterList == NULL) return; + + taosTmrStopA(&pVnode->streamTimer); + pVnode->streamTimer = NULL; + + for (int sid = 0; sid < pVnode->cfg.maxSessions; ++sid) { + pObj = pVnode->meterList[sid]; + if (pObj == NULL || pObj->sqlLen == 0 || vnodeIsMeterState(pObj, TSDB_METER_STATE_DROPPING)) continue; + + dTrace("vid:%d sid:%d id:%s, open stream:%s", pObj->vnode, sid, pObj->meterId, pObj->pSql); + + if (pVnode->dbConn == NULL) { + char db[64] = {0}; + char user[64] = {0}; + vnodeGetDBFromMeterId(pObj, db); + sprintf(user, "_%s", pVnode->cfg.acct); + pVnode->dbConn = taos_connect(NULL, user, tsInternalPass, db, 0); + } + + if (pVnode->dbConn == NULL) { + dError("vid:%d, failed to connect to mgmt node", pVnode->vnode); + taosTmrReset(vnodeOpenStreams, 1000, param, vnodeTmrCtrl, &pVnode->streamTimer); + return; + } + + if (pObj->pStream == NULL) { + pObj->pStream = taos_open_stream(pVnode->dbConn, pObj->pSql, vnodeProcessStreamRes, pObj->lastKey, pObj, + vnodeCloseStreamCallback); + if (pObj->pStream) pVnode->numOfStreams++; + } + } +} + +void vnodeCreateStream(SMeterObj *pObj) { + if (pObj->sqlLen <= 0) return; + + SVnodeObj *pVnode = vnodeList + pObj->vnode; + + if (pVnode->streamRole == TSDB_VN_STREAM_STATUS_STOP) return; + if (pObj->pStream) return; + + dTrace("vid:%d sid:%d id:%s stream:%s is created", pObj->vnode, pObj->sid, pObj->meterId, pObj->pSql); + if (pVnode->dbConn == NULL) { + if (pVnode->streamTimer == NULL) taosTmrReset(vnodeOpenStreams, 1000, pVnode, vnodeTmrCtrl, &pVnode->streamTimer); + } else { + pObj->pStream = taos_open_stream(pVnode->dbConn, pObj->pSql, vnodeProcessStreamRes, pObj->lastKey, pObj, + vnodeCloseStreamCallback); + if (pObj->pStream) pVnode->numOfStreams++; + } +} + +// Close only one stream +void vnodeRemoveStream(SMeterObj *pObj) { + SVnodeObj *pVnode = vnodeList + pObj->vnode; + if (pObj->sqlLen <= 0) return; + + if (pObj->pStream) { + taos_close_stream(pObj->pStream); + pVnode->numOfStreams--; + } + + pObj->pStream = NULL; + if (pVnode->numOfStreams == 0) { + taos_close(pVnode->dbConn); + pVnode->dbConn = NULL; + } + + dTrace("vid:%d sid:%d id:%d stream is removed", pObj->vnode, pObj->sid, pObj->meterId); +} + +// Close all streams in a vnode +void vnodeCloseStream(SVnodeObj *pVnode) { + SMeterObj *pObj; + dPrint("vid:%d, stream is closed, old role %s", pVnode->vnode, taosGetVnodeStreamStatusStr(pVnode->streamRole)); + + // stop stream computing + for (int sid = 0; sid < pVnode->cfg.maxSessions; ++sid) { + pObj = pVnode->meterList[sid]; + if (pObj == NULL) continue; + if (pObj->sqlLen > 0 && pObj->pStream) { + taos_close_stream(pObj->pStream); + pVnode->numOfStreams--; + } + pObj->pStream = NULL; + } +} + +void vnodeUpdateStreamRole(SVnodeObj *pVnode) { + /* SMeterObj *pObj; */ + + int newRole = (pVnode->vnodeStatus == TSDB_VN_STATUS_MASTER) ? TSDB_VN_STREAM_STATUS_START : TSDB_VN_STREAM_STATUS_STOP; + if (newRole != pVnode->streamRole) { + dPrint("vid:%d, stream role is changed from %s to %s", + pVnode->vnode, taosGetVnodeStreamStatusStr(pVnode->streamRole), taosGetVnodeStreamStatusStr(newRole)); + pVnode->streamRole = newRole; + if (newRole == TSDB_VN_STREAM_STATUS_START) { + vnodeOpenStreams(pVnode, NULL); + } else { + vnodeCloseStream(pVnode); + } + } else { + dPrint("vid:%d, stream role is keep to %s", pVnode->vnode, taosGetVnodeStreamStatusStr(pVnode->streamRole)); + } +} + +// Callback function called from client +void vnodeCloseStreamCallback(void *param) { + SMeterObj *pMeter = (SMeterObj *)param; + SVnodeObj *pVnode = NULL; + + if (pMeter == NULL || pMeter->sqlLen == 0) return; + pVnode = vnodeList + pMeter->vnode; + + pMeter->sqlLen = 0; + pMeter->pSql = NULL; + pMeter->pStream = NULL; + + pVnode->numOfStreams--; + + if (pVnode->numOfStreams == 0) { + taos_close(pVnode->dbConn); + pVnode->dbConn = NULL; + } + + vnodeSaveMeterObjToFile(pMeter); +} + + From 1317dbc53b68c15d5270297acb7194de5561e027 Mon Sep 17 00:00:00 2001 From: Shuduo Sang Date: Fri, 1 May 2020 16:33:06 +0800 Subject: [PATCH 02/11] fix part of issues reported by TScanCode. [TD-217] --- src/client/src/tscSecondaryMerge.c | 8 +++++++- src/client/src/tscServer.c | 8 ++++++-- src/client/src/tscStream.c | 4 ++-- src/client/src/tscUtil.c | 4 +++- src/dnode/src/dnodeMain.c | 1 + src/kit/shell/src/shellImport.c | 1 + src/kit/shell/src/shellLinux.c | 15 +++++++++++++-- src/kit/taosdemo/taosdemo.c | 3 +++ src/kit/taosdump/taosdump.c | 6 ++---- src/mnode/src/mgmtShell.c | 4 +++- src/mnode/src/mgmtVgroup.c | 6 +++++- src/query/src/qparserImpl.c | 4 ++-- src/query/src/qtsbuf.c | 10 +++++++--- src/query/src/queryUtil.c | 5 ++++- src/tsdb/src/tsdbRead.c | 2 +- 15 files changed, 60 insertions(+), 21 deletions(-) diff --git a/src/client/src/tscSecondaryMerge.c b/src/client/src/tscSecondaryMerge.c index 84f14abf4c..dbf17b56c5 100644 --- a/src/client/src/tscSecondaryMerge.c +++ b/src/client/src/tscSecondaryMerge.c @@ -140,7 +140,13 @@ void tscCreateLocalReducer(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrd // offset of cmd in SSqlObj structure char *pSqlObjAddr = (char *)pCmd - offsetof(SSqlObj, cmd); - if (pMemBuffer == NULL || pDesc->pColumnModel == NULL) { + if (pMemBuffer == NULL) { + tscError("%p pMemBuffer", pMemBuffer); + pRes->code = TSDB_CODE_APP_ERROR; + return; + } + + if (pDesc->pColumnModel == NULL) { tscLocalReducerEnvDestroy(pMemBuffer, pDesc, finalmodel, numOfBuffer); tscError("%p no local buffer or intermediate result format model", pSqlObjAddr); diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index 9b44bea82d..7874d0e816 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -228,7 +228,11 @@ int tscSendMsgToServer(SSqlObj *pSql) { void tscProcessMsgFromServer(SRpcMsg *rpcMsg) { SSqlObj *pSql = (SSqlObj *)rpcMsg->handle; - if (pSql == NULL || pSql->signature != pSql) { + if (pSql == NULL) { + tscError("%p sql is already released", pSql->signature); + return; + } + if (pSql->signature != pSql) { tscError("%p sql is already released, signature:%p", pSql, pSql->signature); return; } @@ -310,7 +314,7 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg) { pRes->rspType = rpcMsg->msgType; pRes->rspLen = rpcMsg->contLen; - if (pRes->rspLen > 0) { + if (pRes->rspLen > 0 && rpcMsg->pCont) { char *tmp = (char *)realloc(pRes->pRsp, pRes->rspLen); if (tmp == NULL) { pRes->code = TSDB_CODE_CLI_OUT_OF_MEMORY; diff --git a/src/client/src/tscStream.c b/src/client/src/tscStream.c index d690681729..5f5af09cf8 100644 --- a/src/client/src/tscStream.c +++ b/src/client/src/tscStream.c @@ -172,17 +172,17 @@ static void tscSetTimestampForRes(SSqlStream *pStream, SSqlObj *pSql) { static void tscProcessStreamRetrieveResult(void *param, TAOS_RES *res, int numOfRows) { SSqlStream * pStream = (SSqlStream *)param; SSqlObj * pSql = (SSqlObj *)res; - STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0, 0); if (pSql == NULL || numOfRows < 0) { int64_t retryDelayTime = tscGetRetryDelayTime(pStream->slidingTime, pStream->precision); tscError("%p stream:%p, retrieve data failed, code:%d, retry in %" PRId64 "ms", pSql, pStream, numOfRows, retryDelayTime); - tscClearTableMetaInfo(pTableMetaInfo, true); tscSetRetryTimer(pStream, pStream->pSql, retryDelayTime); return; } + STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0, 0); + if (numOfRows > 0) { // when reaching here the first execution of stream computing is successful. pStream->numOfRes += numOfRows; SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0); diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index c0cfa4d3af..506fa1a605 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -757,7 +757,9 @@ void tscCloseTscObj(STscObj* pObj) { taosTmrStopA(&(pObj->pTimer)); tscFreeSqlObj(pSql); - sem_destroy(&pSql->rspSem); + if (pSql) { + sem_destroy(&pSql->rspSem); + } rpcClose(pObj->pMgmtConn); pthread_mutex_destroy(&pObj->mutex); diff --git a/src/dnode/src/dnodeMain.c b/src/dnode/src/dnodeMain.c index 6aa42f05c7..940b884927 100644 --- a/src/dnode/src/dnodeMain.c +++ b/src/dnode/src/dnodeMain.c @@ -213,6 +213,7 @@ static void dnodeCheckDataDirOpenned(char *dir) { int32_t ret = flock(fd, LOCK_EX | LOCK_NB); if (ret != 0) { dError("failed to lock file:%s ret:%d, database may be running, quit", filepath, ret); + close(fd); exit(0); } } diff --git a/src/kit/shell/src/shellImport.c b/src/kit/shell/src/shellImport.c index 256b251075..e5c50bb74e 100644 --- a/src/kit/shell/src/shellImport.c +++ b/src/kit/shell/src/shellImport.c @@ -142,6 +142,7 @@ static void shellSourceFile(TAOS *con, char *fptr) { if (wordexp(fptr, &full_path, 0) != 0) { fprintf(stderr, "ERROR: illegal file name\n"); + free(cmd); return; } diff --git a/src/kit/shell/src/shellLinux.c b/src/kit/shell/src/shellLinux.c index d3453cda36..22ffa78c81 100644 --- a/src/kit/shell/src/shellLinux.c +++ b/src/kit/shell/src/shellLinux.c @@ -62,7 +62,13 @@ static error_t parse_opt(int key, char *arg, struct argp_state *state) { if (arg) arguments->password = arg; break; case 'P': - tsMnodeShellPort = atoi(arg); + if (arg) { + tsMnodeShellPort = atoi(arg); + } else { + fprintf(stderr, "Invalid port\n"); + return -1; + } + break; case 't': arguments->timezone = arg; @@ -101,7 +107,12 @@ static error_t parse_opt(int key, char *arg, struct argp_state *state) { wordfree(&full_path); break; case 'T': - arguments->threadNum = atoi(arg); + if (arg) { + arguments->threadNum = atoi(arg); + } else { + fprintf(stderr, "Invalid number of threads\n"); + return -1; + } break; case 'd': arguments->database = arg; diff --git a/src/kit/taosdemo/taosdemo.c b/src/kit/taosdemo/taosdemo.c index 24855ab8b5..937c8d177d 100644 --- a/src/kit/taosdemo/taosdemo.c +++ b/src/kit/taosdemo/taosdemo.c @@ -340,6 +340,9 @@ int main(int argc, char *argv[]) { int count_data_type = 0; char dataString[512]; bool do_aggreFunc = true; + + memset(dataString, 0, 512); + if (strcasecmp(data_type[0], "BINARY") == 0 || strcasecmp(data_type[0], "BOOL") == 0) { do_aggreFunc = false; } diff --git a/src/kit/taosdump/taosdump.c b/src/kit/taosdump/taosdump.c index 2e64c9bccc..ed98a9b92c 100644 --- a/src/kit/taosdump/taosdump.c +++ b/src/kit/taosdump/taosdump.c @@ -383,14 +383,13 @@ int taosGetTableRecordInfo(char *table, STableRecordInfo *pTableRecordInfo) { TAOS_FIELD *fields = taos_fetch_fields(result); - while ((row = taos_fetch_row(result)) != NULL) { + if ((row = taos_fetch_row(result)) != NULL) { isSet = true; pTableRecordInfo->isMetric = false; strncpy(pTableRecordInfo->tableRecord.name, (char *)row[TSDB_SHOW_TABLES_NAME_INDEX], fields[TSDB_SHOW_TABLES_NAME_INDEX].bytes); strncpy(pTableRecordInfo->tableRecord.metric, (char *)row[TSDB_SHOW_TABLES_METRIC_INDEX], fields[TSDB_SHOW_TABLES_METRIC_INDEX].bytes); - break; } taos_free_result(result); @@ -410,11 +409,10 @@ int taosGetTableRecordInfo(char *table, STableRecordInfo *pTableRecordInfo) { return -1; } - while ((row = taos_fetch_row(result)) != NULL) { + if ((row = taos_fetch_row(result)) != NULL) { isSet = true; pTableRecordInfo->isMetric = true; strcpy(pTableRecordInfo->tableRecord.metric, table); - break; } taos_free_result(result); diff --git a/src/mnode/src/mgmtShell.c b/src/mnode/src/mgmtShell.c index 193521b026..05cacbb95c 100644 --- a/src/mnode/src/mgmtShell.c +++ b/src/mnode/src/mgmtShell.c @@ -149,7 +149,9 @@ void mgmtDealyedAddToShellQueue(SQueuedMsg *queuedMsg) { } static void mgmtProcessMsgFromShell(SRpcMsg *rpcMsg) { - if (rpcMsg == NULL || rpcMsg->pCont == NULL) { + assert(rpcMsg); + + if (rpcMsg->pCont == NULL) { mgmtSendSimpleResp(rpcMsg->handle, TSDB_CODE_INVALID_MSG_LEN); return; } diff --git a/src/mnode/src/mgmtVgroup.c b/src/mnode/src/mgmtVgroup.c index 839dce5c38..b3260850ad 100644 --- a/src/mnode/src/mgmtVgroup.c +++ b/src/mnode/src/mgmtVgroup.c @@ -158,7 +158,11 @@ static int32_t mgmtVgroupActionUpdate(SSdbOper *pOper) { } mgmtDecVgroupRef(pVgroup); - mTrace("vgId:%d, is updated, tables:%d numOfVnode:%d", pVgroup->vgId, pDb->cfg.maxTables, pVgroup->numOfVnodes); + + mTrace("vgId:%d, is updated, numOfVnode:%d", pVgroup->vgId, pVgroup->numOfVnodes); + if (pDb) { + mTrace("tables:%d", pDb->cfg.maxTables); + } return TSDB_CODE_SUCCESS; } diff --git a/src/query/src/qparserImpl.c b/src/query/src/qparserImpl.c index 075dbc9d14..7b3a76c9d2 100644 --- a/src/query/src/qparserImpl.c +++ b/src/query/src/qparserImpl.c @@ -675,7 +675,7 @@ void SQLInfoDestroy(SSqlInfo *pInfo) { free(pInfo->pDCLInfo->a); } - if (pInfo->type == TSDB_SQL_CREATE_DB) { + if (pInfo->pDCLInfo != NULL && pInfo->type == TSDB_SQL_CREATE_DB) { tVariantListDestroy(pInfo->pDCLInfo->dbOpt.keep); } @@ -899,4 +899,4 @@ void setDefaultCreateDbOption(SCreateDBInfo *pDBInfo) { pDBInfo->keep = NULL; memset(&pDBInfo->precision, 0, sizeof(SSQLToken)); -} \ No newline at end of file +} diff --git a/src/query/src/qtsbuf.c b/src/query/src/qtsbuf.c index 062a8038b2..1d5c4f2d9d 100644 --- a/src/query/src/qtsbuf.c +++ b/src/query/src/qtsbuf.c @@ -636,12 +636,16 @@ void tsBufResetPos(STSBuf* pTSBuf) { STSElem tsBufGetElem(STSBuf* pTSBuf) { STSElem elem1 = {.vnode = -1}; - STSCursor* pCur = &pTSBuf->cur; - if (pTSBuf == NULL || pCur->vnodeIndex < 0) { + if (pTSBuf == NULL) { return elem1; } + STSCursor* pCur = &pTSBuf->cur; + if (pCur != NULL && pCur->vnodeIndex < 0) { + return elem1; + } + STSBlock* pBlock = &pTSBuf->block; elem1.vnode = pTSBuf->pData[pCur->vnodeIndex].info.vnode; @@ -920,4 +924,4 @@ static STSBuf* allocResForTSBuf(STSBuf* pTSBuf) { pTSBuf->fileSize += getDataStartOffset(); return pTSBuf; -} \ No newline at end of file +} diff --git a/src/query/src/queryUtil.c b/src/query/src/queryUtil.c index b4d8911284..9da02f9f0f 100644 --- a/src/query/src/queryUtil.c +++ b/src/query/src/queryUtil.c @@ -62,7 +62,10 @@ void destroyTimeWindowRes(SWindowResult *pWindowRes, int32_t nOutputCols) { } void cleanupTimeWindowInfo(SWindowResInfo *pWindowResInfo, int32_t numOfCols) { - if (pWindowResInfo == NULL || pWindowResInfo->capacity == 0) { + if (pWindowResInfo == NULL) { + return; + } + if (pWindowResInfo->capacity == 0) { assert(pWindowResInfo->hashList == NULL && pWindowResInfo->pResult == NULL); return; } diff --git a/src/tsdb/src/tsdbRead.c b/src/tsdb/src/tsdbRead.c index bc9220dbc7..46480b2b9d 100644 --- a/src/tsdb/src/tsdbRead.c +++ b/src/tsdb/src/tsdbRead.c @@ -216,7 +216,7 @@ static bool hasMoreDataInCache(STsdbQueryHandle* pHandle) { return false; } - if (pCheckInfo->iter == NULL) { + if (pCheckInfo->iter == NULL && pTable->mem) { pCheckInfo->iter = tSkipListCreateIterFromVal(pTable->mem->pData, (const char*) &pCheckInfo->lastKey, TSDB_DATA_TYPE_TIMESTAMP, pHandle->order); From d927985e581540ce9873cf586eae0bd286957a01 Mon Sep 17 00:00:00 2001 From: Shuduo Sang Date: Fri, 1 May 2020 16:53:53 +0800 Subject: [PATCH 03/11] fix test script grep to match accuretely. --- tests/test-all.sh | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/test-all.sh b/tests/test-all.sh index ee1904ba7c..4bffca1201 100755 --- a/tests/test-all.sh +++ b/tests/test-all.sh @@ -10,7 +10,7 @@ NC='\033[0m' cd script ./test.sh -f basicSuite.sim 2>&1 | grep 'success\|failed\|fault' | tee out.txt -totalSuccess=`grep success out.txt | wc -l` +totalSuccess=`grep -w 'success' out.txt | wc -l` totalBasic=`grep success out.txt | grep Suite | wc -l` if [ "$totalSuccess" -gt "0" ]; then @@ -18,7 +18,7 @@ if [ "$totalSuccess" -gt "0" ]; then echo -e "${GREEN} ### Total $totalSuccess TSIM case(s) succeed! ### ${NC}" fi -totalFailed=`grep 'failed\|fault' out.txt | wc -l` +totalFailed=`grep -w 'failed\|fault' out.txt | wc -l` if [ "$totalFailed" -ne "0" ]; then echo -e "${RED} ### Total $totalFailed TSIM case(s) failed! ### ${NC}" exit $totalFailed From 4191754f247885b85222fad2262b6466a2bc85f8 Mon Sep 17 00:00:00 2001 From: Shuduo Sang Date: Fri, 1 May 2020 18:15:38 +0800 Subject: [PATCH 04/11] fix test.py to support steven's case --- tests/pytest/smoketest.sh | 12 ++++----- tests/pytest/test.py | 57 +++++++++++++++++++-------------------- 2 files changed, 34 insertions(+), 35 deletions(-) diff --git a/tests/pytest/smoketest.sh b/tests/pytest/smoketest.sh index af597fb6c5..7dbefa9402 100755 --- a/tests/pytest/smoketest.sh +++ b/tests/pytest/smoketest.sh @@ -34,12 +34,12 @@ python3 ./test.py $1 -f table/db_table.py python3 ./test.py -s $1 sleep 1 -python3 ./test.py $1 -f import_merge/importDataLastTO.py -python3 ./test.py -s $1 -sleep 1 -python3 ./test.py $1 -f import_merge/importDataLastT.py -python3 ./test.py -s $1 -sleep 1 +#python3 ./test.py $1 -f import_merge/importDataLastTO.py +#python3 ./test.py -s $1 +#sleep 1 +#python3 ./test.py $1 -f import_merge/importDataLastT.py +#python3 ./test.py -s $1 +#sleep 1 python3 ./test.py $1 -f import_merge/importDataTO.py python3 ./test.py -s $1 sleep 1 diff --git a/tests/pytest/test.py b/tests/pytest/test.py index 479406a00b..9bf1660634 100644 --- a/tests/pytest/test.py +++ b/tests/pytest/test.py @@ -83,36 +83,35 @@ if __name__ == "__main__": tdLog.exit('stop All dnodes') - if masterIp == "": - tdDnodes.init(deployPath) - tdDnodes.setTestCluster(testCluster) - tdDnodes.setValgrind(valgrind) + tdDnodes.init(deployPath) + tdDnodes.setTestCluster(testCluster) + tdDnodes.setValgrind(valgrind) - if testCluster: - tdLog.notice("Procedures for testing cluster") - if fileName == "all": - tdCases.runAllCluster() - else: - tdCases.runOneCluster(fileName) - else: - tdLog.notice("Procedures for testing self-deployment") - tdDnodes.stopAll() - tdDnodes.deploy(1) - tdDnodes.start(1) - conn = taos.connect( - host='127.0.0.1', - config=tdDnodes.getSimCfgPath()) - if fileName == "all": - tdCases.runAllLinux(conn) - else: - tdCases.runOneLinux(conn, fileName) - conn.close() + tdDnodes.stopAll() + tdDnodes.deploy(1) + tdDnodes.start(1) + + if masterIp == "": + host='127.0.0.1' else: - tdLog.notice("Procedures for tdengine deployed in %s" % (masterIp)) - cfgPath = "../../build/test/cfg" # was: tdDnodes.getSimCfgPath() - conn = taos.connect(host=masterIp, config=cfgPath) + host=masterIp + + tdLog.notice("Procedures for tdengine deployed in %s" % (host)) + + if testCluster: + tdLog.notice("Procedures for testing cluster") if fileName == "all": - tdCases.runAllWindows(conn) + tdCases.runAllCluster() else: - tdCases.runOneWindows(conn, fileName) - conn.close() + tdCases.runOneCluster(fileName) + else: + tdLog.notice("Procedures for testing self-deployment") + conn = taos.connect( + host, + config=tdDnodes.getSimCfgPath()) + if fileName == "all": + tdCases.runAllLinux(conn) + else: + tdCases.runOneLinux(conn, fileName) + + conn.close() From 5ac8522d663f1522452b6e24d5f1ab38aa992c93 Mon Sep 17 00:00:00 2001 From: hzcheng Date: Fri, 1 May 2020 23:06:59 +0800 Subject: [PATCH 05/11] TD-166 --- src/vnode/src/vnodeWrite.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/vnode/src/vnodeWrite.c b/src/vnode/src/vnodeWrite.c index b1a49e6e65..a88737bc39 100644 --- a/src/vnode/src/vnodeWrite.c +++ b/src/vnode/src/vnodeWrite.c @@ -141,7 +141,7 @@ static int32_t vnodeProcessCreateTableMsg(SVnodeObj *pVnode, void *pCont, SRspRe SDataRow dataRow = tdNewDataRowFromSchema(pDestTagSchema); for (int i = 0; i < numOfTags; i++) { - STColumn *pTCol = schemaColAt(pDestSchema, i); + STColumn *pTCol = schemaColAt(pDestTagSchema, i); tdAppendColVal(dataRow, pTagData + accumBytes, pTCol->type, pTCol->bytes, pTCol->offset); accumBytes += htons(pSchema[i + numOfColumns].bytes); } From cdcb0daa8a80a53fb308f805de0cb376fb9155fd Mon Sep 17 00:00:00 2001 From: jtao1735 Date: Sat, 2 May 2020 00:19:49 +0000 Subject: [PATCH 06/11] first draft --- src/CMakeLists.txt | 1 + src/cq/CMakeLists.txt | 15 ++ src/cq/src/cqMain.c | 328 +++++++++++++++++++++++++------------ src/cq/src/vnodeStream.c | 209 ----------------------- src/cq/test/CMakeLists.txt | 17 ++ src/cq/test/cqtest.c | 94 +++++++++++ src/inc/taosdef.h | 5 + src/inc/tcq.h | 47 ++++++ src/util/inc/tqueue.h | 4 - src/vnode/CMakeLists.txt | 4 +- src/vnode/src/vnodeMain.c | 59 ++++--- src/vnode/src/vnodeWrite.c | 10 ++ 12 files changed, 450 insertions(+), 343 deletions(-) create mode 100644 src/cq/CMakeLists.txt delete mode 100644 src/cq/src/vnodeStream.c create mode 100644 src/cq/test/CMakeLists.txt create mode 100644 src/cq/test/cqtest.c create mode 100644 src/inc/tcq.h diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 904050fb1b..57d7234379 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -14,5 +14,6 @@ ADD_SUBDIRECTORY(mnode) ADD_SUBDIRECTORY(vnode) ADD_SUBDIRECTORY(tsdb) ADD_SUBDIRECTORY(wal) +ADD_SUBDIRECTORY(cq) ADD_SUBDIRECTORY(dnode) ADD_SUBDIRECTORY(connector/jdbc) diff --git a/src/cq/CMakeLists.txt b/src/cq/CMakeLists.txt new file mode 100644 index 0000000000..d41ae09a58 --- /dev/null +++ b/src/cq/CMakeLists.txt @@ -0,0 +1,15 @@ +CMAKE_MINIMUM_REQUIRED(VERSION 2.8) +PROJECT(TDengine) + +INCLUDE_DIRECTORIES(${TD_OS_DIR}/inc) +INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/inc) +INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/util/inc) +INCLUDE_DIRECTORIES(inc) + +AUX_SOURCE_DIRECTORY(${CMAKE_CURRENT_SOURCE_DIR}/src SRC) + +ADD_LIBRARY(tcq ${SRC}) +TARGET_LINK_LIBRARIES(tcq tutil common taos) + +ADD_SUBDIRECTORY(test) + diff --git a/src/cq/src/cqMain.c b/src/cq/src/cqMain.c index e5bbf8353a..e3df73a883 100644 --- a/src/cq/src/cqMain.c +++ b/src/cq/src/cqMain.c @@ -14,154 +14,266 @@ */ #define _DEFAULT_SOURCE + +#include +#include +#include +#include "taosdef.h" #include "taosmsg.h" -#include "vnode.h" +#include "tlog.h" +#include "twal.h" +#include "tcq.h" +#include "taos.h" -/* static TAOS *dbConn = NULL; */ -void vnodeCloseStreamCallback(void *param); +#define cError(...) if (cqDebugFlag & DEBUG_ERROR) {taosPrintLog("ERROR CQ ", cqDebugFlag, __VA_ARGS__);} +#define cWarn(...) if (cqDebugFlag & DEBUG_WARN) {taosPrintLog("WARN CQ ", cqDebugFlag, __VA_ARGS__);} +#define cTrace(...) if (cqDebugFlag & DEBUG_TRACE) {taosPrintLog("CQ ", cqDebugFlag, __VA_ARGS__);} +#define cPrint(...) {taosPrintLog("WAL ", 255, __VA_ARGS__);} -void cqOpen(void *param, void *tmrId) { - SVnodeObj *pVnode = (SVnodeObj *)param; - SMeterObj *pObj; +typedef struct { + int vgId; + char path[TSDB_FILENAME_LEN]; + char user[TSDB_USER_LEN]; + char pass[TSDB_PASSWORD_LEN]; + FCqWrite cqWrite; + void *ahandle; + int num; // number of continuous streams + struct SCqObj *pHead; + void *dbConn; + pthread_mutex_t mutex; +} SCqContext; - if (pVnode->streamRole == TSDB_VN_STREAM_STATUS_STOP) return; - if (pVnode->meterList == NULL) return; +typedef struct SCqObj { + int sid; // table ID + int rowSize; // bytes of a row + char *sqlStr; // SQL string + int columns; // number of columns + SSchema *pSchema; // pointer to schema array + void *pStream; + struct SCqObj *next; + SCqContext *pContext; +} SCqObj; - taosTmrStopA(&pVnode->streamTimer); - pVnode->streamTimer = NULL; +int cqDebugFlag = 135; - for (int sid = 0; sid < pVnode->cfg.maxSessions; ++sid) { - pObj = pVnode->meterList[sid]; - if (pObj == NULL || pObj->sqlLen == 0 || vnodeIsMeterState(pObj, TSDB_METER_STATE_DROPPING)) continue; +static void cqProcessStreamRes(void *param, TAOS_RES *tres, TAOS_ROW row); - dTrace("vid:%d sid:%d id:%s, open stream:%s", pObj->vnode, sid, pObj->meterId, pObj->pSql); +void *cqOpen(void *ahandle, const SCqCfg *pCfg) { + + SCqContext *pContext = calloc(sizeof(SCqContext), 1); + if (pContext == NULL) return NULL; - if (pVnode->dbConn == NULL) { - char db[64] = {0}; - char user[64] = {0}; - vnodeGetDBFromMeterId(pObj, db); - sprintf(user, "_%s", pVnode->cfg.acct); - pVnode->dbConn = taos_connect(NULL, user, tsInternalPass, db, 0); + strcpy(pContext->user, pCfg->user); + strcpy(pContext->pass, pCfg->pass); + strcpy(pContext->path, pCfg->path); + pContext->vgId = pCfg->vgId; + pContext->cqWrite = pCfg->cqWrite; + pContext->ahandle = ahandle; + + // open meta data file + + // loop each record + while (1) { + SCqObj *pObj = calloc(sizeof(SCqObj), 1); + if (pObj == NULL) { + cError("vgId:%d, no memory", pContext->vgId); + continue; } - if (pVnode->dbConn == NULL) { - dError("vid:%d, failed to connect to mgmt node", pVnode->vnode); - taosTmrReset(vnodeOpenStreams, 1000, param, vnodeTmrCtrl, &pVnode->streamTimer); - return; - } + pObj->next = pContext->pHead; + pContext->pHead = pObj; - if (pObj->pStream == NULL) { - pObj->pStream = taos_open_stream(pVnode->dbConn, pObj->pSql, vnodeProcessStreamRes, pObj->lastKey, pObj, - vnodeCloseStreamCallback); - if (pObj->pStream) pVnode->numOfStreams++; - } + // assigne each field in SCqObj + // pObj->sid = + // strcpy(pObj->sqlStr, ?? ); + // schema, columns } + + pthread_mutex_init(&pContext->mutex, NULL); + + cTrace("vgId:%d, CQ is opened", pContext->vgId); + + return pContext; } -// Close all streams in a vnode -void cqClose(SVnodeObj *pVnode) { - SMeterObj *pObj; - dPrint("vid:%d, stream is closed, old role %s", pVnode->vnode, taosGetVnodeStreamStatusStr(pVnode->streamRole)); +void cqClose(void *handle) { + SCqContext *pContext = handle; - // stop stream computing - for (int sid = 0; sid < pVnode->cfg.maxSessions; ++sid) { - pObj = pVnode->meterList[sid]; - if (pObj == NULL) continue; - if (pObj->sqlLen > 0 && pObj->pStream) { - taos_close_stream(pObj->pStream); - pVnode->numOfStreams--; + // stop all CQs + cqStop(pContext); + + // save the meta data + + // free all resources + SCqObj *pObj = pContext->pHead; + while (pObj) { + SCqObj *pTemp = pObj; + pObj = pObj->next; + free(pTemp); + } + + pthread_mutex_destroy(&pContext->mutex); + + cTrace("vgId:%d, CQ is closed", pContext->vgId); + free(pContext); +} + +void cqStart(void *handle) { + SCqContext *pContext = handle; + cTrace("vgId:%d, start all CQs", pContext->vgId); + if (pContext->dbConn) return; + + pthread_mutex_lock(&pContext->mutex); + + pContext->dbConn = taos_connect("localhost", pContext->user, pContext->pass, NULL, 0); + if (pContext->dbConn) { + cError("vgId:%d, failed to connect to TDengine(%s)", pContext->vgId, tstrerror(terrno)); + pthread_mutex_unlock(&pContext->mutex); + return; + } + + + SCqObj *pObj = pContext->pHead; + while (pObj) { + int64_t lastKey = 0; + pObj->pStream = taos_open_stream(pContext->dbConn, pObj->sqlStr, cqProcessStreamRes, lastKey, pObj, NULL); + if (pObj->pStream) { + pContext->num++; + cTrace("vgId:%d, id:%d CQ:%s is openned", pContext->vgId, pObj->sid, pObj->sqlStr); + } else { + cError("vgId:%d, id:%d CQ:%s, failed to open", pContext->vgId, pObj->sqlStr); } + pObj = pObj->next; + } + + pthread_mutex_unlock(&pContext->mutex); +} + +void cqStop(void *handle) { + SCqContext *pContext = handle; + cTrace("vgId:%d, stop all CQs", pContext->vgId); + if (pContext->dbConn == NULL) return; + + pthread_mutex_lock(&pContext->mutex); + + SCqObj *pObj = pContext->pHead; + while (pObj) { + if (pObj->pStream) taos_close_stream(pObj->pStream); pObj->pStream = NULL; + cTrace("vgId:%d, id:%d CQ:%s is closed", pContext->vgId, pObj->sid, pObj->sqlStr); + + pObj = pObj->next; } + + if (pContext->dbConn) taos_close(pContext->dbConn); + pContext->dbConn = NULL; + + pthread_mutex_unlock(&pContext->mutex); } -void cqCreate(SMeterObj *pObj) { - if (pObj->sqlLen <= 0) return; +void cqCreate(void *handle, int sid, char *sqlStr, SSchema *pSchema, int columns) { + SCqContext *pContext = handle; - SVnodeObj *pVnode = vnodeList + pObj->vnode; + SCqObj *pObj = calloc(sizeof(SCqObj), 1); + if (pObj == NULL) return; - if (pVnode->streamRole == TSDB_VN_STREAM_STATUS_STOP) return; - if (pObj->pStream) return; + pObj->sid = sid; + pObj->sqlStr = malloc(strlen(sqlStr)+1); + strcpy(pObj->sqlStr, sqlStr); - dTrace("vid:%d sid:%d id:%s stream:%s is created", pObj->vnode, pObj->sid, pObj->meterId, pObj->pSql); - if (pVnode->dbConn == NULL) { - if (pVnode->streamTimer == NULL) taosTmrReset(vnodeOpenStreams, 1000, pVnode, vnodeTmrCtrl, &pVnode->streamTimer); - } else { - pObj->pStream = taos_open_stream(pVnode->dbConn, pObj->pSql, vnodeProcessStreamRes, pObj->lastKey, pObj, - vnodeCloseStreamCallback); - if (pObj->pStream) pVnode->numOfStreams++; + pObj->columns = columns; + + int size = sizeof(SSchema) * columns; + pObj->pSchema = malloc(size); + memcpy(pObj->pSchema, pSchema, size); + + cTrace("vgId:%d, id:%d CQ:%s is created", pContext->vgId, pObj->sid, pObj->sqlStr); + + pthread_mutex_lock(&pContext->mutex); + + pObj->next = pContext->pHead; + pContext->pHead = pObj; + + if (pContext->dbConn) { + int64_t lastKey = 0; + pObj->pStream = taos_open_stream(pContext->dbConn, pObj->sqlStr, cqProcessStreamRes, lastKey, pObj, NULL); + if (pObj->pStream) { + pContext->num++; + cTrace("vgId:%d, id:%d CQ:%s is openned", pContext->vgId, pObj->sid, pObj->sqlStr); + } else { + cError("vgId:%d, id:%d CQ:%s, failed to launch", pContext->vgId, pObj->sid, pObj->sqlStr); + } } + + pthread_mutex_unlock(&pContext->mutex); } -// Close only one stream -void cqDrop(SMeterObj *pObj) { - SVnodeObj *pVnode = vnodeList + pObj->vnode; - if (pObj->sqlLen <= 0) return; +void cqDrop(void *handle, int sid) { + SCqContext *pContext = handle; - if (pObj->pStream) { - taos_close_stream(pObj->pStream); - pVnode->numOfStreams--; + pthread_mutex_lock(&pContext->mutex); + + // locate the pObj; + SCqObj *prev = NULL; + SCqObj *pObj = pContext->pHead; + while (pObj) { + if (pObj->sid != sid) { + prev = pObj; + pObj = pObj->next; + continue; + } + + // remove from the linked list + if (prev) { + prev->next = pObj->next; + } else { + pContext->pHead = pObj->next; + } + + break; } - pObj->pStream = NULL; - if (pVnode->numOfStreams == 0) { - taos_close(pVnode->dbConn); - pVnode->dbConn = NULL; + if (pObj) { + // update the meta data + + // free the resources associated + if (pObj->pStream) taos_close_stream(pObj->pStream); + pObj->pStream = NULL; + + cTrace("vgId:%d, id:%d CQ:%s is dropped", pContext->vgId, pObj->sid, pObj->sqlStr); + free(pObj); } - dTrace("vid:%d sid:%d id:%d stream is removed", pObj->vnode, pObj->sid, pObj->meterId); + pthread_mutex_lock(&pContext->mutex); } -void cqProcessStreamRes(void *param, TAOS_RES *tres, TAOS_ROW row) { - SMeterObj *pObj = (SMeterObj *)param; - dTrace("vid:%d sid:%d id:%s, stream result is ready", pObj->vnode, pObj->sid, pObj->meterId); +static void cqProcessStreamRes(void *param, TAOS_RES *tres, TAOS_ROW row) { + SCqObj *pObj = (SCqObj *)param; + SCqContext *pContext = pObj->pContext; + if (pObj->pStream == NULL) return; + + cTrace("vgId:%d, id:%d CQ:%s stream result is ready", pContext->vgId, pObj->sid, pObj->sqlStr); // construct data - int32_t contLen = pObj->bytesPerPoint; - char * pTemp = calloc(1, sizeof(SSubmitMsg) + pObj->bytesPerPoint + sizeof(SVMsgHeader)); - SSubmitMsg *pMsg = (SSubmitMsg *)(pTemp + sizeof(SVMsgHeader)); + int size = sizeof(SWalHead) + sizeof(SSubmitMsg) + sizeof(SSubmitBlk) + pObj->rowSize; + char *buffer = calloc(size, 1); - pMsg->numOfRows = htons(1); + SWalHead *pHead = (SWalHead *)buffer; + pHead->msgType = TSDB_MSG_TYPE_SUBMIT; + pHead->len = size - sizeof(SWalHead); + + SSubmitMsg *pSubmit = (SSubmitMsg *) (buffer + sizeof(SWalHead)); + // to do: fill in the SSubmitMsg structure + pSubmit->numOfBlocks = 1; - char ncharBuf[TSDB_MAX_BYTES_PER_ROW] = {0}; - int32_t offset = 0; - for (int32_t i = 0; i < pObj->numOfColumns; ++i) { - char *dst = row[i]; - if (dst == NULL) { - setNull(pMsg->payLoad + offset, pObj->schema[i].type, pObj->schema[i].bytes); - } else { - // here, we need to transfer nchar(utf8) to unicode(ucs-4) - if (pObj->schema[i].type == TSDB_DATA_TYPE_NCHAR) { - taosMbsToUcs4(row[i], pObj->schema[i].bytes, ncharBuf, TSDB_MAX_BYTES_PER_ROW); - dst = ncharBuf; - } + SSubmitBlk *pBlk = (SSubmitBlk *) (buffer + sizeof(SWalHead) + sizeof(SSubmitMsg)); + // to do: fill in the SSubmitBlk strucuture + pBlk->tid = pObj->sid; - memcpy(pMsg->payLoad + offset, dst, pObj->schema[i].bytes); - } - offset += pObj->schema[i].bytes; - } + // write into vnode write queue + pContext->cqWrite(pContext->ahandle, pHead, TAOS_QTYPE_CQ); - contLen += sizeof(SSubmitMsg); - - int32_t numOfPoints = 0; - int32_t code = vnodeInsertPoints(pObj, (char *)pMsg, contLen, TSDB_DATA_SOURCE_SHELL, NULL, pObj->sversion, - &numOfPoints, taosGetTimestamp(vnodeList[pObj->vnode].cfg.precision)); - - if (code != TSDB_CODE_SUCCESS) { - dError("vid:%d sid:%d id:%s, failed to insert continuous query results", pObj->vnode, pObj->sid, pObj->meterId); - } - - assert(numOfPoints >= 0 && numOfPoints <= 1); - tfree(pTemp); } -static void vnodeGetDBFromMeterId(SMeterObj *pObj, char *db) { - char *st = strstr(pObj->meterId, "."); - char *end = strstr(st + 1, "."); - - memcpy(db, st + 1, end - (st + 1)); -} - - diff --git a/src/cq/src/vnodeStream.c b/src/cq/src/vnodeStream.c deleted file mode 100644 index 1a8611fdab..0000000000 --- a/src/cq/src/vnodeStream.c +++ /dev/null @@ -1,209 +0,0 @@ -/* - * Copyright (c) 2019 TAOS Data, Inc. - * - * This program is free software: you can use, redistribute, and/or modify - * it under the terms of the GNU Affero General Public License, version 3 - * or later ("AGPL"), as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ - -#define _DEFAULT_SOURCE -#include "taosmsg.h" -#include "vnode.h" -#include "vnodeUtil.h" -#include "vnodeStatus.h" - -/* static TAOS *dbConn = NULL; */ -void vnodeCloseStreamCallback(void *param); - -void vnodeProcessStreamRes(void *param, TAOS_RES *tres, TAOS_ROW row) { - SMeterObj *pObj = (SMeterObj *)param; - dTrace("vid:%d sid:%d id:%s, stream result is ready", pObj->vnode, pObj->sid, pObj->meterId); - - // construct data - int32_t contLen = pObj->bytesPerPoint; - char * pTemp = calloc(1, sizeof(SSubmitMsg) + pObj->bytesPerPoint + sizeof(SVMsgHeader)); - SSubmitMsg *pMsg = (SSubmitMsg *)(pTemp + sizeof(SVMsgHeader)); - - pMsg->numOfRows = htons(1); - - char ncharBuf[TSDB_MAX_BYTES_PER_ROW] = {0}; - - int32_t offset = 0; - for (int32_t i = 0; i < pObj->numOfColumns; ++i) { - char *dst = row[i]; - if (dst == NULL) { - setNull(pMsg->payLoad + offset, pObj->schema[i].type, pObj->schema[i].bytes); - } else { - // here, we need to transfer nchar(utf8) to unicode(ucs-4) - if (pObj->schema[i].type == TSDB_DATA_TYPE_NCHAR) { - taosMbsToUcs4(row[i], pObj->schema[i].bytes, ncharBuf, TSDB_MAX_BYTES_PER_ROW); - dst = ncharBuf; - } - - memcpy(pMsg->payLoad + offset, dst, pObj->schema[i].bytes); - } - - offset += pObj->schema[i].bytes; - } - - contLen += sizeof(SSubmitMsg); - - int32_t numOfPoints = 0; - int32_t code = vnodeInsertPoints(pObj, (char *)pMsg, contLen, TSDB_DATA_SOURCE_SHELL, NULL, pObj->sversion, - &numOfPoints, taosGetTimestamp(vnodeList[pObj->vnode].cfg.precision)); - - if (code != TSDB_CODE_SUCCESS) { - dError("vid:%d sid:%d id:%s, failed to insert continuous query results", pObj->vnode, pObj->sid, pObj->meterId); - } - - assert(numOfPoints >= 0 && numOfPoints <= 1); - tfree(pTemp); -} - -static void vnodeGetDBFromMeterId(SMeterObj *pObj, char *db) { - char *st = strstr(pObj->meterId, "."); - char *end = strstr(st + 1, "."); - - memcpy(db, st + 1, end - (st + 1)); -} - -void vnodeOpenStreams(void *param, void *tmrId) { - SVnodeObj *pVnode = (SVnodeObj *)param; - SMeterObj *pObj; - - if (pVnode->streamRole == TSDB_VN_STREAM_STATUS_STOP) return; - if (pVnode->meterList == NULL) return; - - taosTmrStopA(&pVnode->streamTimer); - pVnode->streamTimer = NULL; - - for (int sid = 0; sid < pVnode->cfg.maxSessions; ++sid) { - pObj = pVnode->meterList[sid]; - if (pObj == NULL || pObj->sqlLen == 0 || vnodeIsMeterState(pObj, TSDB_METER_STATE_DROPPING)) continue; - - dTrace("vid:%d sid:%d id:%s, open stream:%s", pObj->vnode, sid, pObj->meterId, pObj->pSql); - - if (pVnode->dbConn == NULL) { - char db[64] = {0}; - char user[64] = {0}; - vnodeGetDBFromMeterId(pObj, db); - sprintf(user, "_%s", pVnode->cfg.acct); - pVnode->dbConn = taos_connect(NULL, user, tsInternalPass, db, 0); - } - - if (pVnode->dbConn == NULL) { - dError("vid:%d, failed to connect to mgmt node", pVnode->vnode); - taosTmrReset(vnodeOpenStreams, 1000, param, vnodeTmrCtrl, &pVnode->streamTimer); - return; - } - - if (pObj->pStream == NULL) { - pObj->pStream = taos_open_stream(pVnode->dbConn, pObj->pSql, vnodeProcessStreamRes, pObj->lastKey, pObj, - vnodeCloseStreamCallback); - if (pObj->pStream) pVnode->numOfStreams++; - } - } -} - -void vnodeCreateStream(SMeterObj *pObj) { - if (pObj->sqlLen <= 0) return; - - SVnodeObj *pVnode = vnodeList + pObj->vnode; - - if (pVnode->streamRole == TSDB_VN_STREAM_STATUS_STOP) return; - if (pObj->pStream) return; - - dTrace("vid:%d sid:%d id:%s stream:%s is created", pObj->vnode, pObj->sid, pObj->meterId, pObj->pSql); - if (pVnode->dbConn == NULL) { - if (pVnode->streamTimer == NULL) taosTmrReset(vnodeOpenStreams, 1000, pVnode, vnodeTmrCtrl, &pVnode->streamTimer); - } else { - pObj->pStream = taos_open_stream(pVnode->dbConn, pObj->pSql, vnodeProcessStreamRes, pObj->lastKey, pObj, - vnodeCloseStreamCallback); - if (pObj->pStream) pVnode->numOfStreams++; - } -} - -// Close only one stream -void vnodeRemoveStream(SMeterObj *pObj) { - SVnodeObj *pVnode = vnodeList + pObj->vnode; - if (pObj->sqlLen <= 0) return; - - if (pObj->pStream) { - taos_close_stream(pObj->pStream); - pVnode->numOfStreams--; - } - - pObj->pStream = NULL; - if (pVnode->numOfStreams == 0) { - taos_close(pVnode->dbConn); - pVnode->dbConn = NULL; - } - - dTrace("vid:%d sid:%d id:%d stream is removed", pObj->vnode, pObj->sid, pObj->meterId); -} - -// Close all streams in a vnode -void vnodeCloseStream(SVnodeObj *pVnode) { - SMeterObj *pObj; - dPrint("vid:%d, stream is closed, old role %s", pVnode->vnode, taosGetVnodeStreamStatusStr(pVnode->streamRole)); - - // stop stream computing - for (int sid = 0; sid < pVnode->cfg.maxSessions; ++sid) { - pObj = pVnode->meterList[sid]; - if (pObj == NULL) continue; - if (pObj->sqlLen > 0 && pObj->pStream) { - taos_close_stream(pObj->pStream); - pVnode->numOfStreams--; - } - pObj->pStream = NULL; - } -} - -void vnodeUpdateStreamRole(SVnodeObj *pVnode) { - /* SMeterObj *pObj; */ - - int newRole = (pVnode->vnodeStatus == TSDB_VN_STATUS_MASTER) ? TSDB_VN_STREAM_STATUS_START : TSDB_VN_STREAM_STATUS_STOP; - if (newRole != pVnode->streamRole) { - dPrint("vid:%d, stream role is changed from %s to %s", - pVnode->vnode, taosGetVnodeStreamStatusStr(pVnode->streamRole), taosGetVnodeStreamStatusStr(newRole)); - pVnode->streamRole = newRole; - if (newRole == TSDB_VN_STREAM_STATUS_START) { - vnodeOpenStreams(pVnode, NULL); - } else { - vnodeCloseStream(pVnode); - } - } else { - dPrint("vid:%d, stream role is keep to %s", pVnode->vnode, taosGetVnodeStreamStatusStr(pVnode->streamRole)); - } -} - -// Callback function called from client -void vnodeCloseStreamCallback(void *param) { - SMeterObj *pMeter = (SMeterObj *)param; - SVnodeObj *pVnode = NULL; - - if (pMeter == NULL || pMeter->sqlLen == 0) return; - pVnode = vnodeList + pMeter->vnode; - - pMeter->sqlLen = 0; - pMeter->pSql = NULL; - pMeter->pStream = NULL; - - pVnode->numOfStreams--; - - if (pVnode->numOfStreams == 0) { - taos_close(pVnode->dbConn); - pVnode->dbConn = NULL; - } - - vnodeSaveMeterObjToFile(pMeter); -} - - diff --git a/src/cq/test/CMakeLists.txt b/src/cq/test/CMakeLists.txt new file mode 100644 index 0000000000..99c729dff4 --- /dev/null +++ b/src/cq/test/CMakeLists.txt @@ -0,0 +1,17 @@ +CMAKE_MINIMUM_REQUIRED(VERSION 2.8) +PROJECT(TDengine) + +IF ((TD_LINUX_64) OR (TD_LINUX_32 AND TD_ARM)) + INCLUDE_DIRECTORIES(${TD_OS_DIR}/inc) + INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/inc) + INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/util/inc) + INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/common/inc) + INCLUDE_DIRECTORIES(../inc) + + LIST(APPEND CQTEST_SRC ./cqtest.c) + ADD_EXECUTABLE(cqtest ${CQTEST_SRC}) + TARGET_LINK_LIBRARIES(cqtest tcq) + +ENDIF () + + diff --git a/src/cq/test/cqtest.c b/src/cq/test/cqtest.c new file mode 100644 index 0000000000..b0c6ca3178 --- /dev/null +++ b/src/cq/test/cqtest.c @@ -0,0 +1,94 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +//#define _DEFAULT_SOURCE +#include "os.h" +#include "taosdef.h" +#include "taosmsg.h" +#include "tglobal.h" +#include "tlog.h" +#include "tcq.h" + +int64_t ver = 0; +void *pCq = NULL; + +int writeToQueue(void *pVnode, void *data, int type) { + return 0; +} + +int main(int argc, char *argv[]) { + char path[128] = "~/cq"; + + for (int i=1; i + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +#ifndef _TD_CQ_H_ +#define _TD_CQ_H_ + +#ifdef __cplusplus +extern "C" { +#endif + + +typedef int (*FCqWrite)(void *ahandle, void *pHead, int type); + +typedef struct { + int vgId; + char path[TSDB_FILENAME_LEN]; + char user[TSDB_USER_LEN]; + char pass[TSDB_PASSWORD_LEN]; + FCqWrite cqWrite; +} SCqCfg; + +void *cqOpen(void *ahandle, const SCqCfg *pCfg); +void cqClose(void *handle); +void cqStart(void *handle); +void cqStop(void *handle); +void cqCreate(void *handle, int sid, char *sqlStr, SSchema *pSchema, int columns); +void cqDrop(void *handle, int sid); + +extern int cqDebugFlag; + + +#ifdef __cplusplus +} +#endif + +#endif // _TD_CQ_H_ diff --git a/src/util/inc/tqueue.h b/src/util/inc/tqueue.h index c45eb10518..f4086dcd12 100644 --- a/src/util/inc/tqueue.h +++ b/src/util/inc/tqueue.h @@ -20,10 +20,6 @@ extern "C" { #endif -#define TAOS_QTYPE_RPC 0 -#define TAOS_QTYPE_FWD 1 -#define TAOS_QTYPE_WAL 2 - typedef void* taos_queue; typedef void* taos_qset; typedef void* taos_qall; diff --git a/src/vnode/CMakeLists.txt b/src/vnode/CMakeLists.txt index 6ceb83cb45..a1c56b32b5 100644 --- a/src/vnode/CMakeLists.txt +++ b/src/vnode/CMakeLists.txt @@ -15,5 +15,5 @@ IF ((TD_LINUX_64) OR (TD_LINUX_32 AND TD_ARM)) AUX_SOURCE_DIRECTORY(src SRC) ADD_LIBRARY(vnode ${SRC}) - TARGET_LINK_LIBRARIES(vnode tsdb) -ENDIF () \ No newline at end of file + TARGET_LINK_LIBRARIES(vnode tsdb tcq) +ENDIF () diff --git a/src/vnode/src/vnodeMain.c b/src/vnode/src/vnodeMain.c index 1302ceaff4..3ee0083cb3 100644 --- a/src/vnode/src/vnodeMain.c +++ b/src/vnode/src/vnodeMain.c @@ -30,6 +30,8 @@ #include "vnode.h" #include "vnodeInt.h" #include "vnodeLog.h" +#include "tcq.h" +//#include "tsync.h" static int32_t tsOpennedVnodes; static void *tsDnodeVnodesHash; @@ -192,8 +194,27 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) { pVnode->wqueue = dnodeAllocateWqueue(pVnode); pVnode->rqueue = dnodeAllocateRqueue(pVnode); + STsdbAppH appH = {0}; + appH.appH = (void *)pVnode; + appH.walCallBack = vnodeWalCallback; + + sprintf(temp, "%s/tsdb", rootDir); + pVnode->tsdb = tsdbOpenRepo(temp, &appH); + if (pVnode->tsdb == NULL) { + dError("pVnode:%p vgId:%d, failed to open tsdb at %s(%s)", pVnode, pVnode->vgId, temp, tstrerror(terrno)); + taosDeleteIntHash(tsDnodeVnodesHash, pVnode->vgId); + return terrno; + } + sprintf(temp, "%s/wal", rootDir); pVnode->wal = walOpen(temp, &pVnode->walCfg); + walRestore(pVnode->wal, pVnode, vnodeWriteToQueue); + + SCqCfg cqCfg; + sprintf(cqCfg.path, "%s/cq", rootDir); + strcpy(cqCfg.pass, tsInternalPass); + cqCfg.cqWrite = vnodeWriteToQueue; + pVnode->cq = cqOpen(pVnode, &cqCfg); SSyncInfo syncInfo; syncInfo.vgId = pVnode->vgId; @@ -208,24 +229,11 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) { syncInfo.notifyRole = vnodeNotifyRole; pVnode->sync = syncStart(&syncInfo); + // start continuous query + if (pVnode->role == TAOS_SYNC_ROLE_MASTER) + cqStart(pVnode->cq); + pVnode->events = NULL; - pVnode->cq = NULL; - - STsdbAppH appH = {0}; - appH.appH = (void *)pVnode; - appH.walCallBack = vnodeWalCallback; - - sprintf(temp, "%s/tsdb", rootDir); - void *pTsdb = tsdbOpenRepo(temp, &appH); - if (pTsdb == NULL) { - dError("pVnode:%p vgId:%d, failed to open tsdb at %s(%s)", pVnode, pVnode->vgId, temp, tstrerror(terrno)); - taosDeleteIntHash(tsDnodeVnodesHash, pVnode->vgId); - return terrno; - } - - pVnode->tsdb = pTsdb; - - walRestore(pVnode->wal, pVnode, vnodeWriteToQueue); pVnode->status = TAOS_VN_STATUS_READY; dTrace("pVnode:%p vgId:%d, vnode is opened in %s", pVnode, pVnode->vgId, rootDir); @@ -350,10 +358,16 @@ static void vnodeCleanUp(SVnodeObj *pVnode) { pVnode->sync = NULL; } - tsdbCloseRepo(pVnode->tsdb); - walClose(pVnode->wal); - vnodeSaveVersion(pVnode); + cqClose(pVnode->cq); + pVnode->cq = NULL; + tsdbCloseRepo(pVnode->tsdb); + pVnode->tsdb = NULL; + + walClose(pVnode->wal); + pVnode->wal = NULL; + + vnodeSaveVersion(pVnode); vnodeRelease(pVnode); } @@ -377,6 +391,11 @@ static int vnodeGetWalInfo(void *ahandle, char *name, uint32_t *index) { static void vnodeNotifyRole(void *ahandle, int8_t role) { SVnodeObj *pVnode = ahandle; pVnode->role = role; + + if (pVnode->role == TAOS_SYNC_ROLE_MASTER) + cqStart(pVnode->cq); + else + cqStop(pVnode->cq); } static int32_t vnodeSaveCfg(SMDCreateVnodeMsg *pVnodeCfg) { diff --git a/src/vnode/src/vnodeWrite.c b/src/vnode/src/vnodeWrite.c index b1a49e6e65..a176468e85 100644 --- a/src/vnode/src/vnodeWrite.c +++ b/src/vnode/src/vnodeWrite.c @@ -26,6 +26,7 @@ #include "vnode.h" #include "vnodeInt.h" #include "vnodeLog.h" +#include "tcq.h" static int32_t (*vnodeProcessWriteMsgFp[TSDB_MSG_TYPE_MAX])(SVnodeObj *, void *, SRspRet *); static int32_t vnodeProcessSubmitMsg(SVnodeObj *pVnode, void *pMsg, SRspRet *); @@ -113,6 +114,7 @@ static int32_t vnodeProcessCreateTableMsg(SVnodeObj *pVnode, void *pCont, SRspRe int16_t numOfColumns = htons(pTable->numOfColumns); int16_t numOfTags = htons(pTable->numOfTags); int32_t sid = htonl(pTable->sid); + int32_t sqlDataLen = htonl(pTable->sqlDataLen); uint64_t uid = htobe64(pTable->uid); SSchema *pSchema = (SSchema *) pTable->data; @@ -150,6 +152,13 @@ static int32_t vnodeProcessCreateTableMsg(SVnodeObj *pVnode, void *pCont, SRspRe code = tsdbCreateTable(pVnode->tsdb, &tCfg); + if (code == 0 && sqlDataLen >0) { + char *sqlStr = NULL; + // to do: get the sqlStr + + cqCreate(pVnode->cq, sid, sqlStr, pSchema, numOfColumns); + } + tfree(pDestSchema); dTrace("pVnode:%p vgId:%d, table:%s is created, result:%x", pVnode, pVnode->vgId, pTable->tableId, code); @@ -167,6 +176,7 @@ static int32_t vnodeProcessDropTableMsg(SVnodeObj *pVnode, void *pCont, SRspRet }; code = tsdbDropTable(pVnode->tsdb, tableId); + cqDrop(pVnode->cq, tableId.tid); return code; } From 27fab2cfe69a2142e1c859bab8345170116f08aa Mon Sep 17 00:00:00 2001 From: jtao1735 Date: Sat, 2 May 2020 02:36:46 +0000 Subject: [PATCH 07/11] pass the unit testing --- src/cq/CMakeLists.txt | 1 + src/cq/src/cqMain.c | 108 ++++++++++++++----------------------- src/cq/test/cqtest.c | 29 +++++++--- src/inc/tcq.h | 14 +++-- src/inc/tsdb.h | 2 +- src/vnode/src/vnodeMain.c | 13 ++--- src/vnode/src/vnodeWrite.c | 10 ---- 7 files changed, 80 insertions(+), 97 deletions(-) diff --git a/src/cq/CMakeLists.txt b/src/cq/CMakeLists.txt index d41ae09a58..e8796306f3 100644 --- a/src/cq/CMakeLists.txt +++ b/src/cq/CMakeLists.txt @@ -4,6 +4,7 @@ PROJECT(TDengine) INCLUDE_DIRECTORIES(${TD_OS_DIR}/inc) INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/inc) INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/util/inc) +INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/common/inc) INCLUDE_DIRECTORIES(inc) AUX_SOURCE_DIRECTORY(${CMAKE_CURRENT_SOURCE_DIR}/src SRC) diff --git a/src/cq/src/cqMain.c b/src/cq/src/cqMain.c index e3df73a883..62b9a41494 100644 --- a/src/cq/src/cqMain.c +++ b/src/cq/src/cqMain.c @@ -20,6 +20,7 @@ #include #include "taosdef.h" #include "taosmsg.h" +#include "tglobal.h" #include "tlog.h" #include "twal.h" #include "tcq.h" @@ -32,7 +33,6 @@ typedef struct { int vgId; - char path[TSDB_FILENAME_LEN]; char user[TSDB_USER_LEN]; char pass[TSDB_PASSWORD_LEN]; FCqWrite cqWrite; @@ -44,12 +44,13 @@ typedef struct { } SCqContext; typedef struct SCqObj { - int sid; // table ID + int tid; // table ID int rowSize; // bytes of a row char *sqlStr; // SQL string int columns; // number of columns SSchema *pSchema; // pointer to schema array void *pStream; + struct SCqObj *prev; struct SCqObj *next; SCqContext *pContext; } SCqObj; @@ -65,30 +66,10 @@ void *cqOpen(void *ahandle, const SCqCfg *pCfg) { strcpy(pContext->user, pCfg->user); strcpy(pContext->pass, pCfg->pass); - strcpy(pContext->path, pCfg->path); pContext->vgId = pCfg->vgId; pContext->cqWrite = pCfg->cqWrite; pContext->ahandle = ahandle; - // open meta data file - - // loop each record - while (1) { - SCqObj *pObj = calloc(sizeof(SCqObj), 1); - if (pObj == NULL) { - cError("vgId:%d, no memory", pContext->vgId); - continue; - } - - pObj->next = pContext->pHead; - pContext->pHead = pObj; - - // assigne each field in SCqObj - // pObj->sid = - // strcpy(pObj->sqlStr, ?? ); - // schema, columns - } - pthread_mutex_init(&pContext->mutex, NULL); cTrace("vgId:%d, CQ is opened", pContext->vgId); @@ -102,8 +83,6 @@ void cqClose(void *handle) { // stop all CQs cqStop(pContext); - // save the meta data - // free all resources SCqObj *pObj = pContext->pHead; while (pObj) { @@ -125,23 +104,23 @@ void cqStart(void *handle) { pthread_mutex_lock(&pContext->mutex); + tscEmbedded = 1; pContext->dbConn = taos_connect("localhost", pContext->user, pContext->pass, NULL, 0); - if (pContext->dbConn) { + if (pContext->dbConn == NULL) { cError("vgId:%d, failed to connect to TDengine(%s)", pContext->vgId, tstrerror(terrno)); pthread_mutex_unlock(&pContext->mutex); return; } - SCqObj *pObj = pContext->pHead; while (pObj) { int64_t lastKey = 0; pObj->pStream = taos_open_stream(pContext->dbConn, pObj->sqlStr, cqProcessStreamRes, lastKey, pObj, NULL); if (pObj->pStream) { pContext->num++; - cTrace("vgId:%d, id:%d CQ:%s is openned", pContext->vgId, pObj->sid, pObj->sqlStr); + cTrace("vgId:%d, id:%d CQ:%s is openned", pContext->vgId, pObj->tid, pObj->sqlStr); } else { - cError("vgId:%d, id:%d CQ:%s, failed to open", pContext->vgId, pObj->sqlStr); + cError("vgId:%d, id:%d CQ:%s, failed to open", pContext->vgId, pObj->tid, pObj->sqlStr); } pObj = pObj->next; } @@ -158,9 +137,11 @@ void cqStop(void *handle) { SCqObj *pObj = pContext->pHead; while (pObj) { - if (pObj->pStream) taos_close_stream(pObj->pStream); - pObj->pStream = NULL; - cTrace("vgId:%d, id:%d CQ:%s is closed", pContext->vgId, pObj->sid, pObj->sqlStr); + if (pObj->pStream) { + taos_close_stream(pObj->pStream); + pObj->pStream = NULL; + cTrace("vgId:%d, id:%d CQ:%s is closed", pContext->vgId, pObj->tid, pObj->sqlStr); + } pObj = pObj->next; } @@ -171,13 +152,13 @@ void cqStop(void *handle) { pthread_mutex_unlock(&pContext->mutex); } -void cqCreate(void *handle, int sid, char *sqlStr, SSchema *pSchema, int columns) { +void *cqCreate(void *handle, int tid, char *sqlStr, SSchema *pSchema, int columns) { SCqContext *pContext = handle; SCqObj *pObj = calloc(sizeof(SCqObj), 1); - if (pObj == NULL) return; + if (pObj == NULL) return NULL; - pObj->sid = sid; + pObj->tid = tid; pObj->sqlStr = malloc(strlen(sqlStr)+1); strcpy(pObj->sqlStr, sqlStr); @@ -187,11 +168,12 @@ void cqCreate(void *handle, int sid, char *sqlStr, SSchema *pSchema, int columns pObj->pSchema = malloc(size); memcpy(pObj->pSchema, pSchema, size); - cTrace("vgId:%d, id:%d CQ:%s is created", pContext->vgId, pObj->sid, pObj->sqlStr); + cTrace("vgId:%d, id:%d CQ:%s is created", pContext->vgId, pObj->tid, pObj->sqlStr); pthread_mutex_lock(&pContext->mutex); pObj->next = pContext->pHead; + if (pContext->pHead) pContext->pHead->prev = pObj; pContext->pHead = pObj; if (pContext->dbConn) { @@ -199,51 +181,40 @@ void cqCreate(void *handle, int sid, char *sqlStr, SSchema *pSchema, int columns pObj->pStream = taos_open_stream(pContext->dbConn, pObj->sqlStr, cqProcessStreamRes, lastKey, pObj, NULL); if (pObj->pStream) { pContext->num++; - cTrace("vgId:%d, id:%d CQ:%s is openned", pContext->vgId, pObj->sid, pObj->sqlStr); + cTrace("vgId:%d, id:%d CQ:%s is openned", pContext->vgId, pObj->tid, pObj->sqlStr); } else { - cError("vgId:%d, id:%d CQ:%s, failed to launch", pContext->vgId, pObj->sid, pObj->sqlStr); + cError("vgId:%d, id:%d CQ:%s, failed to launch", pContext->vgId, pObj->tid, pObj->sqlStr); } } pthread_mutex_unlock(&pContext->mutex); + + return pObj; } -void cqDrop(void *handle, int sid) { - SCqContext *pContext = handle; +void cqDrop(void *handle) { + SCqObj *pObj = handle; + SCqContext *pContext = pObj->pContext; pthread_mutex_lock(&pContext->mutex); - // locate the pObj; - SCqObj *prev = NULL; - SCqObj *pObj = pContext->pHead; - while (pObj) { - if (pObj->sid != sid) { - prev = pObj; - pObj = pObj->next; - continue; - } - - // remove from the linked list - if (prev) { - prev->next = pObj->next; - } else { - pContext->pHead = pObj->next; - } - - break; + if (pObj->prev) { + pObj->prev->next = pObj->next; + } else { + pContext->pHead = pObj->next; } - if (pObj) { - // update the meta data - - // free the resources associated - if (pObj->pStream) taos_close_stream(pObj->pStream); - pObj->pStream = NULL; - - cTrace("vgId:%d, id:%d CQ:%s is dropped", pContext->vgId, pObj->sid, pObj->sqlStr); - free(pObj); + if (pObj->next) { + pObj->next->prev = pObj->prev; } + // free the resources associated + if (pObj->pStream) taos_close_stream(pObj->pStream); + pObj->pStream = NULL; + + cTrace("vgId:%d, id:%d CQ:%s is dropped", pContext->vgId, pObj->tid, pObj->sqlStr); + free(pObj); + pthread_mutex_lock(&pContext->mutex); } @@ -252,7 +223,7 @@ static void cqProcessStreamRes(void *param, TAOS_RES *tres, TAOS_ROW row) { SCqContext *pContext = pObj->pContext; if (pObj->pStream == NULL) return; - cTrace("vgId:%d, id:%d CQ:%s stream result is ready", pContext->vgId, pObj->sid, pObj->sqlStr); + cTrace("vgId:%d, id:%d CQ:%s stream result is ready", pContext->vgId, pObj->tid, pObj->sqlStr); // construct data int size = sizeof(SWalHead) + sizeof(SSubmitMsg) + sizeof(SSubmitBlk) + pObj->rowSize; @@ -269,11 +240,10 @@ static void cqProcessStreamRes(void *param, TAOS_RES *tres, TAOS_ROW row) { SSubmitBlk *pBlk = (SSubmitBlk *) (buffer + sizeof(SWalHead) + sizeof(SSubmitMsg)); // to do: fill in the SSubmitBlk strucuture - pBlk->tid = pObj->sid; + pBlk->tid = pObj->tid; // write into vnode write queue pContext->cqWrite(pContext->ahandle, pHead, TAOS_QTYPE_CQ); - } diff --git a/src/cq/test/cqtest.c b/src/cq/test/cqtest.c index b0c6ca3178..f620f44382 100644 --- a/src/cq/test/cqtest.c +++ b/src/cq/test/cqtest.c @@ -29,16 +29,16 @@ int writeToQueue(void *pVnode, void *data, int type) { } int main(int argc, char *argv[]) { - char path[128] = "~/cq"; + int num = 3; for (int i=1; iwqueue = dnodeAllocateWqueue(pVnode); pVnode->rqueue = dnodeAllocateRqueue(pVnode); + SCqCfg cqCfg; + sprintf(cqCfg.user, "root"); + strcpy(cqCfg.pass, tsInternalPass); + cqCfg.cqWrite = vnodeWriteToQueue; + pVnode->cq = cqOpen(pVnode, &cqCfg); + STsdbAppH appH = {0}; appH.appH = (void *)pVnode; appH.walCallBack = vnodeWalCallback; + appH.cqH = pVnode->cq; sprintf(temp, "%s/tsdb", rootDir); pVnode->tsdb = tsdbOpenRepo(temp, &appH); @@ -210,12 +217,6 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) { pVnode->wal = walOpen(temp, &pVnode->walCfg); walRestore(pVnode->wal, pVnode, vnodeWriteToQueue); - SCqCfg cqCfg; - sprintf(cqCfg.path, "%s/cq", rootDir); - strcpy(cqCfg.pass, tsInternalPass); - cqCfg.cqWrite = vnodeWriteToQueue; - pVnode->cq = cqOpen(pVnode, &cqCfg); - SSyncInfo syncInfo; syncInfo.vgId = pVnode->vgId; syncInfo.version = pVnode->version; diff --git a/src/vnode/src/vnodeWrite.c b/src/vnode/src/vnodeWrite.c index a176468e85..77ed65c161 100644 --- a/src/vnode/src/vnodeWrite.c +++ b/src/vnode/src/vnodeWrite.c @@ -114,7 +114,6 @@ static int32_t vnodeProcessCreateTableMsg(SVnodeObj *pVnode, void *pCont, SRspRe int16_t numOfColumns = htons(pTable->numOfColumns); int16_t numOfTags = htons(pTable->numOfTags); int32_t sid = htonl(pTable->sid); - int32_t sqlDataLen = htonl(pTable->sqlDataLen); uint64_t uid = htobe64(pTable->uid); SSchema *pSchema = (SSchema *) pTable->data; @@ -151,14 +150,6 @@ static int32_t vnodeProcessCreateTableMsg(SVnodeObj *pVnode, void *pCont, SRspRe } code = tsdbCreateTable(pVnode->tsdb, &tCfg); - - if (code == 0 && sqlDataLen >0) { - char *sqlStr = NULL; - // to do: get the sqlStr - - cqCreate(pVnode->cq, sid, sqlStr, pSchema, numOfColumns); - } - tfree(pDestSchema); dTrace("pVnode:%p vgId:%d, table:%s is created, result:%x", pVnode, pVnode->vgId, pTable->tableId, code); @@ -176,7 +167,6 @@ static int32_t vnodeProcessDropTableMsg(SVnodeObj *pVnode, void *pCont, SRspRet }; code = tsdbDropTable(pVnode->tsdb, tableId); - cqDrop(pVnode->cq, tableId.tid); return code; } From bf9d1274a5515b9bb2687048b69738a4798e1e10 Mon Sep 17 00:00:00 2001 From: slguan Date: Sat, 2 May 2020 11:06:04 +0800 Subject: [PATCH 08/11] [TD-184] fix bug while drop dnode --- src/client/src/tscSQLParser.c | 10 +- src/mnode/inc/mgmtDnode.h | 2 +- src/mnode/src/mgmtDnode.c | 22 ++- src/query/inc/qsqlparser.h | 2 +- src/query/inc/sql.y | 2 +- src/query/src/qparserImpl.c | 2 +- src/query/src/sql.c | 227 +++++++++++++------------- tests/script/unique/big/testSuite.sim | 2 +- tests/script/unique/mnode/mgmt33.sim | 4 +- 9 files changed, 134 insertions(+), 139 deletions(-) diff --git a/src/client/src/tscSQLParser.c b/src/client/src/tscSQLParser.c index 79188203da..dc4542a973 100644 --- a/src/client/src/tscSQLParser.c +++ b/src/client/src/tscSQLParser.c @@ -206,7 +206,6 @@ int32_t tscToSQLCmd(SSqlObj* pSql, struct SSqlInfo* pInfo) { case TSDB_SQL_DROP_ACCT: case TSDB_SQL_DROP_DNODE: case TSDB_SQL_DROP_DB: { - const char* msg1 = "invalid ip address"; const char* msg2 = "invalid name"; const char* msg3 = "param name too long"; @@ -230,10 +229,7 @@ int32_t tscToSQLCmd(SSqlObj* pSql, struct SSqlInfo* pInfo) { return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg3); } } else if (pInfo->type == TSDB_SQL_DROP_DNODE) { - if (!validateIpAddress(pzName->z, pzName->n)) { - return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg1); - } - + pzName->n = strdequote(pzName->z); strncpy(pTableMetaInfo->name, pzName->z, pzName->n); } else { // drop user if (pzName->n > TSDB_USER_LEN) { @@ -304,10 +300,6 @@ int32_t tscToSQLCmd(SSqlObj* pSql, struct SSqlInfo* pInfo) { } SSQLToken* pIpAddr = &pInfo->pDCLInfo->a[0]; - // if (!validateIpAddress(pIpAddr->z, pIpAddr->n)) { - // return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg); - // } - pIpAddr->n = strdequote(pIpAddr->z); break; } diff --git a/src/mnode/inc/mgmtDnode.h b/src/mnode/inc/mgmtDnode.h index 1b5199e727..1d7116c6c0 100644 --- a/src/mnode/inc/mgmtDnode.h +++ b/src/mnode/inc/mgmtDnode.h @@ -38,7 +38,7 @@ void * mgmtGetNextDnode(void *pNode, SDnodeObj **pDnode); void mgmtIncDnodeRef(SDnodeObj *pDnode); void mgmtDecDnodeRef(SDnodeObj *pDnode); void * mgmtGetDnode(int32_t dnodeId); -void * mgmtGetDnodeByIp(char *ep); +void * mgmtGetDnodeByEp(char *ep); void mgmtUpdateDnode(SDnodeObj *pDnode); int32_t mgmtDropDnode(SDnodeObj *pDnode); diff --git a/src/mnode/src/mgmtDnode.c b/src/mnode/src/mgmtDnode.c index c7643b9bf9..6629737787 100644 --- a/src/mnode/src/mgmtDnode.c +++ b/src/mnode/src/mgmtDnode.c @@ -74,7 +74,9 @@ static int32_t mgmtDnodeActionInsert(SSdbOper *pOper) { static int32_t mgmtDnodeActionDelete(SSdbOper *pOper) { SDnodeObj *pDnode = pOper->pObj; +#ifndef _SYNC mgmtDropAllDnodeVgroups(pDnode); +#endif mgmtDropMnodeLocal(pDnode->dnodeId); balanceNotify(); @@ -113,7 +115,7 @@ static int32_t mgmtDnodeActionRestored() { int32_t numOfRows = sdbGetNumOfRows(tsDnodeSdb); if (numOfRows <= 0 && dnodeIsFirstDeploy()) { mgmtCreateDnode(tsLocalEp); - SDnodeObj *pDnode = mgmtGetDnodeByIp(tsLocalEp); + SDnodeObj *pDnode = mgmtGetDnodeByEp(tsLocalEp); mgmtAddMnode(pDnode->dnodeId); mgmtDecDnodeRef(pDnode); } @@ -181,7 +183,7 @@ void *mgmtGetDnode(int32_t dnodeId) { return sdbGetRow(tsDnodeSdb, &dnodeId); } -void *mgmtGetDnodeByIp(char *ep) { +void *mgmtGetDnodeByEp(char *ep) { SDnodeObj *pDnode = NULL; void * pNode = NULL; @@ -271,7 +273,7 @@ void mgmtProcessDnodeStatusMsg(SRpcMsg *rpcMsg) { SDnodeObj *pDnode = NULL; if (pStatus->dnodeId == 0) { - pDnode = mgmtGetDnodeByIp(pStatus->dnodeEp); + pDnode = mgmtGetDnodeByEp(pStatus->dnodeEp); if (pDnode == NULL) { mTrace("dnode %s not created", pStatus->dnodeEp); mgmtSendSimpleResp(rpcMsg->handle, TSDB_CODE_DNODE_NOT_EXIST); @@ -358,7 +360,7 @@ static int32_t mgmtCreateDnode(char *ep) { return grantCode; } - SDnodeObj *pDnode = mgmtGetDnodeByIp(ep); + SDnodeObj *pDnode = mgmtGetDnodeByEp(ep); if (pDnode != NULL) { mgmtDecDnodeRef(pDnode); mError("dnode:%d is alredy exist, %s:%d", pDnode->dnodeId, pDnode->dnodeFqdn, pDnode->dnodePort); @@ -391,6 +393,7 @@ static int32_t mgmtCreateDnode(char *ep) { return code; } +//TODO drop others tables int32_t mgmtDropDnode(SDnodeObj *pDnode) { SSdbOper oper = { .type = SDB_OPER_GLOBAL, @@ -407,8 +410,9 @@ int32_t mgmtDropDnode(SDnodeObj *pDnode) { return code; } -static int32_t mgmtDropDnodeByIp(char *ep) { - SDnodeObj *pDnode = mgmtGetDnodeByIp(ep); +static int32_t mgmtDropDnodeByEp(char *ep) { + + SDnodeObj *pDnode = mgmtGetDnodeByEp(ep); if (pDnode == NULL) { mError("dnode:%s, is not exist", ep); return TSDB_CODE_DNODE_NOT_EXIST; @@ -437,7 +441,7 @@ static void mgmtProcessCreateDnodeMsg(SQueuedMsg *pMsg) { } else { rpcRsp.code = mgmtCreateDnode(pCreate->ep); if (rpcRsp.code == TSDB_CODE_SUCCESS) { - SDnodeObj *pDnode = mgmtGetDnodeByIp(pCreate->ep); + SDnodeObj *pDnode = mgmtGetDnodeByEp(pCreate->ep); mLPrint("dnode:%d, %s is created by %s", pDnode->dnodeId, pCreate->ep, pMsg->pUser->user); mgmtDecDnodeRef(pDnode); } else { @@ -456,7 +460,7 @@ static void mgmtProcessDropDnodeMsg(SQueuedMsg *pMsg) { if (strcmp(pMsg->pUser->user, "root") != 0) { rpcRsp.code = TSDB_CODE_NO_RIGHTS; } else { - rpcRsp.code = mgmtDropDnodeByIp(pDrop->ep); + rpcRsp.code = mgmtDropDnodeByEp(pDrop->ep); if (rpcRsp.code == TSDB_CODE_SUCCESS) { mLPrint("dnode:%s is dropped by %s", pDrop->ep, pMsg->pUser->user); } else { @@ -812,7 +816,7 @@ static int32_t mgmtGetVnodeMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pCo SDnodeObj *pDnode = NULL; if (pShow->payloadLen > 0 ) { - pDnode = mgmtGetDnodeByIp(pShow->payload); + pDnode = mgmtGetDnodeByEp(pShow->payload); } else { mgmtGetNextDnode(NULL, (SDnodeObj **)&pDnode); } diff --git a/src/query/inc/qsqlparser.h b/src/query/inc/qsqlparser.h index 42dda2308f..08d4186291 100644 --- a/src/query/inc/qsqlparser.h +++ b/src/query/inc/qsqlparser.h @@ -277,7 +277,7 @@ SSubclauseInfo *setSubclause(SSubclauseInfo *pClause, void *pSqlExprInfo); SSubclauseInfo *appendSelectClause(SSubclauseInfo *pInfo, void *pSubclause); -void setCreatedMeterName(SSqlInfo *pInfo, SSQLToken *pMeterName, SSQLToken *pIfNotExists); +void setCreatedTableName(SSqlInfo *pInfo, SSQLToken *pMeterName, SSQLToken *pIfNotExists); void SQLInfoDestroy(SSqlInfo *pInfo); diff --git a/src/query/inc/sql.y b/src/query/inc/sql.y index 87b974b9ba..29d9d8aaac 100644 --- a/src/query/inc/sql.y +++ b/src/query/inc/sql.y @@ -264,7 +264,7 @@ signed(A) ::= MINUS INTEGER(X). { A = -strtol(X.z, NULL, 10);} ////////////////////////////////// The CREATE TABLE statement /////////////////////////////// cmd ::= CREATE TABLE ifnotexists(Y) ids(X) cpxName(Z) create_table_args. { X.n += Z.n; - setCreatedMeterName(pInfo, &X, &Y); + setCreatedTableName(pInfo, &X, &Y); } %type create_table_args{SCreateTableSQL*} diff --git a/src/query/src/qparserImpl.c b/src/query/src/qparserImpl.c index 7b3a76c9d2..5adb183af3 100644 --- a/src/query/src/qparserImpl.c +++ b/src/query/src/qparserImpl.c @@ -731,7 +731,7 @@ SSubclauseInfo* appendSelectClause(SSubclauseInfo *pQueryInfo, void *pSubclause) return pQueryInfo; } -void setCreatedMeterName(SSqlInfo *pInfo, SSQLToken *pMeterName, SSQLToken *pIfNotExists) { +void setCreatedTableName(SSqlInfo *pInfo, SSQLToken *pMeterName, SSQLToken *pIfNotExists) { pInfo->pCreateTableInfo->name = *pMeterName; pInfo->pCreateTableInfo->existCheck = (pIfNotExists->n != 0); } diff --git a/src/query/src/sql.c b/src/query/src/sql.c index 08a8d41c69..223068ef91 100644 --- a/src/query/src/sql.c +++ b/src/query/src/sql.c @@ -202,62 +202,61 @@ typedef union { ** yy_default[] Default action for each state. ** *********** Begin parsing tables **********************************************/ -#define YY_ACTTAB_COUNT (531) +#define YY_ACTTAB_COUNT (529) static const YYACTIONTYPE yy_action[] = { - /* 0 */ 752, 440, 133, 151, 244, 10, 616, 246, 133, 441, - /* 10 */ 133, 156, 821, 41, 43, 20, 35, 36, 820, 155, - /* 20 */ 821, 29, 741, 440, 201, 39, 37, 40, 38, 132, - /* 30 */ 499, 441, 97, 34, 33, 101, 152, 32, 31, 30, - /* 40 */ 41, 43, 741, 35, 36, 153, 137, 164, 29, 727, - /* 50 */ 749, 201, 39, 37, 40, 38, 186, 101, 225, 224, - /* 60 */ 34, 33, 163, 730, 32, 31, 30, 400, 401, 402, + /* 0 */ 752, 440, 135, 153, 244, 10, 616, 246, 135, 441, + /* 10 */ 135, 158, 821, 41, 43, 20, 35, 36, 820, 157, + /* 20 */ 821, 29, 741, 440, 203, 39, 37, 40, 38, 134, + /* 30 */ 499, 441, 99, 34, 33, 103, 154, 32, 31, 30, + /* 40 */ 41, 43, 741, 35, 36, 155, 139, 166, 29, 727, + /* 50 */ 749, 203, 39, 37, 40, 38, 188, 103, 227, 226, + /* 60 */ 34, 33, 165, 730, 32, 31, 30, 400, 401, 402, /* 70 */ 403, 404, 405, 406, 407, 408, 409, 410, 411, 245, - /* 80 */ 730, 41, 43, 189, 35, 36, 216, 236, 198, 29, - /* 90 */ 58, 20, 201, 39, 37, 40, 38, 32, 31, 30, - /* 100 */ 56, 34, 33, 76, 730, 32, 31, 30, 43, 236, - /* 110 */ 35, 36, 776, 817, 196, 29, 20, 20, 201, 39, - /* 120 */ 37, 40, 38, 165, 570, 727, 227, 34, 33, 440, - /* 130 */ 168, 32, 31, 30, 238, 35, 36, 441, 7, 816, - /* 140 */ 29, 61, 111, 201, 39, 37, 40, 38, 223, 228, + /* 80 */ 730, 41, 43, 191, 35, 36, 218, 238, 200, 29, + /* 90 */ 58, 20, 203, 39, 37, 40, 38, 32, 31, 30, + /* 100 */ 56, 34, 33, 76, 730, 32, 31, 30, 43, 238, + /* 110 */ 35, 36, 776, 817, 198, 29, 20, 20, 203, 39, + /* 120 */ 37, 40, 38, 167, 570, 727, 229, 34, 33, 440, + /* 130 */ 170, 32, 31, 30, 240, 35, 36, 441, 7, 816, + /* 140 */ 29, 61, 113, 203, 39, 37, 40, 38, 225, 230, /* 150 */ 727, 727, 34, 33, 50, 728, 32, 31, 30, 15, - /* 160 */ 215, 237, 214, 213, 212, 211, 210, 209, 208, 207, + /* 160 */ 217, 239, 216, 215, 214, 213, 212, 211, 210, 209, /* 170 */ 712, 51, 701, 702, 703, 704, 705, 706, 707, 708, - /* 180 */ 709, 710, 711, 160, 583, 11, 815, 574, 101, 577, - /* 190 */ 101, 580, 169, 160, 583, 222, 221, 574, 16, 577, - /* 200 */ 20, 580, 34, 33, 146, 26, 32, 31, 30, 238, - /* 210 */ 87, 86, 140, 175, 657, 157, 158, 124, 145, 200, - /* 220 */ 183, 715, 180, 714, 149, 157, 158, 160, 583, 531, - /* 230 */ 60, 574, 150, 577, 726, 580, 237, 16, 39, 37, + /* 180 */ 709, 710, 711, 162, 583, 11, 815, 574, 103, 577, + /* 190 */ 103, 580, 171, 162, 583, 224, 223, 574, 16, 577, + /* 200 */ 20, 580, 34, 33, 148, 26, 32, 31, 30, 240, + /* 210 */ 88, 87, 142, 177, 657, 159, 160, 126, 147, 202, + /* 220 */ 185, 715, 182, 714, 151, 159, 160, 162, 583, 531, + /* 230 */ 60, 574, 152, 577, 726, 580, 239, 16, 39, 37, /* 240 */ 40, 38, 27, 775, 26, 59, 34, 33, 551, 552, - /* 250 */ 32, 31, 30, 138, 114, 115, 68, 64, 67, 157, - /* 260 */ 158, 96, 515, 666, 185, 512, 124, 513, 26, 514, - /* 270 */ 523, 148, 128, 126, 240, 89, 88, 188, 42, 159, - /* 280 */ 74, 78, 239, 85, 77, 572, 528, 729, 42, 582, - /* 290 */ 80, 17, 658, 166, 167, 124, 243, 242, 93, 582, + /* 250 */ 32, 31, 30, 140, 116, 117, 68, 64, 67, 159, + /* 260 */ 160, 98, 515, 666, 187, 512, 126, 513, 26, 514, + /* 270 */ 523, 150, 130, 128, 91, 90, 89, 190, 42, 161, + /* 280 */ 74, 78, 83, 86, 77, 572, 528, 729, 42, 582, + /* 290 */ 80, 17, 658, 168, 169, 126, 243, 242, 95, 582, /* 300 */ 47, 542, 543, 600, 581, 45, 13, 12, 584, 576, - /* 310 */ 139, 579, 12, 575, 581, 578, 2, 73, 72, 48, - /* 320 */ 505, 573, 42, 743, 45, 504, 205, 9, 8, 21, - /* 330 */ 21, 141, 519, 582, 520, 517, 142, 518, 84, 83, - /* 340 */ 143, 144, 135, 131, 136, 830, 134, 786, 581, 785, - /* 350 */ 161, 782, 781, 162, 751, 721, 768, 226, 98, 767, - /* 360 */ 112, 113, 516, 668, 206, 110, 129, 24, 219, 665, - /* 370 */ 220, 829, 26, 70, 828, 826, 187, 116, 686, 25, - /* 380 */ 91, 22, 130, 655, 79, 653, 81, 651, 650, 538, - /* 390 */ 170, 125, 190, 648, 647, 646, 644, 194, 52, 740, - /* 400 */ 636, 127, 642, 640, 638, 49, 755, 102, 756, 44, - /* 410 */ 769, 199, 197, 195, 193, 191, 28, 218, 75, 229, - /* 420 */ 230, 231, 232, 233, 234, 235, 203, 53, 241, 614, - /* 430 */ 171, 172, 147, 62, 65, 174, 613, 177, 173, 179, - /* 440 */ 612, 176, 649, 178, 181, 643, 123, 687, 117, 119, - /* 450 */ 118, 120, 121, 90, 103, 725, 108, 104, 105, 122, - /* 460 */ 106, 107, 109, 92, 1, 23, 182, 188, 605, 184, - /* 470 */ 525, 55, 539, 57, 99, 154, 192, 18, 63, 4, - /* 480 */ 544, 100, 480, 585, 3, 19, 5, 14, 202, 6, - /* 490 */ 204, 479, 478, 477, 476, 475, 474, 473, 471, 45, - /* 500 */ 217, 444, 66, 21, 501, 500, 46, 498, 54, 465, - /* 510 */ 463, 455, 461, 457, 69, 459, 71, 453, 451, 472, - /* 520 */ 470, 82, 426, 442, 94, 415, 413, 618, 617, 617, - /* 530 */ 95, + /* 310 */ 141, 579, 12, 575, 581, 578, 2, 73, 72, 48, + /* 320 */ 505, 573, 42, 743, 45, 504, 207, 9, 8, 21, + /* 330 */ 21, 143, 519, 582, 520, 517, 144, 518, 85, 84, + /* 340 */ 145, 146, 137, 133, 138, 830, 136, 786, 581, 785, + /* 350 */ 163, 782, 781, 164, 751, 721, 768, 228, 100, 767, + /* 360 */ 114, 115, 516, 668, 208, 112, 131, 24, 221, 665, + /* 370 */ 222, 829, 26, 70, 828, 826, 189, 118, 686, 25, + /* 380 */ 93, 22, 132, 655, 79, 653, 81, 82, 651, 538, + /* 390 */ 650, 172, 192, 127, 648, 647, 646, 196, 52, 740, + /* 400 */ 645, 644, 636, 129, 642, 640, 49, 638, 44, 105, + /* 410 */ 755, 756, 201, 199, 195, 769, 197, 193, 28, 220, + /* 420 */ 75, 231, 232, 233, 234, 235, 236, 205, 237, 241, + /* 430 */ 53, 614, 173, 174, 175, 149, 62, 65, 176, 613, + /* 440 */ 179, 181, 649, 178, 180, 612, 92, 94, 183, 121, + /* 450 */ 184, 120, 687, 119, 122, 725, 123, 125, 124, 109, + /* 460 */ 106, 104, 643, 107, 110, 108, 111, 23, 1, 190, + /* 470 */ 605, 186, 525, 55, 539, 57, 156, 101, 194, 18, + /* 480 */ 19, 4, 544, 102, 204, 585, 3, 14, 5, 6, + /* 490 */ 63, 480, 206, 479, 478, 477, 476, 475, 474, 473, + /* 500 */ 471, 45, 219, 444, 66, 21, 501, 500, 46, 498, + /* 510 */ 54, 465, 463, 455, 461, 457, 69, 459, 453, 451, + /* 520 */ 472, 470, 71, 442, 96, 97, 415, 413, 618, }; static const YYCODETYPE yy_lookahead[] = { /* 0 */ 207, 1, 256, 206, 207, 256, 204, 205, 256, 9, @@ -300,20 +299,20 @@ static const YYCODETYPE yy_lookahead[] = { /* 370 */ 207, 207, 103, 207, 207, 207, 240, 207, 207, 207, /* 380 */ 59, 207, 207, 207, 207, 207, 207, 207, 207, 107, /* 390 */ 207, 207, 259, 207, 207, 207, 207, 259, 117, 253, - /* 400 */ 207, 207, 207, 207, 207, 119, 208, 252, 208, 116, - /* 410 */ 208, 111, 115, 110, 109, 108, 121, 75, 84, 83, - /* 420 */ 49, 80, 82, 53, 81, 79, 208, 208, 75, 5, - /* 430 */ 132, 5, 208, 212, 212, 58, 5, 5, 132, 58, - /* 440 */ 5, 132, 208, 132, 132, 208, 215, 222, 221, 216, - /* 450 */ 220, 219, 217, 209, 251, 240, 246, 250, 249, 218, - /* 460 */ 248, 247, 245, 209, 213, 210, 58, 104, 86, 124, - /* 470 */ 97, 105, 97, 101, 96, 1, 96, 101, 72, 112, - /* 480 */ 97, 96, 9, 97, 96, 101, 112, 96, 98, 96, - /* 490 */ 98, 5, 5, 5, 5, 1, 5, 5, 5, 101, - /* 500 */ 15, 76, 72, 101, 5, 5, 16, 97, 96, 5, - /* 510 */ 5, 5, 5, 5, 127, 5, 127, 5, 5, 5, - /* 520 */ 5, 58, 58, 76, 21, 59, 58, 0, 267, 267, - /* 530 */ 21, 267, 267, 267, 267, 267, 267, 267, 267, 267, + /* 400 */ 207, 207, 207, 207, 207, 207, 119, 207, 116, 251, + /* 410 */ 208, 208, 111, 115, 109, 208, 110, 108, 121, 75, + /* 420 */ 84, 83, 49, 80, 82, 53, 81, 208, 79, 75, + /* 430 */ 208, 5, 132, 5, 132, 208, 212, 212, 58, 5, + /* 440 */ 5, 58, 208, 132, 132, 5, 209, 209, 132, 216, + /* 450 */ 58, 220, 222, 221, 219, 240, 217, 215, 218, 247, + /* 460 */ 250, 252, 208, 249, 246, 248, 245, 210, 213, 104, + /* 470 */ 86, 124, 97, 105, 97, 101, 1, 96, 96, 101, + /* 480 */ 101, 112, 97, 96, 98, 97, 96, 96, 112, 96, + /* 490 */ 72, 9, 98, 5, 5, 5, 5, 1, 5, 5, + /* 500 */ 5, 101, 15, 76, 72, 101, 5, 5, 16, 97, + /* 510 */ 96, 5, 5, 5, 5, 5, 127, 5, 5, 5, + /* 520 */ 5, 5, 127, 76, 21, 21, 59, 58, 0, 267, + /* 530 */ 267, 267, 267, 267, 267, 267, 267, 267, 267, 267, /* 540 */ 267, 267, 267, 267, 267, 267, 267, 267, 267, 267, /* 550 */ 267, 267, 267, 267, 267, 267, 267, 267, 267, 267, /* 560 */ 267, 267, 267, 267, 267, 267, 267, 267, 267, 267, @@ -333,41 +332,41 @@ static const YYCODETYPE yy_lookahead[] = { /* 700 */ 267, 267, 267, 267, 267, 267, 267, 267, 267, 267, /* 710 */ 267, 267, 267, 267, 267, 267, 267, 267, 267, 267, /* 720 */ 267, 267, 267, 267, 267, 267, 267, 267, 267, 267, - /* 730 */ 267, 267, 267, 267, + /* 730 */ 267, 267, }; #define YY_SHIFT_COUNT (246) #define YY_SHIFT_MIN (0) -#define YY_SHIFT_MAX (527) +#define YY_SHIFT_MAX (528) static const unsigned short int yy_shift_ofst[] = { /* 0 */ 141, 74, 182, 226, 128, 128, 128, 128, 128, 128, /* 10 */ 0, 22, 226, 260, 260, 260, 102, 128, 128, 128, - /* 20 */ 128, 128, 31, 149, 9, 9, 531, 192, 226, 226, + /* 20 */ 128, 128, 31, 149, 9, 9, 529, 192, 226, 226, /* 30 */ 226, 226, 226, 226, 226, 226, 226, 226, 226, 226, /* 40 */ 226, 226, 226, 226, 226, 260, 260, 25, 25, 25, /* 50 */ 25, 25, 25, 42, 25, 165, 128, 128, 135, 135, /* 60 */ 185, 128, 128, 128, 128, 128, 128, 128, 128, 128, /* 70 */ 128, 128, 128, 128, 128, 128, 128, 128, 128, 128, /* 80 */ 128, 128, 128, 128, 128, 128, 128, 128, 128, 128, - /* 90 */ 128, 128, 128, 128, 128, 128, 269, 321, 321, 282, - /* 100 */ 282, 321, 281, 286, 293, 300, 297, 303, 305, 307, - /* 110 */ 295, 269, 321, 321, 342, 342, 321, 334, 336, 371, - /* 120 */ 341, 340, 370, 343, 346, 321, 353, 321, 353, 531, - /* 130 */ 531, 27, 68, 68, 68, 94, 119, 213, 213, 213, - /* 140 */ 216, 169, 169, 169, 169, 190, 208, 67, 89, 60, - /* 150 */ 60, 236, 173, 204, 205, 206, 211, 304, 308, 284, - /* 160 */ 220, 199, 53, 223, 228, 229, 327, 330, 191, 201, - /* 170 */ 266, 424, 298, 426, 306, 377, 431, 309, 432, 311, - /* 180 */ 381, 435, 312, 408, 382, 345, 363, 373, 366, 372, - /* 190 */ 375, 378, 474, 380, 383, 385, 376, 367, 384, 374, - /* 200 */ 386, 388, 391, 390, 393, 392, 406, 473, 486, 487, - /* 210 */ 488, 489, 494, 491, 492, 493, 398, 425, 485, 430, - /* 220 */ 490, 387, 389, 402, 499, 500, 410, 412, 402, 504, - /* 230 */ 505, 506, 507, 508, 510, 512, 513, 514, 515, 463, - /* 240 */ 464, 447, 503, 509, 466, 468, 527, + /* 90 */ 128, 128, 128, 128, 128, 128, 128, 128, 269, 321, + /* 100 */ 321, 282, 282, 321, 281, 287, 292, 301, 298, 306, + /* 110 */ 305, 309, 297, 269, 321, 321, 344, 344, 321, 336, + /* 120 */ 338, 373, 343, 342, 372, 345, 349, 321, 354, 321, + /* 130 */ 354, 529, 529, 27, 68, 68, 68, 94, 119, 213, + /* 140 */ 213, 213, 216, 169, 169, 169, 169, 190, 208, 67, + /* 150 */ 89, 60, 60, 236, 173, 204, 205, 206, 211, 304, + /* 160 */ 308, 284, 220, 199, 53, 223, 228, 229, 327, 330, + /* 170 */ 191, 201, 266, 426, 300, 428, 302, 380, 434, 311, + /* 180 */ 435, 312, 383, 440, 316, 392, 384, 347, 365, 375, + /* 190 */ 368, 374, 377, 381, 475, 382, 385, 387, 378, 369, + /* 200 */ 379, 376, 388, 390, 391, 386, 393, 394, 418, 482, + /* 210 */ 488, 489, 490, 491, 496, 493, 494, 495, 400, 427, + /* 220 */ 487, 432, 492, 389, 395, 404, 501, 502, 412, 414, + /* 230 */ 404, 506, 507, 508, 509, 510, 512, 513, 514, 515, + /* 240 */ 516, 447, 503, 504, 467, 469, 528, }; -#define YY_REDUCE_COUNT (130) +#define YY_REDUCE_COUNT (132) #define YY_REDUCE_MIN (-254) -#define YY_REDUCE_MAX (255) +#define YY_REDUCE_MAX (257) static const short yy_reduce_ofst[] = { /* 0 */ -198, -53, -254, -246, -150, -172, -192, -116, -91, -90, /* 10 */ -207, -203, -248, -179, -162, -138, -218, -175, -19, -17, @@ -378,11 +377,11 @@ static const short yy_reduce_ofst[] = { /* 60 */ 121, 153, 154, 156, 157, 159, 160, 161, 162, 163, /* 70 */ 164, 166, 167, 168, 170, 171, 172, 174, 175, 176, /* 80 */ 177, 178, 179, 180, 181, 183, 184, 186, 187, 188, - /* 90 */ 189, 193, 194, 195, 196, 197, 136, 198, 200, 133, - /* 100 */ 138, 202, 146, 155, 203, 207, 209, 212, 214, 210, - /* 110 */ 217, 215, 218, 219, 221, 222, 224, 225, 227, 230, - /* 120 */ 233, 232, 235, 241, 231, 234, 244, 237, 254, 251, - /* 130 */ 255, + /* 90 */ 189, 193, 194, 195, 196, 197, 198, 200, 136, 202, + /* 100 */ 203, 133, 138, 207, 146, 209, 158, 210, 214, 217, + /* 110 */ 212, 218, 221, 215, 219, 222, 224, 225, 227, 230, + /* 120 */ 232, 231, 233, 235, 239, 240, 242, 234, 237, 254, + /* 130 */ 238, 255, 257, }; static const YYACTIONTYPE yy_default[] = { /* 0 */ 615, 667, 823, 823, 615, 615, 615, 615, 615, 615, @@ -394,21 +393,21 @@ static const YYACTIONTYPE yy_default[] = { /* 60 */ 746, 615, 615, 615, 615, 615, 615, 615, 615, 615, /* 70 */ 615, 615, 615, 615, 615, 615, 615, 615, 615, 654, /* 80 */ 615, 652, 615, 615, 615, 615, 615, 615, 615, 615, - /* 90 */ 615, 615, 615, 641, 615, 615, 615, 635, 635, 615, - /* 100 */ 615, 635, 779, 783, 777, 765, 773, 764, 760, 759, - /* 110 */ 787, 615, 635, 635, 664, 664, 635, 685, 683, 681, - /* 120 */ 673, 679, 675, 677, 671, 635, 662, 635, 662, 700, - /* 130 */ 713, 615, 788, 822, 778, 806, 805, 818, 812, 811, - /* 140 */ 615, 810, 809, 808, 807, 615, 615, 615, 615, 814, - /* 150 */ 813, 615, 615, 615, 615, 615, 615, 615, 615, 615, - /* 160 */ 790, 784, 780, 615, 615, 615, 615, 615, 615, 615, + /* 90 */ 615, 615, 615, 615, 615, 641, 615, 615, 615, 635, + /* 100 */ 635, 615, 615, 635, 779, 783, 777, 765, 773, 764, + /* 110 */ 760, 759, 787, 615, 635, 635, 664, 664, 635, 685, + /* 120 */ 683, 681, 673, 679, 675, 677, 671, 635, 662, 635, + /* 130 */ 662, 700, 713, 615, 788, 822, 778, 806, 805, 818, + /* 140 */ 812, 811, 615, 810, 809, 808, 807, 615, 615, 615, + /* 150 */ 615, 814, 813, 615, 615, 615, 615, 615, 615, 615, + /* 160 */ 615, 615, 790, 784, 780, 615, 615, 615, 615, 615, /* 170 */ 615, 615, 615, 615, 615, 615, 615, 615, 615, 615, - /* 180 */ 615, 615, 615, 615, 615, 615, 745, 615, 615, 754, - /* 190 */ 615, 615, 615, 615, 615, 615, 774, 615, 766, 615, - /* 200 */ 615, 615, 615, 615, 615, 722, 615, 615, 615, 615, - /* 210 */ 615, 615, 615, 615, 615, 615, 688, 615, 615, 615, - /* 220 */ 615, 615, 615, 827, 615, 615, 615, 716, 825, 615, - /* 230 */ 615, 615, 615, 615, 615, 615, 615, 615, 615, 615, + /* 180 */ 615, 615, 615, 615, 615, 615, 615, 615, 745, 615, + /* 190 */ 615, 754, 615, 615, 615, 615, 615, 615, 774, 615, + /* 200 */ 766, 615, 615, 615, 615, 615, 615, 722, 615, 615, + /* 210 */ 615, 615, 615, 615, 615, 615, 615, 615, 688, 615, + /* 220 */ 615, 615, 615, 615, 615, 827, 615, 615, 615, 716, + /* 230 */ 825, 615, 615, 615, 615, 615, 615, 615, 615, 615, /* 240 */ 615, 615, 639, 637, 615, 631, 615, }; /********** End of lemon-generated parsing tables *****************************/ @@ -1019,15 +1018,15 @@ static const char *const yyRuleName[] = { /* 24 */ "cmd ::= SHOW dbPrefix VGROUPS ids", /* 25 */ "cmd ::= DROP TABLE ifexists ids cpxName", /* 26 */ "cmd ::= DROP DATABASE ifexists ids", - /* 27 */ "cmd ::= DROP DNODE IPTOKEN", + /* 27 */ "cmd ::= DROP DNODE ids", /* 28 */ "cmd ::= DROP USER ids", /* 29 */ "cmd ::= DROP ACCOUNT ids", /* 30 */ "cmd ::= USE ids", /* 31 */ "cmd ::= DESCRIBE ids cpxName", /* 32 */ "cmd ::= ALTER USER ids PASS ids", /* 33 */ "cmd ::= ALTER USER ids PRIVILEGE ids", - /* 34 */ "cmd ::= ALTER DNODE IPTOKEN ids", - /* 35 */ "cmd ::= ALTER DNODE IPTOKEN ids ids", + /* 34 */ "cmd ::= ALTER DNODE ids ids", + /* 35 */ "cmd ::= ALTER DNODE ids ids ids", /* 36 */ "cmd ::= ALTER LOCAL ids", /* 37 */ "cmd ::= ALTER LOCAL ids ids", /* 38 */ "cmd ::= ALTER DATABASE ids alter_db_optr", @@ -1692,15 +1691,15 @@ static const struct { { 205, -4 }, /* (24) cmd ::= SHOW dbPrefix VGROUPS ids */ { 205, -5 }, /* (25) cmd ::= DROP TABLE ifexists ids cpxName */ { 205, -4 }, /* (26) cmd ::= DROP DATABASE ifexists ids */ - { 205, -3 }, /* (27) cmd ::= DROP DNODE IPTOKEN */ + { 205, -3 }, /* (27) cmd ::= DROP DNODE ids */ { 205, -3 }, /* (28) cmd ::= DROP USER ids */ { 205, -3 }, /* (29) cmd ::= DROP ACCOUNT ids */ { 205, -2 }, /* (30) cmd ::= USE ids */ { 205, -3 }, /* (31) cmd ::= DESCRIBE ids cpxName */ { 205, -5 }, /* (32) cmd ::= ALTER USER ids PASS ids */ { 205, -5 }, /* (33) cmd ::= ALTER USER ids PRIVILEGE ids */ - { 205, -4 }, /* (34) cmd ::= ALTER DNODE IPTOKEN ids */ - { 205, -5 }, /* (35) cmd ::= ALTER DNODE IPTOKEN ids ids */ + { 205, -4 }, /* (34) cmd ::= ALTER DNODE ids ids */ + { 205, -5 }, /* (35) cmd ::= ALTER DNODE ids ids ids */ { 205, -3 }, /* (36) cmd ::= ALTER LOCAL ids */ { 205, -4 }, /* (37) cmd ::= ALTER LOCAL ids ids */ { 205, -4 }, /* (38) cmd ::= ALTER DATABASE ids alter_db_optr */ @@ -2063,7 +2062,7 @@ static void yy_reduce( case 26: /* cmd ::= DROP DATABASE ifexists ids */ { setDropDBTableInfo(pInfo, TSDB_SQL_DROP_DB, &yymsp[0].minor.yy0, &yymsp[-1].minor.yy0); } break; - case 27: /* cmd ::= DROP DNODE IPTOKEN */ + case 27: /* cmd ::= DROP DNODE ids */ { setDCLSQLElems(pInfo, TSDB_SQL_DROP_DNODE, 1, &yymsp[0].minor.yy0); } break; case 28: /* cmd ::= DROP USER ids */ @@ -2087,10 +2086,10 @@ static void yy_reduce( case 33: /* cmd ::= ALTER USER ids PRIVILEGE ids */ { setAlterUserSQL(pInfo, TSDB_ALTER_USER_PRIVILEGES, &yymsp[-2].minor.yy0, NULL, &yymsp[0].minor.yy0);} break; - case 34: /* cmd ::= ALTER DNODE IPTOKEN ids */ + case 34: /* cmd ::= ALTER DNODE ids ids */ { setDCLSQLElems(pInfo, TSDB_SQL_CFG_DNODE, 2, &yymsp[-1].minor.yy0, &yymsp[0].minor.yy0); } break; - case 35: /* cmd ::= ALTER DNODE IPTOKEN ids ids */ + case 35: /* cmd ::= ALTER DNODE ids ids ids */ { setDCLSQLElems(pInfo, TSDB_SQL_CFG_DNODE, 3, &yymsp[-2].minor.yy0, &yymsp[-1].minor.yy0, &yymsp[0].minor.yy0); } break; case 36: /* cmd ::= ALTER LOCAL ids */ @@ -2267,7 +2266,7 @@ static void yy_reduce( case 103: /* cmd ::= CREATE TABLE ifnotexists ids cpxName create_table_args */ { yymsp[-2].minor.yy0.n += yymsp[-1].minor.yy0.n; - setCreatedMeterName(pInfo, &yymsp[-2].minor.yy0, &yymsp[-3].minor.yy0); + setCreatedTableName(pInfo, &yymsp[-2].minor.yy0, &yymsp[-3].minor.yy0); } break; case 104: /* create_table_args ::= LP columnlist RP */ diff --git a/tests/script/unique/big/testSuite.sim b/tests/script/unique/big/testSuite.sim index 05a003ac2f..5881d1fb67 100644 --- a/tests/script/unique/big/testSuite.sim +++ b/tests/script/unique/big/testSuite.sim @@ -1,3 +1,3 @@ #run unique/big/balance.sim #run unique/big/maxvnodes.sim -run unique/big/tcp.sim +#run unique/big/tcp.sim diff --git a/tests/script/unique/mnode/mgmt33.sim b/tests/script/unique/mnode/mgmt33.sim index df0a289ab7..ef9b0bbcc7 100644 --- a/tests/script/unique/mnode/mgmt33.sim +++ b/tests/script/unique/mnode/mgmt33.sim @@ -106,7 +106,7 @@ sleep 8000 sql show mnodes $dnode1Role = $data2_1 -$dnode2Role = $data3_4 +$dnode2Role = $data2_4 $dnode3Role = $data2_3 print dnode1 ==> $dnode1Role print dnode2 ==> $dnode2Role @@ -128,7 +128,7 @@ sleep 10000 sql show mnodes $dnode1Role = $data2_1 -$dnode2Role = $data3_4 +$dnode2Role = $data2_4 $dnode3Role = $data2_3 print dnode1 ==> $dnode1Role print dnode2 ==> $dnode2Role From 6ff5f373e2b28f2f72e1fe740f505a834f8c14f1 Mon Sep 17 00:00:00 2001 From: slguan Date: Sat, 2 May 2020 13:05:56 +0800 Subject: [PATCH 09/11] add some log for sdb --- src/mnode/src/mgmtSdb.c | 23 ++++++++++++++--------- tests/script/unique/mnode/mgmt33.sim | 8 ++++---- 2 files changed, 18 insertions(+), 13 deletions(-) diff --git a/src/mnode/src/mgmtSdb.c b/src/mnode/src/mgmtSdb.c index 53b9d2b814..839695a6e5 100644 --- a/src/mnode/src/mgmtSdb.c +++ b/src/mnode/src/mgmtSdb.c @@ -212,15 +212,16 @@ static void sdbNotifyRole(void *ahandle, int8_t role) { static void sdbConfirmForward(void *ahandle, void *param, int32_t code) { tsSdbObj.code = code; - sdbTrace("sdb forward request confirmed, result:%s", tstrerror(code)); sem_post(&tsSdbObj.sem); + sdbTrace("forward request confirmed, version:%" PRIu64 ", result:%s", (int64_t)param, tstrerror(code)); } -static int32_t sdbForwardToPeer(void *pHead) { +static int32_t sdbForwardToPeer(SWalHead *pHead) { if (tsSdbObj.sync == NULL) return TSDB_CODE_SUCCESS; - int32_t code = syncForwardToPeer(tsSdbObj.sync, pHead, NULL); + int32_t code = syncForwardToPeer(tsSdbObj.sync, pHead, (void*)pHead->version); if (code > 0) { + sdbTrace("forward request is sent, version:%" PRIu64 ", result:%s", pHead->version, tstrerror(code)); sem_wait(&tsSdbObj.sem); return tsSdbObj.code; } @@ -332,7 +333,7 @@ void sdbIncRef(void *handle, void *pRow) { SSdbTable *pTable = handle; int32_t * pRefCount = (int32_t *)(pRow + pTable->refCountPos); atomic_add_fetch_32(pRefCount, 1); - if (0 && pTable->tableId == SDB_TABLE_CTABLE) { + if (0 && (pTable->tableId == SDB_TABLE_MNODE || pTable->tableId == SDB_TABLE_DNODE)) { sdbTrace("table:%s, add ref to record:%s:%s:%d", pTable->tableName, pTable->tableName, sdbGetkeyStr(pTable, pRow), *pRefCount); } @@ -344,7 +345,7 @@ void sdbDecRef(void *handle, void *pRow) { SSdbTable *pTable = handle; int32_t * pRefCount = (int32_t *)(pRow + pTable->refCountPos); int32_t refCount = atomic_sub_fetch_32(pRefCount, 1); - if (0 && pTable->tableId == SDB_TABLE_CTABLE) { + if (0 && (pTable->tableId == SDB_TABLE_MNODE || pTable->tableId == SDB_TABLE_DNODE)) { sdbTrace("table:%s, def ref of record:%s:%s:%d", pTable->tableName, pTable->tableName, sdbGetkeyStr(pTable, pRow), *pRefCount); } @@ -474,14 +475,18 @@ static int sdbWrite(void *param, void *data, int type) { } walFsync(tsSdbObj.wal); - sdbForwardToPeer(pHead); + code = sdbForwardToPeer(pHead); pthread_mutex_unlock(&tsSdbObj.mutex); // from app, oper is created - if (param != NULL) return code; - - // from wal or forward msg, should create oper + if (param != NULL) { + //sdbTrace("request from app is disposed, version:%" PRIu64 " code:%s", pHead->version, tstrerror(code)); + return code; + } + + // from wal or forward msg, oper not created, should add into hash if (tsSdbObj.sync != NULL) { + sdbTrace("forward request is received, version:%" PRIu64 " result:%s, confirm it", pHead->version, tstrerror(code)); syncConfirmForward(tsSdbObj.sync, pHead->version, code); } diff --git a/tests/script/unique/mnode/mgmt33.sim b/tests/script/unique/mnode/mgmt33.sim index ef9b0bbcc7..30fae2c243 100644 --- a/tests/script/unique/mnode/mgmt33.sim +++ b/tests/script/unique/mnode/mgmt33.sim @@ -8,7 +8,7 @@ system sh/cfg.sh -n dnode2 -c numOfMPeers -v 3 system sh/cfg.sh -n dnode3 -c numOfMPeers -v 3 print ============== step1 -system sh/exec_up.sh -n dnode1 -s start +system sh/exec_up.sh -n dnode1 -s start -t sql connect sql show mnodes @@ -26,7 +26,7 @@ if $data3_3 != null then endi print ============== step2 -system sh/exec_up.sh -n dnode2 -s start +system sh/exec_up.sh -n dnode2 -s start -t sql create dnode $hostname2 sleep 8000 @@ -49,7 +49,7 @@ if $dnode3Role != null then endi print ============== step3 -system sh/exec_up.sh -n dnode3 -s start +system sh/exec_up.sh -n dnode3 -s start -t sql create dnode $hostname3 sleep 8000 @@ -98,7 +98,7 @@ sleep 3000 system sh/deploy.sh -n dnode2 -i 2 system sh/cfg.sh -n dnode2 -c numOfMPeers -v 3 -system sh/exec_up.sh -n dnode2 -s start +system sh/exec_up.sh -n dnode2 -s start -t print ============== step5 sql create dnode $hostname2 From 504860d14c9ba8a54dc5db3658a31602575bffbf Mon Sep 17 00:00:00 2001 From: hzcheng Date: Sat, 2 May 2020 15:33:47 +0800 Subject: [PATCH 10/11] fix maxTables problem --- src/mnode/src/mgmtVgroup.c | 2 +- src/tsdb/src/tsdbMain.c | 4 ---- 2 files changed, 1 insertion(+), 5 deletions(-) diff --git a/src/mnode/src/mgmtVgroup.c b/src/mnode/src/mgmtVgroup.c index b3260850ad..6e694b9fc1 100644 --- a/src/mnode/src/mgmtVgroup.c +++ b/src/mnode/src/mgmtVgroup.c @@ -549,7 +549,7 @@ SMDCreateVnodeMsg *mgmtBuildCreateVnodeMsg(SVgObj *pVgroup) { pCfg->cfgVersion = htonl(pDb->cfgVersion); pCfg->cacheBlockSize = htonl(pDb->cfg.cacheBlockSize); pCfg->totalBlocks = htonl(pDb->cfg.totalBlocks); - pCfg->maxTables = htonl(pDb->cfg.maxTables); + pCfg->maxTables = htonl(pDb->cfg.maxTables + 1); pCfg->daysPerFile = htonl(pDb->cfg.daysPerFile); pCfg->daysToKeep = htonl(pDb->cfg.daysToKeep); pCfg->daysToKeep1 = htonl(pDb->cfg.daysToKeep1); diff --git a/src/tsdb/src/tsdbMain.c b/src/tsdb/src/tsdbMain.c index c0f6030fa6..fcfbcc9014 100644 --- a/src/tsdb/src/tsdbMain.c +++ b/src/tsdb/src/tsdbMain.c @@ -611,10 +611,6 @@ static int32_t tsdbCheckAndSetDefaultCfg(STsdbCfg *pCfg) { if (pCfg->maxTables < TSDB_MIN_TABLES || pCfg->maxTables > TSDB_MAX_TABLES) return -1; } - // Since tableId starts from 1, we increase maxTables by 1 - // TODO: take a fancier way to do this - pCfg->maxTables++; - // Check daysPerFile if (pCfg->daysPerFile == -1) { pCfg->daysPerFile = TSDB_DEFAULT_DAYS_PER_FILE; From c3a19c9d66e3976c1c0d32972b790b0134fad175 Mon Sep 17 00:00:00 2001 From: slguan Date: Sat, 2 May 2020 15:39:10 +0800 Subject: [PATCH 11/11] should not send msg while sdb restore --- src/mnode/inc/mgmtVgroup.h | 2 +- src/mnode/src/mgmtDb.c | 6 ++++-- src/mnode/src/mgmtVgroup.c | 7 +++++-- 3 files changed, 10 insertions(+), 5 deletions(-) diff --git a/src/mnode/inc/mgmtVgroup.h b/src/mnode/inc/mgmtVgroup.h index 3f8dc35a00..21a2c9b896 100644 --- a/src/mnode/inc/mgmtVgroup.h +++ b/src/mnode/inc/mgmtVgroup.h @@ -32,7 +32,7 @@ void mgmtCleanUpVgroups(); SVgObj *mgmtGetVgroup(int32_t vgId); void mgmtIncVgroupRef(SVgObj *pVgroup); void mgmtDecVgroupRef(SVgObj *pVgroup); -void mgmtDropAllDbVgroups(SDbObj *pDropDb); +void mgmtDropAllDbVgroups(SDbObj *pDropDb, bool sendMsg); void mgmtDropAllDnodeVgroups(SDnodeObj *pDropDnode); void * mgmtGetNextVgroup(void *pNode, SVgObj **pVgroup); diff --git a/src/mnode/src/mgmtDb.c b/src/mnode/src/mgmtDb.c index 3e7577af06..4d8bce2a67 100644 --- a/src/mnode/src/mgmtDb.c +++ b/src/mnode/src/mgmtDb.c @@ -82,7 +82,7 @@ static int32_t mgmtDbActionDelete(SSdbOper *pOper) { mgmtDropDbFromAcct(pAcct, pDb); mgmtDropAllChildTables(pDb); mgmtDropAllSuperTables(pDb); - mgmtDropAllDbVgroups(pDb); + mgmtDropAllDbVgroups(pDb, false); mgmtDecAcctRef(pAcct); return TSDB_CODE_SUCCESS; @@ -932,7 +932,9 @@ static void mgmtProcessDropDbMsg(SQueuedMsg *pMsg) { return; } -#if 0 +#if 1 + mgmtDropAllDbVgroups(pMsg->pDb, true); +#else SVgObj *pVgroup = pMsg->pDb->pHead; if (pVgroup != NULL) { mPrint("vgId:%d, will be dropped", pVgroup->vgId); diff --git a/src/mnode/src/mgmtVgroup.c b/src/mnode/src/mgmtVgroup.c index b3260850ad..ae2d2dcb08 100644 --- a/src/mnode/src/mgmtVgroup.c +++ b/src/mnode/src/mgmtVgroup.c @@ -773,7 +773,7 @@ void mgmtDropAllDnodeVgroups(SDnodeObj *pDropDnode) { } } -void mgmtDropAllDbVgroups(SDbObj *pDropDb) { +void mgmtDropAllDbVgroups(SDbObj *pDropDb, bool sendMsg) { void *pNode = NULL; void *pLastNode = NULL; int32_t numOfVgroups = 0; @@ -794,7 +794,10 @@ void mgmtDropAllDbVgroups(SDbObj *pDropDb) { sdbDeleteRow(&oper); pNode = pLastNode; numOfVgroups++; - mgmtSendDropVgroupMsg(pVgroup, NULL); + + if (sendMsg) { + mgmtSendDropVgroupMsg(pVgroup, NULL); + } } mgmtDecVgroupRef(pVgroup);