Merge branch 'feature/query' of https://github.com/taosdata/TDengine into feature/query

This commit is contained in:
root 2020-07-20 18:41:08 +08:00
commit d633a966f8
18 changed files with 795 additions and 213 deletions

View File

@ -52,12 +52,20 @@ typedef struct STableComInfo {
int32_t rowSize; int32_t rowSize;
} STableComInfo; } STableComInfo;
typedef struct SCMCorVgroupInfo {
int32_t version;
int8_t inUse;
int8_t numOfEps;
SEpAddr epAddr[TSDB_MAX_REPLICA];
} SCMCorVgroupInfo;
typedef struct STableMeta { typedef struct STableMeta {
STableComInfo tableInfo; STableComInfo tableInfo;
uint8_t tableType; uint8_t tableType;
int16_t sversion; int16_t sversion;
int16_t tversion; int16_t tversion;
SCMVgroupInfo vgroupInfo; SCMVgroupInfo vgroupInfo;
SCMCorVgroupInfo corVgroupInfo;
int32_t sid; // the index of one table in a virtual node int32_t sid; // the index of one table in a virtual node
uint64_t uid; // unique id of a table uint64_t uid; // unique id of a table
SSchema schema[]; // if the table is TSDB_CHILD_TABLE, schema is acquired by super table meta info SSchema schema[]; // if the table is TSDB_CHILD_TABLE, schema is acquired by super table meta info
@ -456,7 +464,8 @@ extern void * tscQhandle;
extern int tscKeepConn[]; extern int tscKeepConn[];
extern int tsInsertHeadSize; extern int tsInsertHeadSize;
extern int tscNumOfThreads; extern int tscNumOfThreads;
extern SRpcEpSet tscMgmtEpSet;
extern SRpcCorEpSet tscMgmtEpSet;
extern int (*tscBuildMsg[TSDB_SQL_MAX])(SSqlObj *pSql, SSqlInfo *pInfo); extern int (*tscBuildMsg[TSDB_SQL_MAX])(SSqlObj *pSql, SSqlInfo *pInfo);

View File

@ -140,7 +140,15 @@ struct SSchema tscGetTbnameColumnSchema() {
strcpy(s.name, TSQL_TBNAME_L); strcpy(s.name, TSQL_TBNAME_L);
return s; return s;
} }
static void tscInitCorVgroupInfo(SCMCorVgroupInfo *corVgroupInfo, SCMVgroupInfo *vgroupInfo) {
corVgroupInfo->version = 0;
corVgroupInfo->inUse = 0;
corVgroupInfo->numOfEps = vgroupInfo->numOfEps;
for (int32_t i = 0; i < corVgroupInfo->numOfEps; i++) {
strncpy(corVgroupInfo->epAddr[i].fqdn, vgroupInfo->epAddr[i].fqdn, TSDB_FQDN_LEN);
corVgroupInfo->epAddr[i].port = vgroupInfo->epAddr[i].port;
}
}
STableMeta* tscCreateTableMetaFromMsg(STableMetaMsg* pTableMetaMsg, size_t* size) { STableMeta* tscCreateTableMetaFromMsg(STableMetaMsg* pTableMetaMsg, size_t* size) {
assert(pTableMetaMsg != NULL); assert(pTableMetaMsg != NULL);
@ -157,6 +165,9 @@ STableMeta* tscCreateTableMetaFromMsg(STableMetaMsg* pTableMetaMsg, size_t* size
pTableMeta->sid = pTableMetaMsg->sid; pTableMeta->sid = pTableMetaMsg->sid;
pTableMeta->uid = pTableMetaMsg->uid; pTableMeta->uid = pTableMetaMsg->uid;
pTableMeta->vgroupInfo = pTableMetaMsg->vgroup; pTableMeta->vgroupInfo = pTableMetaMsg->vgroup;
tscInitCorVgroupInfo(&pTableMeta->corVgroupInfo, &pTableMeta->vgroupInfo);
pTableMeta->sversion = pTableMetaMsg->sversion; pTableMeta->sversion = pTableMetaMsg->sversion;
pTableMeta->tversion = pTableMetaMsg->tversion; pTableMeta->tversion = pTableMetaMsg->tversion;

View File

@ -26,10 +26,11 @@
#include "ttime.h" #include "ttime.h"
#include "ttimer.h" #include "ttimer.h"
#include "tutil.h" #include "tutil.h"
#include "tlockfree.h"
#define TSC_MGMT_VNODE 999 #define TSC_MGMT_VNODE 999
SRpcEpSet tscMgmtEpSet; SRpcCorEpSet tscMgmtEpSet;
SRpcEpSet tscDnodeEpSet; SRpcEpSet tscDnodeEpSet;
int (*tscBuildMsg[TSDB_SQL_MAX])(SSqlObj *pSql, SSqlInfo *pInfo) = {0}; int (*tscBuildMsg[TSDB_SQL_MAX])(SSqlObj *pSql, SSqlInfo *pInfo) = {0};
@ -51,40 +52,81 @@ static void tscSetDnodeEpSet(SSqlObj* pSql, SCMVgroupInfo* pVgroupInfo) {
pEpSet->numOfEps = 0; pEpSet->numOfEps = 0;
return; return;
} }
pEpSet->numOfEps = pVgroupInfo->numOfEps; pEpSet->numOfEps = pVgroupInfo->numOfEps;
for(int32_t i = 0; i < pVgroupInfo->numOfEps; ++i) { for(int32_t i = 0; i < pVgroupInfo->numOfEps; ++i) {
strcpy(pEpSet->fqdn[i], pVgroupInfo->epAddr[i].fqdn); strcpy(pEpSet->fqdn[i], pVgroupInfo->epAddr[i].fqdn);
pEpSet->port[i] = pVgroupInfo->epAddr[i].port; pEpSet->port[i] = pVgroupInfo->epAddr[i].port;
} }
} }
static void tscDumpMgmtEpSet(SRpcEpSet *epSet) {
taosCorBeginRead(&tscMgmtEpSet.version);
*epSet = tscMgmtEpSet.epSet;
taosCorEndRead(&tscMgmtEpSet.version);
}
static void tscEpSetHtons(SRpcEpSet *s) {
for (int32_t i = 0; i < s->numOfEps; i++) {
s->port[i] = htons(s->port[i]);
}
}
bool tscEpSetIsEqual(SRpcEpSet *s1, SRpcEpSet *s2) {
if (s1->numOfEps != s2->numOfEps || s1->inUse != s2->inUse) {
return false;
}
for (int32_t i = 0; i < s1->numOfEps; i++) {
if (s1->port[i] != s2->port[i]
|| strncmp(s1->fqdn[i], s2->fqdn[i], TSDB_FQDN_LEN) != 0)
return false;
}
return true;
}
void tscUpdateMgmtEpSet(SRpcEpSet *pEpSet) {
// no need to update if equal
taosCorBeginWrite(&tscMgmtEpSet.version);
tscMgmtEpSet.epSet = *pEpSet;
taosCorEndWrite(&tscMgmtEpSet.version);
}
static void tscDumpEpSetFromVgroupInfo(SCMCorVgroupInfo *pVgroupInfo, SRpcEpSet *pEpSet) {
if (pVgroupInfo == NULL) { return;}
taosCorBeginRead(&pVgroupInfo->version);
int8_t inUse = pVgroupInfo->inUse;
pEpSet->inUse = (inUse >= 0 && inUse < TSDB_MAX_REPLICA) ? inUse: 0;
pEpSet->numOfEps = pVgroupInfo->numOfEps;
for (int32_t i = 0; i < pVgroupInfo->numOfEps; ++i) {
strncpy(pEpSet->fqdn[i], pVgroupInfo->epAddr[i].fqdn, TSDB_FQDN_LEN);
pEpSet->port[i] = pVgroupInfo->epAddr[i].port;
}
taosCorEndRead(&pVgroupInfo->version);
}
static void tscUpdateVgroupInfo(SSqlObj *pObj, SRpcEpSet *pEpSet) {
SSqlCmd *pCmd = &pObj->cmd;
STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
if (pTableMetaInfo == NULL || pTableMetaInfo->pTableMeta == NULL) { return;}
SCMCorVgroupInfo *pVgroupInfo = &pTableMetaInfo->pTableMeta->corVgroupInfo;
taosCorBeginWrite(&pVgroupInfo->version);
//TODO(dengyihao), dont care vgid
pVgroupInfo->inUse = pEpSet->inUse;
pVgroupInfo->numOfEps = pEpSet->numOfEps;
for (int32_t i = 0; pVgroupInfo->numOfEps; i++) {
strncpy(pVgroupInfo->epAddr[i].fqdn, pEpSet->fqdn[i], TSDB_FQDN_LEN);
pVgroupInfo->epAddr[i].port = pEpSet->port[i];
}
taosCorEndWrite(&pVgroupInfo->version);
}
void tscPrintMgmtEp() { void tscPrintMgmtEp() {
if (tscMgmtEpSet.numOfEps <= 0) { SRpcEpSet dump;
tscError("invalid mnode EP list:%d", tscMgmtEpSet.numOfEps); tscDumpMgmtEpSet(&dump);
if (dump.numOfEps <= 0) {
tscError("invalid mnode EP list:%d", dump.numOfEps);
} else { } else {
for (int i = 0; i < tscMgmtEpSet.numOfEps; ++i) { for (int i = 0; i < dump.numOfEps; ++i) {
tscDebug("mnode index:%d %s:%d", i, tscMgmtEpSet.fqdn[i], tscMgmtEpSet.port[i]); tscDebug("mnode index:%d %s:%d", i, dump.fqdn[i], dump.port[i]);
} }
} }
} }
void tscSetMgmtEpSet(SRpcEpSet *pEpSet) {
tscMgmtEpSet.numOfEps = pEpSet->numOfEps;
tscMgmtEpSet.inUse = pEpSet->inUse;
for (int32_t i = 0; i < tscMgmtEpSet.numOfEps; ++i) {
tscMgmtEpSet.port[i] = htons(pEpSet->port[i]);
}
}
void tscUpdateEpSet(void *ahandle, SRpcEpSet *pEpSet) {
tscMgmtEpSet = *pEpSet;
tscDebug("mnode EP list is changed for ufp is called, numOfEps:%d inUse:%d", tscMgmtEpSet.numOfEps, tscMgmtEpSet.inUse);
for (int32_t i = 0; i < tscMgmtEpSet.numOfEps; ++i) {
tscDebug("index:%d fqdn:%s port:%d", i, tscMgmtEpSet.fqdn[i], tscMgmtEpSet.port[i]);
}
}
/* /*
* For each management node, try twice at least in case of poor network situation. * For each management node, try twice at least in case of poor network situation.
* If the client start to connect to a non-management node from the client, and the first retry may fail due to * If the client start to connect to a non-management node from the client, and the first retry may fail due to
@ -95,7 +137,9 @@ void tscUpdateEpSet(void *ahandle, SRpcEpSet *pEpSet) {
UNUSED_FUNC UNUSED_FUNC
static int32_t tscGetMgmtConnMaxRetryTimes() { static int32_t tscGetMgmtConnMaxRetryTimes() {
int32_t factor = 2; int32_t factor = 2;
return tscMgmtEpSet.numOfEps * factor; SRpcEpSet dump;
tscDumpMgmtEpSet(&dump);
return dump.numOfEps * factor;
} }
void tscProcessHeartBeatRsp(void *param, TAOS_RES *tres, int code) { void tscProcessHeartBeatRsp(void *param, TAOS_RES *tres, int code) {
@ -111,9 +155,11 @@ void tscProcessHeartBeatRsp(void *param, TAOS_RES *tres, int code) {
if (code == 0) { if (code == 0) {
SCMHeartBeatRsp *pRsp = (SCMHeartBeatRsp *)pRes->pRsp; SCMHeartBeatRsp *pRsp = (SCMHeartBeatRsp *)pRes->pRsp;
SRpcEpSet * pEpSet = &pRsp->epSet; SRpcEpSet * epSet = &pRsp->epSet;
if (pEpSet->numOfEps > 0) if (epSet->numOfEps > 0) {
tscSetMgmtEpSet(pEpSet); tscEpSetHtons(epSet);
tscUpdateMgmtEpSet(epSet);
}
pSql->pTscObj->connId = htonl(pRsp->connId); pSql->pTscObj->connId = htonl(pRsp->connId);
@ -185,7 +231,7 @@ int tscSendMsgToServer(SSqlObj *pSql) {
// set the mgmt ip list // set the mgmt ip list
if (pSql->cmd.command >= TSDB_SQL_MGMT) { if (pSql->cmd.command >= TSDB_SQL_MGMT) {
pSql->epSet = tscMgmtEpSet; tscDumpMgmtEpSet(&pSql->epSet);
} }
memcpy(pMsg, pSql->cmd.payload, pSql->cmd.payloadLen); memcpy(pMsg, pSql->cmd.payload, pSql->cmd.payloadLen);
@ -236,10 +282,16 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) {
return; return;
} }
if (pCmd->command < TSDB_SQL_MGMT) { if (pEpSet) {
if (pEpSet) pSql->epSet = *pEpSet; //SRpcEpSet dump;
} else { tscEpSetHtons(pEpSet);
if (pEpSet) tscMgmtEpSet = *pEpSet; if (tscEpSetIsEqual(&pSql->epSet, pEpSet)) {
if(pCmd->command < TSDB_SQL_MGMT) {
tscUpdateVgroupInfo(pSql, pEpSet);
} else {
tscUpdateMgmtEpSet(pEpSet);
}
}
} }
STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0); STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
@ -400,7 +452,8 @@ int tscProcessSql(SSqlObj *pSql) {
return pSql->res.code; return pSql->res.code;
} }
} else if (pCmd->command < TSDB_SQL_LOCAL) { } else if (pCmd->command < TSDB_SQL_LOCAL) {
pSql->epSet = tscMgmtEpSet;
//pSql->epSet = tscMgmtEpSet;
} else { // local handler } else { // local handler
return (*tscProcessMsgRsp[pCmd->command])(pSql); return (*tscProcessMsgRsp[pCmd->command])(pSql);
} }
@ -504,8 +557,8 @@ int tscBuildSubmitMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
// pSql->cmd.payloadLen is set during copying data into payload // pSql->cmd.payloadLen is set during copying data into payload
pSql->cmd.msgType = TSDB_MSG_TYPE_SUBMIT; pSql->cmd.msgType = TSDB_MSG_TYPE_SUBMIT;
tscSetDnodeEpSet(pSql, &pTableMeta->vgroupInfo); tscDumpEpSetFromVgroupInfo(&pTableMeta->corVgroupInfo, &pSql->epSet);
tscDebug("%p build submit msg, vgId:%d numOfTables:%d numberOfEP:%d", pSql, vgId, pSql->cmd.numOfTablesInSubmit, tscDebug("%p build submit msg, vgId:%d numOfTables:%d numberOfEP:%d", pSql, vgId, pSql->cmd.numOfTablesInSubmit,
pSql->epSet.numOfEps); pSql->epSet.numOfEps);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
@ -546,11 +599,11 @@ static char *doSerializeTableInfo(SQueryTableMsg* pQueryMsg, SSqlObj *pSql, char
} else { } else {
pVgroupInfo = &pTableMeta->vgroupInfo; pVgroupInfo = &pTableMeta->vgroupInfo;
} }
tscSetDnodeEpSet(pSql, pVgroupInfo); tscSetDnodeEpSet(pSql, pVgroupInfo);
if (pVgroupInfo != NULL) { if (pVgroupInfo != NULL) {
pQueryMsg->head.vgId = htonl(pVgroupInfo->vgId); pQueryMsg->head.vgId = htonl(pVgroupInfo->vgId);
} }
STableIdInfo *pTableIdInfo = (STableIdInfo *)pMsg; STableIdInfo *pTableIdInfo = (STableIdInfo *)pMsg;
pTableIdInfo->tid = htonl(pTableMeta->sid); pTableIdInfo->tid = htonl(pTableMeta->sid);
@ -567,8 +620,8 @@ static char *doSerializeTableInfo(SQueryTableMsg* pQueryMsg, SSqlObj *pSql, char
tscDebug("%p query on stable, vgIndex:%d, numOfVgroups:%d", pSql, index, numOfVgroups); tscDebug("%p query on stable, vgIndex:%d, numOfVgroups:%d", pSql, index, numOfVgroups);
SVgroupTableInfo* pTableIdList = taosArrayGet(pTableMetaInfo->pVgroupTables, index); SVgroupTableInfo* pTableIdList = taosArrayGet(pTableMetaInfo->pVgroupTables, index);
// set the vgroup info // set the vgroup info
tscSetDnodeEpSet(pSql, &pTableIdList->vgInfo); tscSetDnodeEpSet(pSql, &pTableIdList->vgInfo);
pQueryMsg->head.vgId = htonl(pTableIdList->vgInfo.vgId); pQueryMsg->head.vgId = htonl(pTableIdList->vgInfo.vgId);
@ -1115,11 +1168,11 @@ int32_t tscBuildShowMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
pShowMsg->payloadLen = htons(pPattern->n); pShowMsg->payloadLen = htons(pPattern->n);
} }
} else { } else {
SSQLToken *pIpAddr = &pShowInfo->prefix; SSQLToken *pEpAddr = &pShowInfo->prefix;
assert(pIpAddr->n > 0 && pIpAddr->type > 0); assert(pEpAddr->n > 0 && pEpAddr->type > 0);
strncpy(pShowMsg->payload, pIpAddr->z, pIpAddr->n); strncpy(pShowMsg->payload, pEpAddr->z, pEpAddr->n);
pShowMsg->payloadLen = htons(pIpAddr->n); pShowMsg->payloadLen = htons(pEpAddr->n);
} }
pCmd->payloadLen = sizeof(SCMShowMsg) + pShowMsg->payloadLen; pCmd->payloadLen = sizeof(SCMShowMsg) + pShowMsg->payloadLen;
@ -1302,7 +1355,7 @@ int tscBuildUpdateTagMsg(SSqlObj* pSql, SSqlInfo *pInfo) {
SQueryInfo * pQueryInfo = tscGetQueryInfoDetail(pCmd, 0); SQueryInfo * pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
tscSetDnodeEpSet(pSql, &pTableMetaInfo->pTableMeta->vgroupInfo); tscDumpEpSetFromVgroupInfo(&pTableMetaInfo->pTableMeta->corVgroupInfo, &pSql->epSet);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
@ -1826,13 +1879,14 @@ int tscProcessSTableVgroupRsp(SSqlObj *pSql) {
memcpy(pInfo->vgroupList, pVgroupInfo, size); memcpy(pInfo->vgroupList, pVgroupInfo, size);
for (int32_t j = 0; j < pInfo->vgroupList->numOfVgroups; ++j) { for (int32_t j = 0; j < pInfo->vgroupList->numOfVgroups; ++j) {
//just init, no need to lock
SCMVgroupInfo *pVgroups = &pInfo->vgroupList->vgroups[j]; SCMVgroupInfo *pVgroups = &pInfo->vgroupList->vgroups[j];
pVgroups->vgId = htonl(pVgroups->vgId); pVgroups->vgId = htonl(pVgroups->vgId);
assert(pVgroups->numOfEps >= 1); assert(pVgroups->numOfEps >= 1);
for (int32_t k = 0; k < pVgroups->numOfEps; ++k) { for (int32_t k = 0; k < pVgroups->numOfEps; ++k) {
pVgroups->epAddr[k].port = htons(pVgroups->epAddr[k].port); pVgroups->epAddr[k].port = htons(pVgroups->epAddr[k].port);
} }
pMsg += size; pMsg += size;
@ -1925,8 +1979,10 @@ int tscProcessConnectRsp(SSqlObj *pSql) {
assert(len <= sizeof(pObj->db)); assert(len <= sizeof(pObj->db));
tstrncpy(pObj->db, temp, sizeof(pObj->db)); tstrncpy(pObj->db, temp, sizeof(pObj->db));
if (pConnect->epSet.numOfEps > 0) if (pConnect->epSet.numOfEps > 0) {
tscSetMgmtEpSet(&pConnect->epSet); tscEpSetHtons(&pConnect->epSet);
tscUpdateMgmtEpSet(&pConnect->epSet);
}
strcpy(pObj->sversion, pConnect->serverVersion); strcpy(pObj->sversion, pConnect->serverVersion);
pObj->writeAuth = pConnect->writeAuth; pObj->writeAuth = pConnect->writeAuth;

View File

@ -63,7 +63,7 @@ SSqlObj *taosConnectImpl(const char *ip, const char *user, const char *pass, con
if (ip) { if (ip) {
if (tscSetMgmtEpSetFromCfg(ip, NULL) < 0) return NULL; if (tscSetMgmtEpSetFromCfg(ip, NULL) < 0) return NULL;
if (port) tscMgmtEpSet.port[0] = port; if (port) tscMgmtEpSet.epSet.port[0] = port;
} }
void *pDnodeConn = NULL; void *pDnodeConn = NULL;
@ -724,6 +724,13 @@ int taos_print_row(char *str, TAOS_ROW row, TAOS_FIELD *fields, int num_fields)
return len; return len;
} }
static void asyncCallback(void *param, TAOS_RES *tres, int code) {
assert(param != NULL);
SSqlObj *pSql = ((SSqlObj *)param);
pSql->res.code = code;
sem_post(&pSql->rspSem);
}
int taos_validate_sql(TAOS *taos, const char *sql) { int taos_validate_sql(TAOS *taos, const char *sql) {
STscObj *pObj = (STscObj *)taos; STscObj *pObj = (STscObj *)taos;
if (pObj == NULL || pObj->signature != pObj) { if (pObj == NULL || pObj->signature != pObj) {
@ -732,7 +739,8 @@ int taos_validate_sql(TAOS *taos, const char *sql) {
} }
SSqlObj* pSql = calloc(1, sizeof(SSqlObj)); SSqlObj* pSql = calloc(1, sizeof(SSqlObj));
pSql->pTscObj = taos;
pSql->signature = pSql;
SSqlRes *pRes = &pSql->res; SSqlRes *pRes = &pSql->res;
SSqlCmd *pCmd = &pSql->cmd; SSqlCmd *pCmd = &pSql->cmd;
@ -766,10 +774,17 @@ int taos_validate_sql(TAOS *taos, const char *sql) {
pCmd->pTableList = NULL; pCmd->pTableList = NULL;
} }
pRes->code = (uint8_t)tsParseSql(pSql, false); pSql->fp = asyncCallback;
int code = pRes->code; pSql->fetchFp = asyncCallback;
pSql->param = pSql;
tscDebug("%p Valid SQL result:%d, %s pObj:%p", pSql, pRes->code, taos_errstr(taos), pObj); int code = tsParseSql(pSql, true);
if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
sem_wait(&pSql->rspSem);
code = pSql->res.code;
}
if (code != TSDB_CODE_SUCCESS) {
tscDebug("%p Valid SQL result:%d, %s pObj:%p", pSql, code, taos_errstr(taos), pObj);
}
taos_free_result(pSql); taos_free_result(pSql);
return code; return code;
@ -865,6 +880,8 @@ int taos_load_table_info(TAOS *taos, const char *tableNameList) {
} }
SSqlObj* pSql = calloc(1, sizeof(SSqlObj)); SSqlObj* pSql = calloc(1, sizeof(SSqlObj));
pSql->pTscObj = taos;
pSql->signature = pSql;
SSqlRes *pRes = &pSql->res; SSqlRes *pRes = &pSql->res;
pRes->numOfTotal = 0; // the number of getting table meta from server pRes->numOfTotal = 0; // the number of getting table meta from server

View File

@ -122,7 +122,7 @@ static void tscProcessStreamTimer(void *handle, void *tmrId) {
pQueryInfo->window.ekey = pStream->etime; pQueryInfo->window.ekey = pStream->etime;
} }
} else { } else {
pQueryInfo->window.skey = pStream->stime - pStream->interval; pQueryInfo->window.skey = pStream->stime;
int64_t etime = taosGetTimestamp(pStream->precision); int64_t etime = taosGetTimestamp(pStream->precision);
// delay to wait all data in last time window // delay to wait all data in last time window
if (pStream->precision == TSDB_TIME_PRECISION_MICRO) { if (pStream->precision == TSDB_TIME_PRECISION_MICRO) {
@ -232,6 +232,9 @@ static void tscProcessStreamRetrieveResult(void *param, TAOS_RES *res, int numOf
(*pStream->fp)(pStream->param, res, row); (*pStream->fp)(pStream->param, res, row);
} }
if (!pStream->isProject) {
pStream->stime += pStream->slidingTime;
}
// actually only one row is returned. this following is not necessary // actually only one row is returned. this following is not necessary
taos_fetch_rows_a(res, tscProcessStreamRetrieveResult, pStream); taos_fetch_rows_a(res, tscProcessStreamRetrieveResult, pStream);
} else { // numOfRows == 0, all data has been retrieved } else { // numOfRows == 0, all data has been retrieved
@ -432,6 +435,7 @@ static int64_t tscGetStreamStartTimestamp(SSqlObj *pSql, SSqlStream *pStream, in
} else { // timewindow based aggregation stream } else { // timewindow based aggregation stream
if (stime == 0) { // no data in meter till now if (stime == 0) { // no data in meter till now
stime = ((int64_t)taosGetTimestamp(pStream->precision) / pStream->interval) * pStream->interval; stime = ((int64_t)taosGetTimestamp(pStream->precision) / pStream->interval) * pStream->interval;
stime -= pStream->interval;
tscWarn("%p stream:%p, last timestamp:0, reset to:%" PRId64, pSql, pStream, stime); tscWarn("%p stream:%p, last timestamp:0, reset to:%" PRId64, pSql, pStream, stime);
} else { } else {
int64_t newStime = (stime / pStream->interval) * pStream->interval; int64_t newStime = (stime / pStream->interval) * pStream->interval;

View File

@ -34,6 +34,7 @@ typedef struct SSubscriptionProgress {
typedef struct SSub { typedef struct SSub {
void * signature; void * signature;
char topic[32]; char topic[32];
sem_t sem;
int64_t lastSyncTime; int64_t lastSyncTime;
int64_t lastConsumeTime; int64_t lastConsumeTime;
TAOS * taos; TAOS * taos;
@ -83,84 +84,108 @@ void tscUpdateSubscriptionProgress(void* sub, int64_t uid, TSKEY ts) {
static void asyncCallback(void *param, TAOS_RES *tres, int code) { static void asyncCallback(void *param, TAOS_RES *tres, int code) {
assert(param != NULL); assert(param != NULL);
SSqlObj *pSql = ((SSqlObj *)param); SSub *pSub = ((SSub *)param);
pSub->pSql->res.code = code;
pSql->res.code = code; sem_post(&pSub->sem);
sem_post(&pSql->rspSem);
} }
static SSub* tscCreateSubscription(STscObj* pObj, const char* topic, const char* sql) { static SSub* tscCreateSubscription(STscObj* pObj, const char* topic, const char* sql) {
SSub* pSub = NULL; int code = TSDB_CODE_SUCCESS, line = __LINE__;
SSqlObj* pSql = NULL;
TRY( 8 ) { SSub* pSub = calloc(1, sizeof(SSub));
SSqlObj* pSql = calloc_throw(1, sizeof(SSqlObj)); if (pSub == NULL) {
CLEANUP_PUSH_FREE(true, pSql); line = __LINE__;
SSqlCmd *pCmd = &pSql->cmd; code = TSDB_CODE_TSC_OUT_OF_MEMORY;
SSqlRes *pRes = &pSql->res; goto fail;
}
pSub->signature = pSub;
if (tsem_init(&pSub->sem, 0, 0) == -1) {
line = __LINE__;
code = TAOS_SYSTEM_ERROR(errno);
goto fail;
}
tstrncpy(pSub->topic, topic, sizeof(pSub->topic));
pSub->progress = taosArrayInit(32, sizeof(SSubscriptionProgress));
if (pSub->progress == NULL) {
line = __LINE__;
code = TSDB_CODE_TSC_OUT_OF_MEMORY;
goto fail;
}
if (tsem_init(&pSql->rspSem, 0, 0) == -1) { pSql = calloc(1, sizeof(SSqlObj));
THROW(TAOS_SYSTEM_ERROR(errno)); if (pSql == NULL) {
} line = __LINE__;
CLEANUP_PUSH_INT_PTR(true, tsem_destroy, &pSql->rspSem); code = TSDB_CODE_TSC_OUT_OF_MEMORY;
goto fail;
}
pSql->signature = pSql;
pSql->pTscObj = pObj;
pSql->pSubscription = pSub;
pSub->pSql = pSql;
pSql->signature = pSql; SSqlCmd* pCmd = &pSql->cmd;
pSql->param = pSql; SSqlRes* pRes = &pSql->res;
pSql->pTscObj = pObj; if (tsem_init(&pSql->rspSem, 0, 0) == -1) {
pSql->maxRetry = TSDB_MAX_REPLICA; line = __LINE__;
pSql->fp = asyncCallback; code = TAOS_SYSTEM_ERROR(errno);
goto fail;
}
int code = tscAllocPayload(pCmd, TSDB_DEFAULT_PAYLOAD_SIZE); pSql->param = pSub;
if (code != TSDB_CODE_SUCCESS) { pSql->maxRetry = TSDB_MAX_REPLICA;
THROW(code); pSql->fp = asyncCallback;
} pSql->fetchFp = asyncCallback;
CLEANUP_PUSH_FREE(true, pCmd->payload); pSql->sqlstr = strdup(sql);
if (pSql->sqlstr == NULL) {
line = __LINE__;
code = TSDB_CODE_TSC_OUT_OF_MEMORY;
goto fail;
}
strtolower(pSql->sqlstr, pSql->sqlstr);
pRes->qhandle = 0;
pRes->numOfRows = 1;
pRes->qhandle = 0; code = tscAllocPayload(pCmd, TSDB_DEFAULT_PAYLOAD_SIZE);
pRes->numOfRows = 1; if (code != TSDB_CODE_SUCCESS) {
line = __LINE__;
goto fail;
}
pSql->sqlstr = strdup_throw(sql); code = tsParseSql(pSql, false);
CLEANUP_PUSH_FREE(true, pSql->sqlstr); if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
strtolower(pSql->sqlstr, pSql->sqlstr); sem_wait(&pSub->sem);
code = pSql->res.code;
}
if (code != TSDB_CODE_SUCCESS) {
line = __LINE__;
goto fail;
}
code = tsParseSql(pSql, false); if (pSql->cmd.command != TSDB_SQL_SELECT) {
if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) { line = __LINE__;
// wait for the callback function to post the semaphore code = TSDB_CODE_TSC_INVALID_SQL;
sem_wait(&pSql->rspSem); goto fail;
code = pSql->res.code; }
}
if (code != TSDB_CODE_SUCCESS) {
tscError("failed to parse sql statement: %s, error: %s", pSub->topic, tstrerror(code));
THROW( code );
}
if (pSql->cmd.command != TSDB_SQL_SELECT) {
tscError("only 'select' statement is allowed in subscription: %s", pSub->topic);
THROW( -1 ); // TODO
}
pSub = calloc_throw(1, sizeof(SSub));
CLEANUP_PUSH_FREE(true, pSub);
pSql->pSubscription = pSub;
pSub->pSql = pSql;
pSub->signature = pSub;
strncpy(pSub->topic, topic, sizeof(pSub->topic));
pSub->topic[sizeof(pSub->topic) - 1] = 0;
pSub->progress = taosArrayInit(32, sizeof(SSubscriptionProgress));
if (pSub->progress == NULL) {
THROW(TSDB_CODE_TSC_OUT_OF_MEMORY);
}
CLEANUP_EXECUTE();
} CATCH( code ) {
tscError("failed to create subscription object: %s", tstrerror(code));
CLEANUP_EXECUTE();
pSub = NULL;
} END_TRY
return pSub; return pSub;
fail:
tscError("tscCreateSubscription failed at line %d, reason: %s", line, tstrerror(code));
if (pSql != NULL) {
tscFreeSqlObj(pSql);
pSql = NULL;
}
if (pSub != NULL) {
taosArrayDestroy(pSub->progress);
tsem_destroy(&pSub->sem);
free(pSub);
pSub = NULL;
}
terrno = code;
return NULL;
} }
@ -405,9 +430,10 @@ TAOS_RES *taos_consume(TAOS_SUB *tsub) {
tscGetTableMetaInfoFromCmd(&pSql->cmd, 0, 0)->vgroupIndex = 0; tscGetTableMetaInfoFromCmd(&pSql->cmd, 0, 0)->vgroupIndex = 0;
pSql->fp = asyncCallback; pSql->fp = asyncCallback;
pSql->param = pSql; pSql->fetchFp = asyncCallback;
pSql->param = pSub;
tscDoQuery(pSql); tscDoQuery(pSql);
sem_wait(&pSql->rspSem); sem_wait(&pSub->sem);
if (pRes->code != TSDB_CODE_SUCCESS) { if (pRes->code != TSDB_CODE_SUCCESS) {
continue; continue;
@ -437,7 +463,9 @@ void taos_unsubscribe(TAOS_SUB *tsub, int keepProgress) {
} }
if (keepProgress) { if (keepProgress) {
tscSaveSubscriptionProgress(pSub); if (pSub->progress != NULL) {
tscSaveSubscriptionProgress(pSub);
}
} else { } else {
char path[256]; char path[256];
sprintf(path, "%s/subscribe/%s", tsDataDir, pSub->topic); sprintf(path, "%s/subscribe/%s", tsDataDir, pSub->topic);
@ -448,6 +476,7 @@ void taos_unsubscribe(TAOS_SUB *tsub, int keepProgress) {
tscFreeSqlObj(pSub->pSql); tscFreeSqlObj(pSub->pSql);
taosArrayDestroy(pSub->progress); taosArrayDestroy(pSub->progress);
tsem_destroy(&pSub->sem);
memset(pSub, 0, sizeof(*pSub)); memset(pSub, 0, sizeof(*pSub));
free(pSub); free(pSub);
} }

View File

@ -41,7 +41,7 @@ int tscNumOfThreads;
static pthread_once_t tscinit = PTHREAD_ONCE_INIT; static pthread_once_t tscinit = PTHREAD_ONCE_INIT;
void taosInitNote(int numOfNoteLines, int maxNotes, char* lable); void taosInitNote(int numOfNoteLines, int maxNotes, char* lable);
void tscUpdateEpSet(void *ahandle, SRpcEpSet *pEpSet); //void tscUpdateEpSet(void *ahandle, SRpcEpSet *pEpSet);
void tscCheckDiskUsage(void *UNUSED_PARAM(para), void* UNUSED_PARAM(param)) { void tscCheckDiskUsage(void *UNUSED_PARAM(para), void* UNUSED_PARAM(param)) {
taosGetDisk(); taosGetDisk();

View File

@ -2146,16 +2146,19 @@ char* strdup_throw(const char* str) {
} }
int tscSetMgmtEpSetFromCfg(const char *first, const char *second) { int tscSetMgmtEpSetFromCfg(const char *first, const char *second) {
tscMgmtEpSet.numOfEps = 0; // init mgmt ip set
tscMgmtEpSet.inUse = 0; tscMgmtEpSet.version = 0;
SRpcEpSet *mgmtEpSet = &(tscMgmtEpSet.epSet);
mgmtEpSet->numOfEps = 0;
mgmtEpSet->inUse = 0;
if (first && first[0] != 0) { if (first && first[0] != 0) {
if (strlen(first) >= TSDB_EP_LEN) { if (strlen(first) >= TSDB_EP_LEN) {
terrno = TSDB_CODE_TSC_INVALID_FQDN; terrno = TSDB_CODE_TSC_INVALID_FQDN;
return -1; return -1;
} }
taosGetFqdnPortFromEp(first, tscMgmtEpSet.fqdn[tscMgmtEpSet.numOfEps], &tscMgmtEpSet.port[tscMgmtEpSet.numOfEps]); taosGetFqdnPortFromEp(first, mgmtEpSet->fqdn[mgmtEpSet->numOfEps], &(mgmtEpSet->port[mgmtEpSet->numOfEps]));
tscMgmtEpSet.numOfEps++; mgmtEpSet->numOfEps++;
} }
if (second && second[0] != 0) { if (second && second[0] != 0) {
@ -2163,11 +2166,11 @@ int tscSetMgmtEpSetFromCfg(const char *first, const char *second) {
terrno = TSDB_CODE_TSC_INVALID_FQDN; terrno = TSDB_CODE_TSC_INVALID_FQDN;
return -1; return -1;
} }
taosGetFqdnPortFromEp(second, tscMgmtEpSet.fqdn[tscMgmtEpSet.numOfEps], &tscMgmtEpSet.port[tscMgmtEpSet.numOfEps]); taosGetFqdnPortFromEp(second, mgmtEpSet->fqdn[mgmtEpSet->numOfEps], &(mgmtEpSet->port[mgmtEpSet->numOfEps]));
tscMgmtEpSet.numOfEps++; mgmtEpSet->numOfEps++;
} }
if ( tscMgmtEpSet.numOfEps == 0) { if (mgmtEpSet->numOfEps == 0) {
terrno = TSDB_CODE_TSC_INVALID_FQDN; terrno = TSDB_CODE_TSC_INVALID_FQDN;
return -1; return -1;
} }

View File

@ -67,8 +67,6 @@ DLL_EXPORT void taos_init();
DLL_EXPORT void taos_cleanup(); DLL_EXPORT void taos_cleanup();
DLL_EXPORT int taos_options(TSDB_OPTION option, const void *arg, ...); DLL_EXPORT int taos_options(TSDB_OPTION option, const void *arg, ...);
DLL_EXPORT TAOS *taos_connect(const char *ip, const char *user, const char *pass, const char *db, uint16_t port); DLL_EXPORT TAOS *taos_connect(const char *ip, const char *user, const char *pass, const char *db, uint16_t port);
DLL_EXPORT TAOS *taos_connect_c(const char *ip, uint8_t ipLen, const char *user, uint8_t userLen,
const char *pass, uint8_t passLen, const char *db, uint8_t dbLen, uint16_t port);
DLL_EXPORT void taos_close(TAOS *taos); DLL_EXPORT void taos_close(TAOS *taos);
typedef struct TAOS_BIND { typedef struct TAOS_BIND {
@ -90,7 +88,6 @@ TAOS_RES * taos_stmt_use_result(TAOS_STMT *stmt);
int taos_stmt_close(TAOS_STMT *stmt); int taos_stmt_close(TAOS_STMT *stmt);
DLL_EXPORT TAOS_RES *taos_query(TAOS *taos, const char *sql); DLL_EXPORT TAOS_RES *taos_query(TAOS *taos, const char *sql);
DLL_EXPORT TAOS_RES *taos_query_c(TAOS *taos, const char *sql, uint32_t sqlLen);
DLL_EXPORT TAOS_ROW taos_fetch_row(TAOS_RES *res); DLL_EXPORT TAOS_ROW taos_fetch_row(TAOS_RES *res);
DLL_EXPORT int taos_result_precision(TAOS_RES *res); // get the time precision of result DLL_EXPORT int taos_result_precision(TAOS_RES *res); // get the time precision of result
DLL_EXPORT void taos_free_result(TAOS_RES *res); DLL_EXPORT void taos_free_result(TAOS_RES *res);

View File

@ -35,6 +35,11 @@ typedef struct SRpcEpSet {
char fqdn[TSDB_MAX_REPLICA][TSDB_FQDN_LEN]; char fqdn[TSDB_MAX_REPLICA][TSDB_FQDN_LEN];
} SRpcEpSet; } SRpcEpSet;
typedef struct SRpcCorEpSet {
int32_t version;
SRpcEpSet epSet;
} SRpcCorEpSet;
typedef struct SRpcConnInfo { typedef struct SRpcConnInfo {
uint32_t clientIp; uint32_t clientIp;
uint16_t clientPort; uint16_t clientPort;

View File

@ -77,11 +77,6 @@ typedef struct {
pthread_mutex_t mutex; pthread_mutex_t mutex;
} SSdbObject; } SSdbObject;
typedef struct {
int32_t rowSize;
void * row;
} SSdbRow;
typedef struct { typedef struct {
pthread_t thread; pthread_t thread;
int32_t workerId; int32_t workerId;
@ -419,32 +414,28 @@ void sdbDecRef(void *handle, void *pObj) {
} }
} }
static SSdbRow *sdbGetRowMeta(SSdbTable *pTable, void *key) { static void *sdbGetRowMeta(SSdbTable *pTable, void *key) {
if (pTable == NULL) return NULL; if (pTable == NULL) return NULL;
int32_t keySize = sizeof(int32_t); int32_t keySize = sizeof(int32_t);
if (pTable->keyType == SDB_KEY_STRING || pTable->keyType == SDB_KEY_VAR_STRING) { if (pTable->keyType == SDB_KEY_STRING || pTable->keyType == SDB_KEY_VAR_STRING) {
keySize = strlen((char *)key); keySize = strlen((char *)key);
} }
return taosHashGet(pTable->iHandle, key, keySize); void **ppRow = (void **)taosHashGet(pTable->iHandle, key, keySize);
if (ppRow == NULL) return NULL;
return *ppRow;
} }
static SSdbRow *sdbGetRowMetaFromObj(SSdbTable *pTable, void *key) { static void *sdbGetRowMetaFromObj(SSdbTable *pTable, void *key) {
return sdbGetRowMeta(pTable, sdbGetObjKey(pTable, key)); return sdbGetRowMeta(pTable, sdbGetObjKey(pTable, key));
} }
void *sdbGetRow(void *handle, void *key) { void *sdbGetRow(void *handle, void *key) {
SSdbTable *pTable = (SSdbTable *)handle; void *pRow = sdbGetRowMeta(handle, key);
int32_t keySize = sizeof(int32_t); if (pRow) {
if (pTable->keyType == SDB_KEY_STRING || pTable->keyType == SDB_KEY_VAR_STRING) { sdbIncRef(handle, pRow);
keySize = strlen((char *)key); return pRow;
}
SSdbRow *pMeta = taosHashGet(pTable->iHandle, key, keySize);
if (pMeta) {
sdbIncRef(pTable, pMeta->row);
return pMeta->row;
} else { } else {
return NULL; return NULL;
} }
@ -455,10 +446,6 @@ static void *sdbGetRowFromObj(SSdbTable *pTable, void *key) {
} }
static int32_t sdbInsertHash(SSdbTable *pTable, SSdbOper *pOper) { static int32_t sdbInsertHash(SSdbTable *pTable, SSdbOper *pOper) {
SSdbRow rowMeta;
rowMeta.rowSize = pOper->rowSize;
rowMeta.row = pOper->pObj;
void * key = sdbGetObjKey(pTable, pOper->pObj); void * key = sdbGetObjKey(pTable, pOper->pObj);
int32_t keySize = sizeof(int32_t); int32_t keySize = sizeof(int32_t);
@ -466,7 +453,7 @@ static int32_t sdbInsertHash(SSdbTable *pTable, SSdbOper *pOper) {
keySize = strlen((char *)key); keySize = strlen((char *)key);
} }
taosHashPut(pTable->iHandle, key, keySize, &rowMeta, sizeof(SSdbRow)); taosHashPut(pTable->iHandle, key, keySize, &pOper->pObj, sizeof(void **));
sdbIncRef(pTable, pOper->pObj); sdbIncRef(pTable, pOper->pObj);
atomic_add_fetch_32(&pTable->numOfRows, 1); atomic_add_fetch_32(&pTable->numOfRows, 1);
@ -586,17 +573,17 @@ static int sdbWrite(void *param, void *data, int type) {
code = (*pTable->decodeFp)(&oper); code = (*pTable->decodeFp)(&oper);
return sdbInsertHash(pTable, &oper); return sdbInsertHash(pTable, &oper);
} else if (action == SDB_ACTION_DELETE) { } else if (action == SDB_ACTION_DELETE) {
SSdbRow *rowMeta = sdbGetRowMeta(pTable, pHead->cont); void *pRow = sdbGetRowMeta(pTable, pHead->cont);
if (rowMeta == NULL || rowMeta->row == NULL) { if (pRow == NULL) {
sdbError("table:%s, failed to get object:%s from wal while dispose delete action", pTable->tableName, sdbError("table:%s, failed to get object:%s from wal while dispose delete action", pTable->tableName,
pHead->cont); pHead->cont);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
SSdbOper oper = {.table = pTable, .pObj = rowMeta->row}; SSdbOper oper = {.table = pTable, .pObj = pRow};
return sdbDeleteHash(pTable, &oper); return sdbDeleteHash(pTable, &oper);
} else if (action == SDB_ACTION_UPDATE) { } else if (action == SDB_ACTION_UPDATE) {
SSdbRow *rowMeta = sdbGetRowMeta(pTable, pHead->cont); void *pRow = sdbGetRowMeta(pTable, pHead->cont);
if (rowMeta == NULL || rowMeta->row == NULL) { if (pRow == NULL) {
sdbError("table:%s, failed to get object:%s from wal while dispose update action", pTable->tableName, sdbError("table:%s, failed to get object:%s from wal while dispose update action", pTable->tableName,
pHead->cont); pHead->cont);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
@ -675,18 +662,12 @@ int32_t sdbDeleteRow(SSdbOper *pOper) {
SSdbTable *pTable = (SSdbTable *)pOper->table; SSdbTable *pTable = (SSdbTable *)pOper->table;
if (pTable == NULL) return TSDB_CODE_MND_SDB_INVALID_TABLE_TYPE; if (pTable == NULL) return TSDB_CODE_MND_SDB_INVALID_TABLE_TYPE;
SSdbRow *pMeta = sdbGetRowMetaFromObj(pTable, pOper->pObj); void *pRow = sdbGetRowMetaFromObj(pTable, pOper->pObj);
if (pMeta == NULL) { if (pRow == NULL) {
sdbDebug("table:%s, record is not there, delete failed", pTable->tableName); sdbDebug("table:%s, record is not there, delete failed", pTable->tableName);
return TSDB_CODE_MND_SDB_OBJ_NOT_THERE; return TSDB_CODE_MND_SDB_OBJ_NOT_THERE;
} }
void *pMetaRow = pMeta->row;
if (pMetaRow == NULL) {
sdbError("table:%s, record meta is null", pTable->tableName);
return TSDB_CODE_MND_SDB_INVAID_META_ROW;
}
sdbIncRef(pTable, pOper->pObj); sdbIncRef(pTable, pOper->pObj);
int32_t code = sdbDeleteHash(pTable, pOper); int32_t code = sdbDeleteHash(pTable, pOper);
@ -728,18 +709,12 @@ int32_t sdbUpdateRow(SSdbOper *pOper) {
SSdbTable *pTable = (SSdbTable *)pOper->table; SSdbTable *pTable = (SSdbTable *)pOper->table;
if (pTable == NULL) return TSDB_CODE_MND_SDB_INVALID_TABLE_TYPE; if (pTable == NULL) return TSDB_CODE_MND_SDB_INVALID_TABLE_TYPE;
SSdbRow *pMeta = sdbGetRowMetaFromObj(pTable, pOper->pObj); void *pRow = sdbGetRowMetaFromObj(pTable, pOper->pObj);
if (pMeta == NULL) { if (pRow == NULL) {
sdbDebug("table:%s, record is not there, update failed", pTable->tableName); sdbDebug("table:%s, record is not there, update failed", pTable->tableName);
return TSDB_CODE_MND_SDB_OBJ_NOT_THERE; return TSDB_CODE_MND_SDB_OBJ_NOT_THERE;
} }
void *pMetaRow = pMeta->row;
if (pMetaRow == NULL) {
sdbError("table:%s, record meta is null", pTable->tableName);
return TSDB_CODE_MND_SDB_INVAID_META_ROW;
}
int32_t code = sdbUpdateHash(pTable, pOper); int32_t code = sdbUpdateHash(pTable, pOper);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
sdbError("table:%s, failed to update hash", pTable->tableName); sdbError("table:%s, failed to update hash", pTable->tableName);
@ -789,14 +764,14 @@ void *sdbFetchRow(void *handle, void *pNode, void **ppRow) {
return NULL; return NULL;
} }
SSdbRow *pMeta = taosHashIterGet(pIter); void **ppMetaRow = taosHashIterGet(pIter);
if (pMeta == NULL) { if (ppMetaRow == NULL) {
taosHashDestroyIter(pIter); taosHashDestroyIter(pIter);
return NULL; return NULL;
} }
*ppRow = pMeta->row; *ppRow = *ppMetaRow;
sdbIncRef(handle, pMeta->row); sdbIncRef(handle, *ppMetaRow);
return pIter; return pIter;
} }
@ -846,11 +821,11 @@ void sdbCloseTable(void *handle) {
SHashMutableIterator *pIter = taosHashCreateIter(pTable->iHandle); SHashMutableIterator *pIter = taosHashCreateIter(pTable->iHandle);
while (taosHashIterNext(pIter)) { while (taosHashIterNext(pIter)) {
SSdbRow *pMeta = taosHashIterGet(pIter); void **ppRow = taosHashIterGet(pIter);
if (pMeta == NULL) continue; if (ppRow == NULL) continue;
SSdbOper oper = { SSdbOper oper = {
.pObj = pMeta->row, .pObj = *ppRow,
.table = pTable, .table = pTable,
}; };

View File

@ -107,42 +107,41 @@ static int32_t mnodeChildTableActionInsert(SSdbOper *pOper) {
SVgObj *pVgroup = mnodeGetVgroup(pTable->vgId); SVgObj *pVgroup = mnodeGetVgroup(pTable->vgId);
if (pVgroup == NULL) { if (pVgroup == NULL) {
mError("ctable:%s, not in vgId:%d", pTable->info.tableId, pTable->vgId); mError("ctable:%s, not in vgId:%d", pTable->info.tableId, pTable->vgId);
return TSDB_CODE_MND_VGROUP_NOT_EXIST;
} }
mnodeDecVgroupRef(pVgroup);
SDbObj *pDb = mnodeGetDb(pVgroup->dbName); SDbObj *pDb = NULL;
if (pDb == NULL) { if (pVgroup != NULL) {
mError("ctable:%s, vgId:%d not in db:%s", pTable->info.tableId, pVgroup->vgId, pVgroup->dbName); pDb = mnodeGetDb(pVgroup->dbName);
return TSDB_CODE_MND_INVALID_DB; if (pDb == NULL) {
mError("ctable:%s, vgId:%d not in db:%s", pTable->info.tableId, pVgroup->vgId, pVgroup->dbName);
}
} }
if (pDb->status != TSDB_DB_STATUS_READY) {
mError("db:%s, status:%d, in dropping", pDb->name, pDb->status);
return TSDB_CODE_MND_DB_IN_DROPPING;
}
mnodeDecDbRef(pDb);
SAcctObj *pAcct = mnodeGetAcct(pDb->acct); SAcctObj *pAcct = NULL;
if (pAcct == NULL) { if (pDb != NULL) {
mError("ctable:%s, acct:%s not exists", pTable->info.tableId, pDb->acct); pAcct = mnodeGetAcct(pDb->acct);
return TSDB_CODE_MND_INVALID_ACCT; if (pAcct == NULL) {
mError("ctable:%s, acct:%s not exists", pTable->info.tableId, pDb->acct);
}
} }
mnodeDecAcctRef(pAcct);
if (pTable->info.type == TSDB_CHILD_TABLE) { if (pTable->info.type == TSDB_CHILD_TABLE) {
// add ref // add ref
pTable->superTable = mnodeGetSuperTableByUid(pTable->suid); pTable->superTable = mnodeGetSuperTableByUid(pTable->suid);
mnodeAddTableIntoStable(pTable->superTable, pTable); mnodeAddTableIntoStable(pTable->superTable, pTable);
grantAdd(TSDB_GRANT_TIMESERIES, pTable->superTable->numOfColumns - 1); grantAdd(TSDB_GRANT_TIMESERIES, pTable->superTable->numOfColumns - 1);
pAcct->acctInfo.numOfTimeSeries += (pTable->superTable->numOfColumns - 1); if (pAcct) pAcct->acctInfo.numOfTimeSeries += (pTable->superTable->numOfColumns - 1);
} else { } else {
grantAdd(TSDB_GRANT_TIMESERIES, pTable->numOfColumns - 1); grantAdd(TSDB_GRANT_TIMESERIES, pTable->numOfColumns - 1);
pAcct->acctInfo.numOfTimeSeries += (pTable->numOfColumns - 1); if (pAcct) pAcct->acctInfo.numOfTimeSeries += (pTable->numOfColumns - 1);
} }
mnodeAddTableIntoDb(pDb); if (pDb) mnodeAddTableIntoDb(pDb);
mnodeAddTableIntoVgroup(pVgroup, pTable); if (pVgroup) mnodeAddTableIntoVgroup(pVgroup, pTable);
mnodeDecVgroupRef(pVgroup);
mnodeDecDbRef(pDb);
mnodeDecAcctRef(pAcct);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }

View File

@ -42,6 +42,7 @@ typedef struct SDiskbasedResultBuf {
void* iBuf; // inmemory buf void* iBuf; // inmemory buf
void* handle; // for debug purpose void* handle; // for debug purpose
void* emptyDummyIdList; // dummy id list
} SDiskbasedResultBuf; } SDiskbasedResultBuf;
#define DEFAULT_INTERN_BUF_PAGE_SIZE (1024L) #define DEFAULT_INTERN_BUF_PAGE_SIZE (1024L)

View File

@ -4964,7 +4964,7 @@ static void tableMultiOutputProcess(SQInfo *pQInfo, STableQueryInfo* pTableInfo)
pQuery->current->lastKey, pQuery->window.ekey); pQuery->current->lastKey, pQuery->window.ekey);
} else if (Q_STATUS_EQUAL(pQuery->status, QUERY_COMPLETED)) { } else if (Q_STATUS_EQUAL(pQuery->status, QUERY_COMPLETED)) {
STableIdInfo tidInfo; STableIdInfo tidInfo;
STableId* id = TSDB_TABLEID(pQuery->current); STableId* id = TSDB_TABLEID(pQuery->current->pTable);
tidInfo.uid = id->uid; tidInfo.uid = id->uid;
tidInfo.tid = id->tid; tidInfo.tid = id->tid;

View File

@ -36,6 +36,7 @@ int32_t createDiskbasedResultBuffer(SDiskbasedResultBuf** pResultBuf, int32_t nu
pResBuf->fd = FD_INITIALIZER; pResBuf->fd = FD_INITIALIZER;
pResBuf->pBuf = NULL; pResBuf->pBuf = NULL;
pResBuf->emptyDummyIdList = taosArrayInit(1, sizeof(int32_t));
qDebug("QInfo:%p create resBuf for output, page size:%d, initial pages:%d, %" PRId64 "bytes", handle, qDebug("QInfo:%p create resBuf for output, page size:%d, initial pages:%d, %" PRId64 "bytes", handle,
pResBuf->pageSize, pResBuf->numOfPages, pResBuf->totalBufSize); pResBuf->pageSize, pResBuf->numOfPages, pResBuf->totalBufSize);
@ -173,7 +174,7 @@ int32_t getNumOfRowsPerPage(SDiskbasedResultBuf* pResultBuf) { return pResultBuf
SIDList getDataBufPagesIdList(SDiskbasedResultBuf* pResultBuf, int32_t groupId) { SIDList getDataBufPagesIdList(SDiskbasedResultBuf* pResultBuf, int32_t groupId) {
int32_t slot = getGroupIndex(pResultBuf, groupId); int32_t slot = getGroupIndex(pResultBuf, groupId);
if (slot < 0) { if (slot < 0) {
return taosArrayInit(1, sizeof(int32_t)); return pResultBuf->emptyDummyIdList;
} else { } else {
return taosArrayGetP(pResultBuf->list, slot); return taosArrayGetP(pResultBuf->list, slot);
} }
@ -206,6 +207,7 @@ void destroyResultBuf(SDiskbasedResultBuf* pResultBuf, void* handle) {
} }
taosArrayDestroy(pResultBuf->list); taosArrayDestroy(pResultBuf->list);
taosArrayDestroy(pResultBuf->emptyDummyIdList);
taosHashCleanup(pResultBuf->idsTable); taosHashCleanup(pResultBuf->idsTable);
tfree(pResultBuf->iBuf); tfree(pResultBuf->iBuf);

View File

@ -75,7 +75,7 @@ void taosRUnLockLatch(SRWLatch *pLatch);
// copy on read // copy on read
#define taosCorBeginRead(x) for (uint32_t i_ = 1; 1; ++i_) { \ #define taosCorBeginRead(x) for (uint32_t i_ = 1; 1; ++i_) { \
int32_t old_ = atomic_load_32(x); \ int32_t old_ = atomic_add_fetch_32((x), 0); \
if (old_ & 0x00000001) { \ if (old_ & 0x00000001) { \
if (i_ % 1000 == 0) { \ if (i_ % 1000 == 0) { \
sched_yield(); \ sched_yield(); \
@ -84,7 +84,7 @@ void taosRUnLockLatch(SRWLatch *pLatch);
} }
#define taosCorEndRead(x) \ #define taosCorEndRead(x) \
if (atomic_load_32(x) == old_) { \ if (atomic_add_fetch_32((x), 0) == old_) { \
break; \ break; \
} \ } \
} }

474
tests/examples/c/apitest.c Normal file
View File

@ -0,0 +1,474 @@
// sample code to verify all TDengine API
// to compile: gcc -o apitest apitest.c -ltaos
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <taos.h>
#include <unistd.h>
static void prepare_data(TAOS* taos) {
taos_query(taos, "drop database if exists test;");
usleep(100000);
taos_query(taos, "create database test;");
usleep(100000);
taos_select_db(taos, "test");
taos_query(taos, "create table meters(ts timestamp, a int) tags(area int);");
taos_query(taos, "create table t0 using meters tags(0);");
taos_query(taos, "create table t1 using meters tags(1);");
taos_query(taos, "create table t2 using meters tags(2);");
taos_query(taos, "create table t3 using meters tags(3);");
taos_query(taos, "create table t4 using meters tags(4);");
taos_query(taos, "create table t5 using meters tags(5);");
taos_query(taos, "create table t6 using meters tags(6);");
taos_query(taos, "create table t7 using meters tags(7);");
taos_query(taos, "create table t8 using meters tags(8);");
taos_query(taos, "create table t9 using meters tags(9);");
TAOS_RES* res = taos_query(taos, "insert into t0 values('2020-01-01 00:00:00.000', 0)"
" ('2020-01-01 00:01:00.000', 0)"
" ('2020-01-01 00:02:00.000', 0)"
" t1 values('2020-01-01 00:00:00.000', 0)"
" ('2020-01-01 00:01:00.000', 0)"
" ('2020-01-01 00:02:00.000', 0)"
" ('2020-01-01 00:03:00.000', 0)"
" t2 values('2020-01-01 00:00:00.000', 0)"
" ('2020-01-01 00:01:00.000', 0)"
" ('2020-01-01 00:01:01.000', 0)"
" ('2020-01-01 00:01:02.000', 0)"
" t3 values('2020-01-01 00:01:02.000', 0)"
" t4 values('2020-01-01 00:01:02.000', 0)"
" t5 values('2020-01-01 00:01:02.000', 0)"
" t6 values('2020-01-01 00:01:02.000', 0)"
" t7 values('2020-01-01 00:01:02.000', 0)"
" t8 values('2020-01-01 00:01:02.000', 0)"
" t9 values('2020-01-01 00:01:02.000', 0)");
int affected = taos_affected_rows(res);
if (affected != 18) {
printf("\033[31m%d rows affected by last insert statement, but it should be 18\033[0m\n", affected);
}
// super tables subscription
usleep(1000000);
}
static int print_result(TAOS_RES* res, int blockFetch) {
TAOS_ROW row = NULL;
int num_fields = taos_num_fields(res);
TAOS_FIELD* fields = taos_fetch_fields(res);
int nRows = 0;
if (blockFetch) {
int rows = 0;
while ((rows = taos_fetch_block(res, &row))) {
for (int i = 0; i < rows; i++) {
char temp[256];
taos_print_row(temp, row + i, fields, num_fields);
puts(temp);
}
nRows += rows;
}
} else {
while ((row = taos_fetch_row(res))) {
char temp[256];
taos_print_row(temp, row, fields, num_fields);
puts(temp);
nRows++;
}
}
printf("%d rows consumed.\n", nRows);
return nRows;
}
static void check_row_count(int line, TAOS_RES* res, int expected) {
int actual = print_result(res, expected % 2);
if (actual != expected) {
printf("\033[31mline %d: row count mismatch, expected: %d, actual: %d\033[0m\n", line, expected, actual);
} else {
printf("line %d: %d rows consumed as expected\n", line, actual);
}
}
static void verify_query(TAOS* taos) {
prepare_data(taos);
int code = taos_load_table_info(taos, "t0,t1,t2,t3,t4,t5,t6,t7,t8,t9");
if (code != 0) {
printf("\033[31mfailed to load table info: 0x%08x\033[0m\n", code);
}
code = taos_validate_sql(taos, "select * from nonexisttable");
if (code == 0) {
printf("\033[31mimpossible, the table does not exists\033[0m\n");
}
code = taos_validate_sql(taos, "select * from meters");
if (code != 0) {
printf("\033[31mimpossible, the table does exists: 0x%08x\033[0m\n", code);
}
TAOS_RES* res = taos_query(taos, "select * from meters");
check_row_count(__LINE__, res, 18);
printf("result precision is: %d\n", taos_result_precision(res));
int c = taos_field_count(res);
printf("field count is: %d\n", c);
int* lengths = taos_fetch_lengths(res);
for (int i = 0; i < c; i++) {
printf("length of column %d is %d\n", i, lengths[i]);
}
taos_free_result(res);
res = taos_query(taos, "select * from t0");
check_row_count(__LINE__, res, 3);
taos_free_result(res);
res = taos_query(taos, "select * from nonexisttable");
code = taos_errno(res);
printf("code=%d, error msg=%s\n", code, taos_errstr(res));
taos_free_result(res);
res = taos_query(taos, "select * from meters");
taos_stop_query(res);
}
void subscribe_callback(TAOS_SUB* tsub, TAOS_RES *res, void* param, int code) {
int rows = print_result(res, *(int*)param);
printf("%d rows consumed in subscribe_callback\n", rows);
}
static void verify_subscribe(TAOS* taos) {
prepare_data(taos);
TAOS_SUB* tsub = taos_subscribe(taos, 0, "test", "select * from meters;", NULL, NULL, 0);
TAOS_RES* res = taos_consume(tsub);
check_row_count(__LINE__, res, 18);
res = taos_consume(tsub);
check_row_count(__LINE__, res, 0);
taos_query(taos, "insert into t0 values('2020-01-01 00:02:00.001', 0);");
taos_query(taos, "insert into t8 values('2020-01-01 00:01:03.000', 0);");
res = taos_consume(tsub);
check_row_count(__LINE__, res, 2);
taos_query(taos, "insert into t2 values('2020-01-01 00:01:02.001', 0);");
taos_query(taos, "insert into t1 values('2020-01-01 00:03:00.001', 0);");
res = taos_consume(tsub);
check_row_count(__LINE__, res, 2);
taos_query(taos, "insert into t1 values('2020-01-01 00:03:00.002', 0);");
res = taos_consume(tsub);
check_row_count(__LINE__, res, 1);
// keep progress information and restart subscription
taos_unsubscribe(tsub, 1);
taos_query(taos, "insert into t0 values('2020-01-01 00:04:00.000', 0);");
tsub = taos_subscribe(taos, 1, "test", "select * from meters;", NULL, NULL, 0);
res = taos_consume(tsub);
check_row_count(__LINE__, res, 24);
// keep progress information and continue previous subscription
taos_unsubscribe(tsub, 1);
tsub = taos_subscribe(taos, 0, "test", "select * from meters;", NULL, NULL, 0);
res = taos_consume(tsub);
check_row_count(__LINE__, res, 0);
// don't keep progress information and continue previous subscription
taos_unsubscribe(tsub, 0);
tsub = taos_subscribe(taos, 0, "test", "select * from meters;", NULL, NULL, 0);
res = taos_consume(tsub);
check_row_count(__LINE__, res, 24);
// single meter subscription
taos_unsubscribe(tsub, 0);
tsub = taos_subscribe(taos, 0, "test", "select * from t0;", NULL, NULL, 0);
res = taos_consume(tsub);
check_row_count(__LINE__, res, 5);
res = taos_consume(tsub);
check_row_count(__LINE__, res, 0);
taos_query(taos, "insert into t0 values('2020-01-01 00:04:00.001', 0);");
res = taos_consume(tsub);
check_row_count(__LINE__, res, 1);
taos_unsubscribe(tsub, 0);
int blockFetch = 0;
tsub = taos_subscribe(taos, 1, "test", "select * from meters;", subscribe_callback, &blockFetch, 1000);
usleep(2000000);
taos_query(taos, "insert into t0 values('2020-01-01 00:05:00.001', 0);");
usleep(2000000);
taos_unsubscribe(tsub, 0);
}
void verify_prepare(TAOS* taos) {
TAOS_RES* result = taos_query(taos, "drop database if exists test;");
usleep(100000);
taos_query(taos, "create database test;");
int code = taos_errno(result);
if (code != 0) {
printf("\033[31mfailed to create database, reason:%s\033[0m\n", taos_errstr(result));
taos_free_result(result);
return;
}
taos_free_result(result);
usleep(100000);
taos_select_db(taos, "test");
// create table
const char* sql = "create table m1 (ts timestamp, b bool, v1 tinyint, v2 smallint, v4 int, v8 bigint, f4 float, f8 double, bin binary(40), blob nchar(10))";
result = taos_query(taos, sql);
code = taos_errno(result);
if (code != 0) {
printf("\033[31mfailed to create table, reason:%s\033[0m\n", taos_errstr(result));
taos_free_result(result);
return;
}
taos_free_result(result);
// insert 10 records
struct {
int64_t ts;
int8_t b;
int8_t v1;
int16_t v2;
int32_t v4;
int64_t v8;
float f4;
double f8;
char bin[40];
char blob[80];
} v = {0};
TAOS_STMT* stmt = taos_stmt_init(taos);
TAOS_BIND params[10];
params[0].buffer_type = TSDB_DATA_TYPE_TIMESTAMP;
params[0].buffer_length = sizeof(v.ts);
params[0].buffer = &v.ts;
params[0].length = &params[0].buffer_length;
params[0].is_null = NULL;
params[1].buffer_type = TSDB_DATA_TYPE_BOOL;
params[1].buffer_length = sizeof(v.b);
params[1].buffer = &v.b;
params[1].length = &params[1].buffer_length;
params[1].is_null = NULL;
params[2].buffer_type = TSDB_DATA_TYPE_TINYINT;
params[2].buffer_length = sizeof(v.v1);
params[2].buffer = &v.v1;
params[2].length = &params[2].buffer_length;
params[2].is_null = NULL;
params[3].buffer_type = TSDB_DATA_TYPE_SMALLINT;
params[3].buffer_length = sizeof(v.v2);
params[3].buffer = &v.v2;
params[3].length = &params[3].buffer_length;
params[3].is_null = NULL;
params[4].buffer_type = TSDB_DATA_TYPE_INT;
params[4].buffer_length = sizeof(v.v4);
params[4].buffer = &v.v4;
params[4].length = &params[4].buffer_length;
params[4].is_null = NULL;
params[5].buffer_type = TSDB_DATA_TYPE_BIGINT;
params[5].buffer_length = sizeof(v.v8);
params[5].buffer = &v.v8;
params[5].length = &params[5].buffer_length;
params[5].is_null = NULL;
params[6].buffer_type = TSDB_DATA_TYPE_FLOAT;
params[6].buffer_length = sizeof(v.f4);
params[6].buffer = &v.f4;
params[6].length = &params[6].buffer_length;
params[6].is_null = NULL;
params[7].buffer_type = TSDB_DATA_TYPE_DOUBLE;
params[7].buffer_length = sizeof(v.f8);
params[7].buffer = &v.f8;
params[7].length = &params[7].buffer_length;
params[7].is_null = NULL;
params[8].buffer_type = TSDB_DATA_TYPE_BINARY;
params[8].buffer_length = sizeof(v.bin);
params[8].buffer = v.bin;
params[8].length = &params[8].buffer_length;
params[8].is_null = NULL;
strcpy(v.blob, "一二三四五六七八九十");
params[9].buffer_type = TSDB_DATA_TYPE_NCHAR;
params[9].buffer_length = strlen(v.blob);
params[9].buffer = v.blob;
params[9].length = &params[9].buffer_length;
params[9].is_null = NULL;
int is_null = 1;
sql = "insert into m1 values(?,?,?,?,?,?,?,?,?,?)";
code = taos_stmt_prepare(stmt, sql, 0);
if (code != 0){
printf("\033[31mfailed to execute taos_stmt_prepare. code:0x%x\033[0m\n", code);
}
v.ts = 1591060628000;
for (int i = 0; i < 10; ++i) {
v.ts += 1;
for (int j = 1; j < 10; ++j) {
params[j].is_null = ((i == j) ? &is_null : 0);
}
v.b = (int8_t)i % 2;
v.v1 = (int8_t)i;
v.v2 = (int16_t)(i * 2);
v.v4 = (int32_t)(i * 4);
v.v8 = (int64_t)(i * 8);
v.f4 = (float)(i * 40);
v.f8 = (double)(i * 80);
for (int j = 0; j < sizeof(v.bin) - 1; ++j) {
v.bin[j] = (char)(i + '0');
}
taos_stmt_bind_param(stmt, params);
taos_stmt_add_batch(stmt);
}
if (taos_stmt_execute(stmt) != 0) {
printf("\033[31mfailed to execute insert statement.\033[0m\n");
return;
}
taos_stmt_close(stmt);
// query the records
stmt = taos_stmt_init(taos);
taos_stmt_prepare(stmt, "SELECT * FROM m1 WHERE v1 > ? AND v2 < ?", 0);
v.v1 = 5;
v.v2 = 15;
taos_stmt_bind_param(stmt, params + 2);
if (taos_stmt_execute(stmt) != 0) {
printf("\033[31mfailed to execute select statement.\033[0m\n");
return;
}
result = taos_stmt_use_result(stmt);
TAOS_ROW row;
int rows = 0;
int num_fields = taos_num_fields(result);
TAOS_FIELD *fields = taos_fetch_fields(result);
char temp[256];
// fetch the records row by row
while ((row = taos_fetch_row(result))) {
rows++;
taos_print_row(temp, row, fields, num_fields);
printf("%s\n", temp);
}
taos_free_result(result);
taos_stmt_close(stmt);
}
void retrieve_callback(void *param, TAOS_RES *tres, int numOfRows)
{
if (numOfRows > 0) {
printf("%d rows async retrieved\n", numOfRows);
taos_fetch_rows_a(tres, retrieve_callback, param);
} else {
if (numOfRows < 0) {
printf("\033[31masync retrieve failed, code: %d\033[0m\n", numOfRows);
} else {
printf("async retrieve completed\n");
}
taos_free_result(tres);
}
}
void select_callback(void *param, TAOS_RES *tres, int code)
{
if (code == 0 && tres) {
taos_fetch_rows_a(tres, retrieve_callback, param);
} else {
printf("\033[31masync select failed, code: %d\033[0m\n", code);
}
}
void verify_async(TAOS* taos) {
prepare_data(taos);
taos_query_a(taos, "select * from meters", select_callback, NULL);
usleep(1000000);
}
void stream_callback(void *param, TAOS_RES *res, TAOS_ROW row) {
int num_fields = taos_num_fields(res);
TAOS_FIELD* fields = taos_fetch_fields(res);
printf("got one row from stream_callback\n");
char temp[256];
taos_print_row(temp, row, fields, num_fields);
puts(temp);
}
void verify_stream(TAOS* taos) {
prepare_data(taos);
TAOS_STREAM* strm = taos_open_stream(
taos,
"select count(*) from meters interval(1m)",
stream_callback,
0,
NULL,
NULL);
printf("waiting for stream data\n");
usleep(100000);
taos_query(taos, "insert into t0 values(now, 0)(now+5s,1)(now+10s, 2);");
usleep(200000000);
taos_close_stream(strm);
}
int main(int argc, char *argv[]) {
const char* host = "127.0.0.1";
const char* user = "root";
const char* passwd = "taosdata";
taos_options(TSDB_OPTION_TIMEZONE, "GMT-8");
taos_init();
TAOS* taos = taos_connect(host, user, passwd, "", 0);
if (taos == NULL) {
printf("\033[31mfailed to connect to db, reason:%s\033[0m\n", taos_errstr(taos));
exit(1);
}
char* info = taos_get_server_info(taos);
printf("server info: %s\n", info);
info = taos_get_client_info(taos);
printf("client info: %s\n", info);
printf("************ verify query *************\n");
verify_query(taos);
printf("********* verify async query **********\n");
verify_async(taos);
printf("*********** verify subscribe ************\n");
verify_subscribe(taos);
printf("************ verify prepare *************\n");
verify_prepare(taos);
printf("************ verify stream *************\n");
verify_stream(taos);
printf("done\n");
taos_close(taos);
taos_cleanup();
}

View File

@ -4,7 +4,6 @@
ROOT=./ ROOT=./
TARGET=exe TARGET=exe
LFLAGS = '-Wl,-rpath,/usr/local/taos/driver/' -ltaos -lpthread -lm -lrt LFLAGS = '-Wl,-rpath,/usr/local/taos/driver/' -ltaos -lpthread -lm -lrt
#LFLAGS = '-Wl,-rpath,/home/zbm/project/td/debug/build/lib/' -L/home/zbm/project/td/debug/build/lib -ltaos -lpthread -lm -lrt
CFLAGS = -O3 -g -Wall -Wno-deprecated -fPIC -Wno-unused-result -Wconversion -Wno-char-subscripts -D_REENTRANT -Wno-format -D_REENTRANT -DLINUX -msse4.2 -Wno-unused-function -D_M_X64 \ CFLAGS = -O3 -g -Wall -Wno-deprecated -fPIC -Wno-unused-result -Wconversion -Wno-char-subscripts -D_REENTRANT -Wno-format -D_REENTRANT -DLINUX -msse4.2 -Wno-unused-function -D_M_X64 \
-I/usr/local/taos/include -std=gnu99 -I/usr/local/taos/include -std=gnu99
@ -16,6 +15,7 @@ exe:
gcc $(CFLAGS) ./prepare.c -o $(ROOT)/prepare $(LFLAGS) gcc $(CFLAGS) ./prepare.c -o $(ROOT)/prepare $(LFLAGS)
gcc $(CFLAGS) ./stream.c -o $(ROOT)/stream $(LFLAGS) gcc $(CFLAGS) ./stream.c -o $(ROOT)/stream $(LFLAGS)
gcc $(CFLAGS) ./subscribe.c -o $(ROOT)subscribe $(LFLAGS) gcc $(CFLAGS) ./subscribe.c -o $(ROOT)subscribe $(LFLAGS)
gcc $(CFLAGS) ./apitest.c -o $(ROOT)apitest $(LFLAGS)
clean: clean:
rm $(ROOT)/asyncdemo rm $(ROOT)/asyncdemo