TBASE-1423: single meter subscription
This commit is contained in:
parent
4ba0819f58
commit
f2eac06a33
|
@ -630,7 +630,13 @@ void *vnodeQueryOnSingleTable(SMeterObj **pMetersObj, SSqlGroupbyExpr *pGroupbyE
|
||||||
pQuery = &(pQInfo->query);
|
pQuery = &(pQInfo->query);
|
||||||
dTrace("qmsg:%p create QInfo:%p, QInfo created", pQueryMsg, pQInfo);
|
dTrace("qmsg:%p create QInfo:%p, QInfo created", pQueryMsg, pQInfo);
|
||||||
|
|
||||||
pQuery->skey = pQueryMsg->skey;
|
SMeterSidExtInfo** pSids = (SMeterSidExtInfo**)pQueryMsg->pSidExtInfo;
|
||||||
|
if (pSids != NULL && pSids[0]->key > 0) {
|
||||||
|
pQuery->skey = pSids[0]->key;
|
||||||
|
} else {
|
||||||
|
pQuery->skey = pQueryMsg->skey;
|
||||||
|
}
|
||||||
|
|
||||||
pQuery->ekey = pQueryMsg->ekey;
|
pQuery->ekey = pQueryMsg->ekey;
|
||||||
pQuery->lastKey = pQuery->skey;
|
pQuery->lastKey = pQuery->skey;
|
||||||
|
|
||||||
|
|
|
@ -444,6 +444,8 @@ void vnodeExecuteRetrieveReq(SSchedMsg *pSched) {
|
||||||
int progressSize = 0;
|
int progressSize = 0;
|
||||||
if (pQInfo->pMeterQuerySupporter != NULL)
|
if (pQInfo->pMeterQuerySupporter != NULL)
|
||||||
progressSize = pQInfo->pMeterQuerySupporter->numOfMeters * (sizeof(int64_t) + sizeof(TSKEY)) + sizeof(int32_t);
|
progressSize = pQInfo->pMeterQuerySupporter->numOfMeters * (sizeof(int64_t) + sizeof(TSKEY)) + sizeof(int32_t);
|
||||||
|
else if (pQInfo->pObj != NULL)
|
||||||
|
progressSize = sizeof(int64_t) + sizeof(TSKEY) + sizeof(int32_t);
|
||||||
|
|
||||||
pStart = taosBuildRspMsgWithSize(pObj->thandle, TSDB_MSG_TYPE_RETRIEVE_RSP, progressSize + size + 100);
|
pStart = taosBuildRspMsgWithSize(pObj->thandle, TSDB_MSG_TYPE_RETRIEVE_RSP, progressSize + size + 100);
|
||||||
if (pStart == NULL) {
|
if (pStart == NULL) {
|
||||||
|
@ -478,7 +480,7 @@ void vnodeExecuteRetrieveReq(SSchedMsg *pSched) {
|
||||||
|
|
||||||
// write the progress information of each meter to response
|
// write the progress information of each meter to response
|
||||||
// this is required by subscriptions
|
// this is required by subscriptions
|
||||||
if (progressSize > 0) {
|
if (pQInfo->pMeterQuerySupporter != NULL) {
|
||||||
*((int32_t*)pMsg) = htonl(pQInfo->pMeterQuerySupporter->numOfMeters);
|
*((int32_t*)pMsg) = htonl(pQInfo->pMeterQuerySupporter->numOfMeters);
|
||||||
pMsg += sizeof(int32_t);
|
pMsg += sizeof(int32_t);
|
||||||
for (int32_t i = 0; i < pQInfo->pMeterQuerySupporter->numOfMeters; i++) {
|
for (int32_t i = 0; i < pQInfo->pMeterQuerySupporter->numOfMeters; i++) {
|
||||||
|
@ -487,6 +489,13 @@ void vnodeExecuteRetrieveReq(SSchedMsg *pSched) {
|
||||||
*((TSKEY*)pMsg) = htobe64(pQInfo->pMeterQuerySupporter->pMeterSidExtInfo[i]->key);
|
*((TSKEY*)pMsg) = htobe64(pQInfo->pMeterQuerySupporter->pMeterSidExtInfo[i]->key);
|
||||||
pMsg += sizeof(TSKEY);
|
pMsg += sizeof(TSKEY);
|
||||||
}
|
}
|
||||||
|
} else if (pQInfo->pObj != NULL) {
|
||||||
|
*((int32_t*)pMsg) = htonl(1);
|
||||||
|
pMsg += sizeof(int32_t);
|
||||||
|
*((int64_t*)pMsg) = htobe64(pQInfo->pObj->uid);
|
||||||
|
pMsg += sizeof(int64_t);
|
||||||
|
*((TSKEY*)pMsg) = htobe64(pQInfo->query.lastKey);
|
||||||
|
pMsg += sizeof(TSKEY);
|
||||||
}
|
}
|
||||||
|
|
||||||
msgLen = pMsg - pStart;
|
msgLen = pMsg - pStart;
|
||||||
|
|
|
@ -4,16 +4,17 @@
|
||||||
ROOT=./
|
ROOT=./
|
||||||
TARGET=exe
|
TARGET=exe
|
||||||
LFLAGS = '-Wl,-rpath,/usr/local/taos/driver/' -ltaos -lpthread -lm -lrt
|
LFLAGS = '-Wl,-rpath,/usr/local/taos/driver/' -ltaos -lpthread -lm -lrt
|
||||||
|
#LFLAGS = '-Wl,-rpath,/home/zbm/project/td/debug/build/lib/' -L/home/zbm/project/td/debug/build/lib -ltaos -lpthread -lm -lrt
|
||||||
CFLAGS = -O3 -g -Wall -Wno-deprecated -fPIC -Wno-unused-result -Wconversion -Wno-char-subscripts -D_REENTRANT -Wno-format -D_REENTRANT -DLINUX -msse4.2 -Wno-unused-function -D_M_X64 \
|
CFLAGS = -O3 -g -Wall -Wno-deprecated -fPIC -Wno-unused-result -Wconversion -Wno-char-subscripts -D_REENTRANT -Wno-format -D_REENTRANT -DLINUX -msse4.2 -Wno-unused-function -D_M_X64 \
|
||||||
-I/usr/local/taos/include -std=gnu99
|
-I/usr/local/taos/include -std=gnu99
|
||||||
|
|
||||||
all: $(TARGET)
|
all: $(TARGET)
|
||||||
|
|
||||||
exe:
|
exe:
|
||||||
gcc $(CFLAGS) ./asyncdemo.c -o $(ROOT)/asyncdemo $(LFLAGS)
|
gcc $(CFLAGS) ./asyncdemo.c -o $(ROOT)/asyncdemo $(LFLAGS)
|
||||||
gcc $(CFLAGS) ./demo.c -o $(ROOT)/demo $(LFLAGS)
|
gcc $(CFLAGS) ./demo.c -o $(ROOT)/demo $(LFLAGS)
|
||||||
gcc $(CFLAGS) ./prepare.c -o $(ROOT)/prepare $(LFLAGS)
|
gcc $(CFLAGS) ./prepare.c -o $(ROOT)/prepare $(LFLAGS)
|
||||||
gcc $(CFLAGS) ./stream.c -o $(ROOT)/stream $(LFLAGS)
|
gcc $(CFLAGS) ./stream.c -o $(ROOT)/stream $(LFLAGS)
|
||||||
gcc $(CFLAGS) ./subscribe.c -o $(ROOT)subscribe $(LFLAGS)
|
gcc $(CFLAGS) ./subscribe.c -o $(ROOT)subscribe $(LFLAGS)
|
||||||
|
|
||||||
clean:
|
clean:
|
||||||
|
|
|
@ -28,6 +28,7 @@ int main(int argc, char *argv[]) {
|
||||||
const char* host = "127.0.0.1";
|
const char* host = "127.0.0.1";
|
||||||
const char* user = "root";
|
const char* user = "root";
|
||||||
const char* passwd = "taosdata";
|
const char* passwd = "taosdata";
|
||||||
|
const char* sql = "select * from meters;";
|
||||||
int async = 1, restart = 0;
|
int async = 1, restart = 0;
|
||||||
TAOS_SUB* tsub = NULL;
|
TAOS_SUB* tsub = NULL;
|
||||||
|
|
||||||
|
@ -52,6 +53,10 @@ int main(int argc, char *argv[]) {
|
||||||
restart = 1;
|
restart = 1;
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
if (strcmp(argv[i], "-single") == 0) {
|
||||||
|
sql = "select * from t0;";
|
||||||
|
continue;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// init TAOS
|
// init TAOS
|
||||||
|
@ -64,9 +69,9 @@ int main(int argc, char *argv[]) {
|
||||||
}
|
}
|
||||||
|
|
||||||
if (async) {
|
if (async) {
|
||||||
tsub = taos_subscribe("test", restart, taos, "select * from meters;", subscribe_callback, NULL, 1000);
|
tsub = taos_subscribe("test", restart, taos, sql, subscribe_callback, NULL, 1000);
|
||||||
} else {
|
} else {
|
||||||
tsub = taos_subscribe("test", restart, taos, "select * from meters;", NULL, NULL, 0);
|
tsub = taos_subscribe("test", restart, taos, sql, NULL, NULL, 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (tsub == NULL) {
|
if (tsub == NULL) {
|
||||||
|
|
Loading…
Reference in New Issue