merge 3.0
This commit is contained in:
commit
d1b07a1abf
|
@ -60,7 +60,6 @@ int32_t init_env() {
|
||||||
pRes =
|
pRes =
|
||||||
taos_query(pConn, "create stable if not exists st1 (ts timestamp, c1 int, c2 float, c3 binary(10)) tags(t1 int)");
|
taos_query(pConn, "create stable if not exists st1 (ts timestamp, c1 int, c2 float, c3 binary(10)) tags(t1 int)");
|
||||||
if (taos_errno(pRes) != 0) {
|
if (taos_errno(pRes) != 0) {
|
||||||
assert(0);
|
|
||||||
printf("failed to create super table st1, reason:%s\n", taos_errstr(pRes));
|
printf("failed to create super table st1, reason:%s\n", taos_errstr(pRes));
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
@ -104,8 +103,8 @@ int32_t create_topic() {
|
||||||
}
|
}
|
||||||
taos_free_result(pRes);
|
taos_free_result(pRes);
|
||||||
|
|
||||||
/*pRes = taos_query(pConn, "create topic topic_ctb_column as abc1");*/
|
pRes = taos_query(pConn, "create topic topic_ctb_column as abc1");
|
||||||
pRes = taos_query(pConn, "create topic topic_ctb_column as select ts, c1, c2, c3 from ct1");
|
/*pRes = taos_query(pConn, "create topic topic_ctb_column as select ts, c1, c2, c3 from ct1");*/
|
||||||
if (taos_errno(pRes) != 0) {
|
if (taos_errno(pRes) != 0) {
|
||||||
printf("failed to create topic topic_ctb_column, reason:%s\n", taos_errstr(pRes));
|
printf("failed to create topic topic_ctb_column, reason:%s\n", taos_errstr(pRes));
|
||||||
return -1;
|
return -1;
|
||||||
|
@ -163,9 +162,10 @@ tmq_t* build_consumer() {
|
||||||
tmq_conf_set(conf, "group.id", "tg2");
|
tmq_conf_set(conf, "group.id", "tg2");
|
||||||
tmq_conf_set(conf, "td.connect.user", "root");
|
tmq_conf_set(conf, "td.connect.user", "root");
|
||||||
tmq_conf_set(conf, "td.connect.pass", "taosdata");
|
tmq_conf_set(conf, "td.connect.pass", "taosdata");
|
||||||
tmq_conf_set(conf, "td.connect.db", "abc1");
|
/*tmq_conf_set(conf, "td.connect.db", "abc1");*/
|
||||||
tmq_conf_set_offset_commit_cb(conf, tmq_commit_cb_print);
|
tmq_conf_set_offset_commit_cb(conf, tmq_commit_cb_print);
|
||||||
tmq_t* tmq = tmq_consumer_new(conf, NULL, 0);
|
tmq_t* tmq = tmq_consumer_new(conf, NULL, 0);
|
||||||
|
assert(tmq);
|
||||||
return tmq;
|
return tmq;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -2069,80 +2069,6 @@ static FORCE_INLINE void* tDecodeSMqSetCVgReq(void* buf, SMqSetCVgReq* pReq) {
|
||||||
return buf;
|
return buf;
|
||||||
}
|
}
|
||||||
|
|
||||||
typedef struct {
|
|
||||||
int64_t leftForVer;
|
|
||||||
int32_t vgId;
|
|
||||||
int32_t epoch;
|
|
||||||
int64_t consumerId;
|
|
||||||
char topicName[TSDB_TOPIC_FNAME_LEN];
|
|
||||||
} SMqCancelConnReq;
|
|
||||||
|
|
||||||
static FORCE_INLINE int32_t tEncodeSMqCancelConnReq(void** buf, const SMqCancelConnReq* pReq) {
|
|
||||||
int32_t tlen = 0;
|
|
||||||
tlen += taosEncodeFixedI64(buf, pReq->leftForVer);
|
|
||||||
tlen += taosEncodeFixedI32(buf, pReq->vgId);
|
|
||||||
tlen += taosEncodeFixedI32(buf, pReq->epoch);
|
|
||||||
tlen += taosEncodeFixedI64(buf, pReq->consumerId);
|
|
||||||
tlen += taosEncodeString(buf, pReq->topicName);
|
|
||||||
return tlen;
|
|
||||||
}
|
|
||||||
|
|
||||||
static FORCE_INLINE void* tDecodeSMqCancelConnReq(void* buf, SMqCancelConnReq* pReq) {
|
|
||||||
buf = taosDecodeFixedI64(buf, &pReq->leftForVer);
|
|
||||||
buf = taosDecodeFixedI32(buf, &pReq->vgId);
|
|
||||||
buf = taosDecodeFixedI32(buf, &pReq->epoch);
|
|
||||||
buf = taosDecodeFixedI64(buf, &pReq->consumerId);
|
|
||||||
buf = taosDecodeStringTo(buf, pReq->topicName);
|
|
||||||
return buf;
|
|
||||||
}
|
|
||||||
|
|
||||||
typedef struct {
|
|
||||||
int8_t reserved;
|
|
||||||
} SMqCancelConnRsp;
|
|
||||||
|
|
||||||
typedef struct {
|
|
||||||
int64_t leftForVer;
|
|
||||||
int32_t vgId;
|
|
||||||
int64_t oldConsumerId;
|
|
||||||
int64_t newConsumerId;
|
|
||||||
char* topic;
|
|
||||||
} SMqMVRebReq;
|
|
||||||
|
|
||||||
static FORCE_INLINE int32_t tEncodeSMqMVRebReq(void** buf, const SMqMVRebReq* pReq) {
|
|
||||||
int32_t tlen = 0;
|
|
||||||
tlen += taosEncodeFixedI64(buf, pReq->leftForVer);
|
|
||||||
tlen += taosEncodeFixedI32(buf, pReq->vgId);
|
|
||||||
tlen += taosEncodeFixedI64(buf, pReq->oldConsumerId);
|
|
||||||
tlen += taosEncodeFixedI64(buf, pReq->newConsumerId);
|
|
||||||
tlen += taosEncodeString(buf, pReq->topic);
|
|
||||||
return tlen;
|
|
||||||
}
|
|
||||||
|
|
||||||
static FORCE_INLINE void* tDecodeSMqMVRebReq(void* buf, SMqMVRebReq* pReq) {
|
|
||||||
buf = taosDecodeFixedI64(buf, &pReq->leftForVer);
|
|
||||||
buf = taosDecodeFixedI32(buf, &pReq->vgId);
|
|
||||||
buf = taosDecodeFixedI64(buf, &pReq->oldConsumerId);
|
|
||||||
buf = taosDecodeFixedI64(buf, &pReq->newConsumerId);
|
|
||||||
buf = taosDecodeString(buf, &pReq->topic);
|
|
||||||
return buf;
|
|
||||||
}
|
|
||||||
|
|
||||||
typedef struct {
|
|
||||||
SMsgHead header;
|
|
||||||
int32_t vgId;
|
|
||||||
int64_t consumerId;
|
|
||||||
char topicName[TSDB_TOPIC_FNAME_LEN];
|
|
||||||
char cgroup[TSDB_CGROUP_LEN];
|
|
||||||
} SMqSetCVgRsp;
|
|
||||||
|
|
||||||
typedef struct {
|
|
||||||
SMsgHead header;
|
|
||||||
int32_t vgId;
|
|
||||||
int64_t consumerId;
|
|
||||||
char topicName[TSDB_TOPIC_FNAME_LEN];
|
|
||||||
char cgroup[TSDB_CGROUP_LEN];
|
|
||||||
} SMqMVRebRsp;
|
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
int32_t vgId;
|
int32_t vgId;
|
||||||
int64_t offset;
|
int64_t offset;
|
||||||
|
@ -2169,6 +2095,24 @@ typedef struct {
|
||||||
SSchema* pSchema;
|
SSchema* pSchema;
|
||||||
} SSchemaWrapper;
|
} SSchemaWrapper;
|
||||||
|
|
||||||
|
static FORCE_INLINE SSchemaWrapper* tCloneSSchemaWrapper(const SSchemaWrapper* pSchemaWrapper) {
|
||||||
|
SSchemaWrapper* pSW = (SSchemaWrapper*)taosMemoryMalloc(sizeof(SSchemaWrapper));
|
||||||
|
if (pSW == NULL) return pSW;
|
||||||
|
pSW->nCols = pSchemaWrapper->nCols;
|
||||||
|
pSW->pSchema = (SSchema*)taosMemoryCalloc(pSW->nCols, sizeof(SSchema));
|
||||||
|
if (pSW->pSchema == NULL) {
|
||||||
|
taosMemoryFree(pSW);
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
memcpy(pSW->pSchema, pSchemaWrapper->pSchema, pSW->nCols * sizeof(SSchema));
|
||||||
|
return pSW;
|
||||||
|
}
|
||||||
|
|
||||||
|
static FORCE_INLINE void tDeleteSSchemaWrapper(SSchemaWrapper* pSchemaWrapper) {
|
||||||
|
taosMemoryFree(pSchemaWrapper->pSchema);
|
||||||
|
taosMemoryFree(pSchemaWrapper);
|
||||||
|
}
|
||||||
|
|
||||||
static FORCE_INLINE int32_t taosEncodeSSchema(void** buf, const SSchema* pSchema) {
|
static FORCE_INLINE int32_t taosEncodeSSchema(void** buf, const SSchema* pSchema) {
|
||||||
int32_t tlen = 0;
|
int32_t tlen = 0;
|
||||||
tlen += taosEncodeFixedI8(buf, pSchema->type);
|
tlen += taosEncodeFixedI8(buf, pSchema->type);
|
||||||
|
@ -2179,13 +2123,13 @@ static FORCE_INLINE int32_t taosEncodeSSchema(void** buf, const SSchema* pSchema
|
||||||
return tlen;
|
return tlen;
|
||||||
}
|
}
|
||||||
|
|
||||||
static FORCE_INLINE void* taosDecodeSSchema(void* buf, SSchema* pSchema) {
|
static FORCE_INLINE void* taosDecodeSSchema(const void* buf, SSchema* pSchema) {
|
||||||
buf = taosDecodeFixedI8(buf, &pSchema->type);
|
buf = taosDecodeFixedI8(buf, &pSchema->type);
|
||||||
buf = taosDecodeFixedI8(buf, &pSchema->flags);
|
buf = taosDecodeFixedI8(buf, &pSchema->flags);
|
||||||
buf = taosDecodeFixedI32(buf, &pSchema->bytes);
|
buf = taosDecodeFixedI32(buf, &pSchema->bytes);
|
||||||
buf = taosDecodeFixedI16(buf, &pSchema->colId);
|
buf = taosDecodeFixedI16(buf, &pSchema->colId);
|
||||||
buf = taosDecodeStringTo(buf, pSchema->name);
|
buf = taosDecodeStringTo(buf, pSchema->name);
|
||||||
return buf;
|
return (void*)buf;
|
||||||
}
|
}
|
||||||
|
|
||||||
static FORCE_INLINE int32_t tEncodeSSchema(SCoder* pEncoder, const SSchema* pSchema) {
|
static FORCE_INLINE int32_t tEncodeSSchema(SCoder* pEncoder, const SSchema* pSchema) {
|
||||||
|
@ -2215,7 +2159,7 @@ static FORCE_INLINE int32_t taosEncodeSSchemaWrapper(void** buf, const SSchemaWr
|
||||||
return tlen;
|
return tlen;
|
||||||
}
|
}
|
||||||
|
|
||||||
static FORCE_INLINE void* taosDecodeSSchemaWrapper(void* buf, SSchemaWrapper* pSW) {
|
static FORCE_INLINE void* taosDecodeSSchemaWrapper(const void* buf, SSchemaWrapper* pSW) {
|
||||||
buf = taosDecodeFixedU32(buf, &pSW->nCols);
|
buf = taosDecodeFixedU32(buf, &pSW->nCols);
|
||||||
pSW->pSchema = (SSchema*)taosMemoryCalloc(pSW->nCols, sizeof(SSchema));
|
pSW->pSchema = (SSchema*)taosMemoryCalloc(pSW->nCols, sizeof(SSchema));
|
||||||
if (pSW->pSchema == NULL) {
|
if (pSW->pSchema == NULL) {
|
||||||
|
@ -2225,7 +2169,7 @@ static FORCE_INLINE void* taosDecodeSSchemaWrapper(void* buf, SSchemaWrapper* pS
|
||||||
for (int32_t i = 0; i < pSW->nCols; i++) {
|
for (int32_t i = 0; i < pSW->nCols; i++) {
|
||||||
buf = taosDecodeSSchema(buf, &pSW->pSchema[i]);
|
buf = taosDecodeSSchema(buf, &pSW->pSchema[i]);
|
||||||
}
|
}
|
||||||
return buf;
|
return (void*)buf;
|
||||||
}
|
}
|
||||||
|
|
||||||
static FORCE_INLINE int32_t tEncodeSSchemaWrapper(SCoder* pEncoder, const SSchemaWrapper* pSW) {
|
static FORCE_INLINE int32_t tEncodeSSchemaWrapper(SCoder* pEncoder, const SSchemaWrapper* pSW) {
|
||||||
|
@ -2632,6 +2576,10 @@ static FORCE_INLINE int32_t tEncodeSMqDataBlkRsp(void** buf, const SMqDataBlkRsp
|
||||||
void* data = taosArrayGetP(pRsp->blockData, i);
|
void* data = taosArrayGetP(pRsp->blockData, i);
|
||||||
tlen += taosEncodeFixedI32(buf, bLen);
|
tlen += taosEncodeFixedI32(buf, bLen);
|
||||||
tlen += taosEncodeBinary(buf, data, bLen);
|
tlen += taosEncodeBinary(buf, data, bLen);
|
||||||
|
if (pRsp->withSchema) {
|
||||||
|
SSchemaWrapper* pSW = (SSchemaWrapper*)taosArrayGetP(pRsp->blockSchema, i);
|
||||||
|
tlen += taosEncodeSSchemaWrapper(buf, pSW);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return tlen;
|
return tlen;
|
||||||
|
@ -2644,6 +2592,7 @@ static FORCE_INLINE void* tDecodeSMqDataBlkRsp(const void* buf, SMqDataBlkRsp* p
|
||||||
buf = taosDecodeFixedI32(buf, &pRsp->blockNum);
|
buf = taosDecodeFixedI32(buf, &pRsp->blockNum);
|
||||||
pRsp->blockData = taosArrayInit(pRsp->blockNum, sizeof(void*));
|
pRsp->blockData = taosArrayInit(pRsp->blockNum, sizeof(void*));
|
||||||
pRsp->blockDataLen = taosArrayInit(pRsp->blockNum, sizeof(void*));
|
pRsp->blockDataLen = taosArrayInit(pRsp->blockNum, sizeof(void*));
|
||||||
|
pRsp->blockSchema = taosArrayInit(pRsp->blockNum, sizeof(void*));
|
||||||
if (pRsp->blockNum != 0) {
|
if (pRsp->blockNum != 0) {
|
||||||
buf = taosDecodeFixedI8(buf, &pRsp->withTbName);
|
buf = taosDecodeFixedI8(buf, &pRsp->withTbName);
|
||||||
buf = taosDecodeFixedI8(buf, &pRsp->withSchema);
|
buf = taosDecodeFixedI8(buf, &pRsp->withSchema);
|
||||||
|
@ -2656,6 +2605,11 @@ static FORCE_INLINE void* tDecodeSMqDataBlkRsp(const void* buf, SMqDataBlkRsp* p
|
||||||
buf = taosDecodeBinary(buf, &data, bLen);
|
buf = taosDecodeBinary(buf, &data, bLen);
|
||||||
taosArrayPush(pRsp->blockDataLen, &bLen);
|
taosArrayPush(pRsp->blockDataLen, &bLen);
|
||||||
taosArrayPush(pRsp->blockData, &data);
|
taosArrayPush(pRsp->blockData, &data);
|
||||||
|
if (pRsp->withSchema) {
|
||||||
|
SSchemaWrapper* pSW = (SSchemaWrapper*)taosMemoryMalloc(sizeof(SSchemaWrapper));
|
||||||
|
buf = taosDecodeSSchemaWrapper(buf, pSW);
|
||||||
|
taosArrayPush(pRsp->blockSchema, &pSW);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return (void*)buf;
|
return (void*)buf;
|
||||||
|
|
|
@ -231,6 +231,10 @@ static FORCE_INLINE SReqResultInfo* tmqGetNextResInfo(TAOS_RES* res, bool conver
|
||||||
msg->resIter++;
|
msg->resIter++;
|
||||||
if (msg->resIter < msg->rsp.blockNum) {
|
if (msg->resIter < msg->rsp.blockNum) {
|
||||||
SRetrieveTableRsp* pRetrieve = (SRetrieveTableRsp*)taosArrayGetP(msg->rsp.blockData, msg->resIter);
|
SRetrieveTableRsp* pRetrieve = (SRetrieveTableRsp*)taosArrayGetP(msg->rsp.blockData, msg->resIter);
|
||||||
|
if (msg->rsp.withSchema) {
|
||||||
|
SSchemaWrapper* pSW = (SSchemaWrapper*)taosArrayGetP(msg->rsp.blockSchema, msg->resIter);
|
||||||
|
setResSchemaInfo(&msg->resInfo, pSW->pSchema, pSW->nCols);
|
||||||
|
}
|
||||||
setQueryResultFromRsp(&msg->resInfo, pRetrieve, convertUcs4);
|
setQueryResultFromRsp(&msg->resInfo, pRetrieve, convertUcs4);
|
||||||
return &msg->resInfo;
|
return &msg->resInfo;
|
||||||
}
|
}
|
||||||
|
|
|
@ -14,9 +14,9 @@
|
||||||
*/
|
*/
|
||||||
|
|
||||||
#include "catalog.h"
|
#include "catalog.h"
|
||||||
#include "scheduler.h"
|
|
||||||
#include "clientInt.h"
|
#include "clientInt.h"
|
||||||
#include "clientLog.h"
|
#include "clientLog.h"
|
||||||
|
#include "scheduler.h"
|
||||||
#include "trpc.h"
|
#include "trpc.h"
|
||||||
|
|
||||||
static SClientHbMgr clientHbMgr = {0};
|
static SClientHbMgr clientHbMgr = {0};
|
||||||
|
@ -110,7 +110,8 @@ static int32_t hbProcessStbInfoRsp(void *value, int32_t valueLen, struct SCatalo
|
||||||
static int32_t hbQueryHbRspHandle(SAppHbMgr *pAppHbMgr, SClientHbRsp *pRsp) {
|
static int32_t hbQueryHbRspHandle(SAppHbMgr *pAppHbMgr, SClientHbRsp *pRsp) {
|
||||||
SHbConnInfo *info = taosHashGet(pAppHbMgr->connInfo, &pRsp->connKey, sizeof(SClientHbKey));
|
SHbConnInfo *info = taosHashGet(pAppHbMgr->connInfo, &pRsp->connKey, sizeof(SClientHbKey));
|
||||||
if (NULL == info) {
|
if (NULL == info) {
|
||||||
tscWarn("fail to get connInfo, may be dropped, refId:%" PRIx64 ", type:%d", pRsp->connKey.tscRid, pRsp->connKey.connType);
|
tscWarn("fail to get connInfo, may be dropped, refId:%" PRIx64 ", type:%d", pRsp->connKey.tscRid,
|
||||||
|
pRsp->connKey.connType);
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -121,7 +122,7 @@ static int32_t hbQueryHbRspHandle(SAppHbMgr *pAppHbMgr, SClientHbRsp *pRsp) {
|
||||||
} else {
|
} else {
|
||||||
updateEpSet_s(&pTscObj->pAppInfo->mgmtEp, &pRsp->query->epSet);
|
updateEpSet_s(&pTscObj->pAppInfo->mgmtEp, &pRsp->query->epSet);
|
||||||
pTscObj->connId = pRsp->query->connId;
|
pTscObj->connId = pRsp->query->connId;
|
||||||
|
|
||||||
if (pRsp->query->killRid) {
|
if (pRsp->query->killRid) {
|
||||||
SRequestObj *pRequest = acquireRequest(pRsp->query->killRid);
|
SRequestObj *pRequest = acquireRequest(pRsp->query->killRid);
|
||||||
if (NULL == pRequest) {
|
if (NULL == pRequest) {
|
||||||
|
@ -131,7 +132,7 @@ static int32_t hbQueryHbRspHandle(SAppHbMgr *pAppHbMgr, SClientHbRsp *pRsp) {
|
||||||
releaseRequest(pRsp->query->killRid);
|
releaseRequest(pRsp->query->killRid);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pRsp->query->killConnection) {
|
if (pRsp->query->killConnection) {
|
||||||
taos_close(pTscObj);
|
taos_close(pTscObj);
|
||||||
}
|
}
|
||||||
|
@ -139,7 +140,7 @@ static int32_t hbQueryHbRspHandle(SAppHbMgr *pAppHbMgr, SClientHbRsp *pRsp) {
|
||||||
releaseTscObj(pRsp->connKey.tscRid);
|
releaseTscObj(pRsp->connKey.tscRid);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t kvNum = pRsp->info ? taosArrayGetSize(pRsp->info) : 0;
|
int32_t kvNum = pRsp->info ? taosArrayGetSize(pRsp->info) : 0;
|
||||||
|
|
||||||
tscDebug("hb got %d rsp kv", kvNum);
|
tscDebug("hb got %d rsp kv", kvNum);
|
||||||
|
@ -236,24 +237,24 @@ static int32_t hbAsyncCallBack(void *param, const SDataBuf *pMsg, int32_t code)
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t hbBuildQueryDesc(SQueryHbReqBasic *hbBasic, STscObj *pObj) {
|
int32_t hbBuildQueryDesc(SQueryHbReqBasic *hbBasic, STscObj *pObj) {
|
||||||
int64_t now = taosGetTimestampUs();
|
int64_t now = taosGetTimestampUs();
|
||||||
SQueryDesc desc = {0};
|
SQueryDesc desc = {0};
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
|
|
||||||
void *pIter = taosHashIterate(pObj->pRequests, NULL);
|
void *pIter = taosHashIterate(pObj->pRequests, NULL);
|
||||||
while (pIter != NULL) {
|
while (pIter != NULL) {
|
||||||
int64_t *rid = pIter;
|
int64_t *rid = pIter;
|
||||||
SRequestObj *pRequest = acquireRequest(*rid);
|
SRequestObj *pRequest = acquireRequest(*rid);
|
||||||
if (NULL == pRequest) {
|
if (NULL == pRequest) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
tstrncpy(desc.sql, pRequest->sqlstr, sizeof(desc.sql));
|
tstrncpy(desc.sql, pRequest->sqlstr, sizeof(desc.sql));
|
||||||
desc.stime = pRequest->metric.start;
|
desc.stime = pRequest->metric.start;
|
||||||
desc.queryId = pRequest->requestId;
|
desc.queryId = pRequest->requestId;
|
||||||
desc.useconds = now - pRequest->metric.start;
|
desc.useconds = now - pRequest->metric.start;
|
||||||
desc.reqRid = pRequest->self;
|
desc.reqRid = pRequest->self;
|
||||||
desc.pid = hbBasic->pid;
|
desc.pid = hbBasic->pid;
|
||||||
taosGetFqdn(desc.fqdn);
|
taosGetFqdn(desc.fqdn);
|
||||||
desc.subPlanNum = pRequest->body.pDag ? pRequest->body.pDag->numOfSubplans : 0;
|
desc.subPlanNum = pRequest->body.pDag ? pRequest->body.pDag->numOfSubplans : 0;
|
||||||
|
|
||||||
|
@ -271,9 +272,9 @@ int32_t hbBuildQueryDesc(SQueryHbReqBasic *hbBasic, STscObj *pObj) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
releaseRequest(*rid);
|
releaseRequest(*rid);
|
||||||
taosArrayPush(hbBasic->queryDesc, &desc);
|
taosArrayPush(hbBasic->queryDesc, &desc);
|
||||||
|
|
||||||
pIter = taosHashIterate(pObj->pRequests, pIter);
|
pIter = taosHashIterate(pObj->pRequests, pIter);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -286,14 +287,14 @@ int32_t hbGetQueryBasicInfo(SClientHbKey *connKey, SClientHbReq *req) {
|
||||||
tscWarn("tscObj rid %" PRIx64 " not exist", connKey->tscRid);
|
tscWarn("tscObj rid %" PRIx64 " not exist", connKey->tscRid);
|
||||||
return TSDB_CODE_QRY_APP_ERROR;
|
return TSDB_CODE_QRY_APP_ERROR;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t numOfQueries = pTscObj->pRequests ? taosHashGetSize(pTscObj->pRequests) : 0;
|
int32_t numOfQueries = pTscObj->pRequests ? taosHashGetSize(pTscObj->pRequests) : 0;
|
||||||
if (numOfQueries <= 0) {
|
if (numOfQueries <= 0) {
|
||||||
releaseTscObj(connKey->tscRid);
|
releaseTscObj(connKey->tscRid);
|
||||||
tscDebug("no queries on connection");
|
tscDebug("no queries on connection");
|
||||||
return TSDB_CODE_QRY_APP_ERROR;
|
return TSDB_CODE_QRY_APP_ERROR;
|
||||||
}
|
}
|
||||||
|
|
||||||
SQueryHbReqBasic *hbBasic = (SQueryHbReqBasic *)taosMemoryCalloc(1, sizeof(SQueryHbReqBasic));
|
SQueryHbReqBasic *hbBasic = (SQueryHbReqBasic *)taosMemoryCalloc(1, sizeof(SQueryHbReqBasic));
|
||||||
if (NULL == hbBasic) {
|
if (NULL == hbBasic) {
|
||||||
tscError("calloc %d failed", (int32_t)sizeof(SQueryHbReqBasic));
|
tscError("calloc %d failed", (int32_t)sizeof(SQueryHbReqBasic));
|
||||||
|
@ -308,7 +309,7 @@ int32_t hbGetQueryBasicInfo(SClientHbKey *connKey, SClientHbReq *req) {
|
||||||
taosMemoryFree(hbBasic);
|
taosMemoryFree(hbBasic);
|
||||||
return TSDB_CODE_QRY_OUT_OF_MEMORY;
|
return TSDB_CODE_QRY_OUT_OF_MEMORY;
|
||||||
}
|
}
|
||||||
|
|
||||||
hbBasic->connId = pTscObj->connId;
|
hbBasic->connId = pTscObj->connId;
|
||||||
hbBasic->pid = taosGetPId();
|
hbBasic->pid = taosGetPId();
|
||||||
taosGetAppName(hbBasic->app, NULL);
|
taosGetAppName(hbBasic->app, NULL);
|
||||||
|
@ -405,7 +406,7 @@ int32_t hbQueryHbReqHandle(SClientHbKey *connKey, void *param, SClientHbReq *req
|
||||||
}
|
}
|
||||||
|
|
||||||
hbGetQueryBasicInfo(connKey, req);
|
hbGetQueryBasicInfo(connKey, req);
|
||||||
|
|
||||||
code = hbGetExpiredDBInfo(connKey, pCatalog, req);
|
code = hbGetExpiredDBInfo(connKey, pCatalog, req);
|
||||||
if (TSDB_CODE_SUCCESS != code) {
|
if (TSDB_CODE_SUCCESS != code) {
|
||||||
return code;
|
return code;
|
||||||
|
@ -471,10 +472,10 @@ SClientHbBatchReq *hbGatherAllInfo(SAppHbMgr *pAppHbMgr) {
|
||||||
pIter = taosHashIterate(pAppHbMgr->activeInfo, pIter);
|
pIter = taosHashIterate(pAppHbMgr->activeInfo, pIter);
|
||||||
}
|
}
|
||||||
|
|
||||||
// if (code) {
|
// if (code) {
|
||||||
// taosArrayDestroyEx(pBatchReq->reqs, hbFreeReq);
|
// taosArrayDestroyEx(pBatchReq->reqs, hbFreeReq);
|
||||||
// taosMemoryFreeClear(pBatchReq);
|
// taosMemoryFreeClear(pBatchReq);
|
||||||
// }
|
// }
|
||||||
|
|
||||||
return pBatchReq;
|
return pBatchReq;
|
||||||
}
|
}
|
||||||
|
@ -630,24 +631,23 @@ void appHbMgrCleanup(void) {
|
||||||
int sz = taosArrayGetSize(clientHbMgr.appHbMgrs);
|
int sz = taosArrayGetSize(clientHbMgr.appHbMgrs);
|
||||||
for (int i = 0; i < sz; i++) {
|
for (int i = 0; i < sz; i++) {
|
||||||
SAppHbMgr *pTarget = taosArrayGetP(clientHbMgr.appHbMgrs, i);
|
SAppHbMgr *pTarget = taosArrayGetP(clientHbMgr.appHbMgrs, i);
|
||||||
|
|
||||||
void *pIter = taosHashIterate(pTarget->activeInfo, NULL);
|
void *pIter = taosHashIterate(pTarget->activeInfo, NULL);
|
||||||
while (pIter != NULL) {
|
while (pIter != NULL) {
|
||||||
SClientHbReq *pOneReq = pIter;
|
SClientHbReq *pOneReq = pIter;
|
||||||
hbFreeReq(pOneReq);
|
hbFreeReq(pOneReq);
|
||||||
taosHashCleanup(pOneReq->info);
|
taosHashCleanup(pOneReq->info);
|
||||||
pIter = taosHashIterate(pTarget->activeInfo, pIter);
|
pIter = taosHashIterate(pTarget->activeInfo, pIter);
|
||||||
}
|
}
|
||||||
taosHashCleanup(pTarget->activeInfo);
|
taosHashCleanup(pTarget->activeInfo);
|
||||||
pTarget->activeInfo = NULL;
|
pTarget->activeInfo = NULL;
|
||||||
|
|
||||||
|
|
||||||
pIter = taosHashIterate(pTarget->connInfo, NULL);
|
pIter = taosHashIterate(pTarget->connInfo, NULL);
|
||||||
while (pIter != NULL) {
|
while (pIter != NULL) {
|
||||||
SHbConnInfo *info = pIter;
|
SHbConnInfo *info = pIter;
|
||||||
taosMemoryFree(info->param);
|
taosMemoryFree(info->param);
|
||||||
pIter = taosHashIterate(pTarget->connInfo, pIter);
|
pIter = taosHashIterate(pTarget->connInfo, pIter);
|
||||||
}
|
}
|
||||||
taosHashCleanup(pTarget->connInfo);
|
taosHashCleanup(pTarget->connInfo);
|
||||||
pTarget->connInfo = NULL;
|
pTarget->connInfo = NULL;
|
||||||
|
|
||||||
|
@ -668,13 +668,13 @@ int hbMgrInit() {
|
||||||
hbMgrInitHandle();
|
hbMgrInitHandle();
|
||||||
|
|
||||||
// init backgroud thread
|
// init backgroud thread
|
||||||
//hbCreateThread();
|
/*hbCreateThread();*/
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
void hbMgrCleanUp() {
|
void hbMgrCleanUp() {
|
||||||
//hbStopThread();
|
// hbStopThread();
|
||||||
|
|
||||||
// destroy all appHbMgr
|
// destroy all appHbMgr
|
||||||
int8_t old = atomic_val_compare_exchange_8(&clientHbMgr.inited, 1, 0);
|
int8_t old = atomic_val_compare_exchange_8(&clientHbMgr.inited, 1, 0);
|
||||||
|
@ -747,11 +747,11 @@ void hbDeregisterConn(SAppHbMgr *pAppHbMgr, SClientHbKey connKey) {
|
||||||
taosMemoryFree(info->param);
|
taosMemoryFree(info->param);
|
||||||
taosHashRemove(pAppHbMgr->connInfo, &connKey, sizeof(SClientHbKey));
|
taosHashRemove(pAppHbMgr->connInfo, &connKey, sizeof(SClientHbKey));
|
||||||
}
|
}
|
||||||
|
|
||||||
if (NULL == pReq || NULL == info) {
|
if (NULL == pReq || NULL == info) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
atomic_sub_fetch_32(&pAppHbMgr->connKeyCnt, 1);
|
atomic_sub_fetch_32(&pAppHbMgr->connKeyCnt, 1);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -226,17 +226,15 @@ int32_t execDdlQuery(SRequestObj* pRequest, SQuery* pQuery) {
|
||||||
|
|
||||||
int32_t getPlan(SRequestObj* pRequest, SQuery* pQuery, SQueryPlan** pPlan, SArray* pNodeList) {
|
int32_t getPlan(SRequestObj* pRequest, SQuery* pQuery, SQueryPlan** pPlan, SArray* pNodeList) {
|
||||||
pRequest->type = pQuery->msgType;
|
pRequest->type = pQuery->msgType;
|
||||||
SPlanContext cxt = {
|
SPlanContext cxt = {.queryId = pRequest->requestId,
|
||||||
.queryId = pRequest->requestId,
|
.acctId = pRequest->pTscObj->acctId,
|
||||||
.acctId = pRequest->pTscObj->acctId,
|
.mgmtEpSet = getEpSet_s(&pRequest->pTscObj->pAppInfo->mgmtEp),
|
||||||
.mgmtEpSet = getEpSet_s(&pRequest->pTscObj->pAppInfo->mgmtEp),
|
.pAstRoot = pQuery->pRoot,
|
||||||
.pAstRoot = pQuery->pRoot,
|
.showRewrite = pQuery->showRewrite,
|
||||||
.showRewrite = pQuery->showRewrite,
|
.pTransporter = pRequest->pTscObj->pAppInfo->pTransporter,
|
||||||
.pTransporter = pRequest->pTscObj->pAppInfo->pTransporter,
|
.pMsg = pRequest->msgBuf,
|
||||||
.pMsg = pRequest->msgBuf,
|
.msgLen = ERROR_MSG_BUF_DEFAULT_SIZE};
|
||||||
.msgLen = ERROR_MSG_BUF_DEFAULT_SIZE
|
int32_t code = catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &cxt.pCatalog);
|
||||||
};
|
|
||||||
int32_t code = catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &cxt.pCatalog);
|
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
code = qCreateQueryPlan(&cxt, pPlan, pNodeList);
|
code = qCreateQueryPlan(&cxt, pPlan, pNodeList);
|
||||||
}
|
}
|
||||||
|
@ -247,6 +245,7 @@ void setResSchemaInfo(SReqResultInfo* pResInfo, const SSchema* pSchema, int32_t
|
||||||
ASSERT(pSchema != NULL && numOfCols > 0);
|
ASSERT(pSchema != NULL && numOfCols > 0);
|
||||||
|
|
||||||
pResInfo->numOfCols = numOfCols;
|
pResInfo->numOfCols = numOfCols;
|
||||||
|
// TODO handle memory leak
|
||||||
pResInfo->fields = taosMemoryCalloc(numOfCols, sizeof(TAOS_FIELD));
|
pResInfo->fields = taosMemoryCalloc(numOfCols, sizeof(TAOS_FIELD));
|
||||||
pResInfo->userFields = taosMemoryCalloc(numOfCols, sizeof(TAOS_FIELD));
|
pResInfo->userFields = taosMemoryCalloc(numOfCols, sizeof(TAOS_FIELD));
|
||||||
|
|
||||||
|
@ -282,7 +281,7 @@ int32_t scheduleQuery(SRequestObj* pRequest, SQueryPlan* pDag, SArray* pNodeList
|
||||||
|
|
||||||
SQueryResult res = {.code = 0, .numOfRows = 0, .msgSize = ERROR_MSG_BUF_DEFAULT_SIZE, .msg = pRequest->msgBuf};
|
SQueryResult res = {.code = 0, .numOfRows = 0, .msgSize = ERROR_MSG_BUF_DEFAULT_SIZE, .msg = pRequest->msgBuf};
|
||||||
int32_t code = schedulerExecJob(pTransporter, pNodeList, pDag, &pRequest->body.queryJob, pRequest->sqlstr,
|
int32_t code = schedulerExecJob(pTransporter, pNodeList, pDag, &pRequest->body.queryJob, pRequest->sqlstr,
|
||||||
pRequest->metric.start, &res);
|
pRequest->metric.start, &res);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
if (pRequest->body.queryJob != 0) {
|
if (pRequest->body.queryJob != 0) {
|
||||||
schedulerFreeJob(pRequest->body.queryJob);
|
schedulerFreeJob(pRequest->body.queryJob);
|
||||||
|
@ -840,12 +839,12 @@ int32_t setResultDataPtr(SReqResultInfo* pResultInfo, TAOS_FIELD* pFields, int32
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
char* p = (char*) pResultInfo->pData;
|
char* p = (char*)pResultInfo->pData;
|
||||||
|
|
||||||
int32_t dataLen = *(int32_t*) p;
|
int32_t dataLen = *(int32_t*)p;
|
||||||
p += sizeof(int32_t);
|
p += sizeof(int32_t);
|
||||||
|
|
||||||
uint64_t groupId = *(uint64_t*) p;
|
uint64_t groupId = *(uint64_t*)p;
|
||||||
p += sizeof(uint64_t);
|
p += sizeof(uint64_t);
|
||||||
|
|
||||||
int32_t* colLength = (int32_t*)p;
|
int32_t* colLength = (int32_t*)p;
|
||||||
|
|
|
@ -376,7 +376,7 @@ tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) {
|
||||||
|
|
||||||
ASSERT(user);
|
ASSERT(user);
|
||||||
ASSERT(pass);
|
ASSERT(pass);
|
||||||
ASSERT(conf->db);
|
/*ASSERT(conf->db);*/
|
||||||
ASSERT(conf->groupId[0]);
|
ASSERT(conf->groupId[0]);
|
||||||
|
|
||||||
pTmq->pTscObj = taos_connect_internal(conf->ip, user, pass, NULL, conf->db, conf->port, CONN_TYPE__TMQ);
|
pTmq->pTscObj = taos_connect_internal(conf->ip, user, pass, NULL, conf->db, conf->port, CONN_TYPE__TMQ);
|
||||||
|
@ -1118,7 +1118,9 @@ SMqRspObj* tmqBuildRspFromWrapper(SMqPollRspWrapper* pWrapper) {
|
||||||
|
|
||||||
pRspObj->resInfo.totalRows = 0;
|
pRspObj->resInfo.totalRows = 0;
|
||||||
pRspObj->resInfo.precision = TSDB_TIME_PRECISION_MILLI;
|
pRspObj->resInfo.precision = TSDB_TIME_PRECISION_MILLI;
|
||||||
setResSchemaInfo(&pRspObj->resInfo, pWrapper->topicHandle->schema.pSchema, pWrapper->topicHandle->schema.nCols);
|
if (!pWrapper->msg.withSchema) {
|
||||||
|
setResSchemaInfo(&pRspObj->resInfo, pWrapper->topicHandle->schema.pSchema, pWrapper->topicHandle->schema.nCols);
|
||||||
|
}
|
||||||
|
|
||||||
taosFreeQitem(pWrapper);
|
taosFreeQitem(pWrapper);
|
||||||
return pRspObj;
|
return pRspObj;
|
||||||
|
|
|
@ -202,6 +202,17 @@ int32_t vmProcessCreateVnodeReq(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
code = vnodeStart(pImpl);
|
||||||
|
if (code != 0) {
|
||||||
|
tFreeSCreateVnodeReq(&createReq);
|
||||||
|
dError("vgId:%d, failed to start sync since %s", createReq.vgId, terrstr());
|
||||||
|
vnodeClose(pImpl);
|
||||||
|
vnodeDestroy(path, pMgmt->pTfs);
|
||||||
|
terrno = code;
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
code = vmWriteVnodesToFile(pMgmt);
|
code = vmWriteVnodesToFile(pMgmt);
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
tFreeSCreateVnodeReq(&createReq);
|
tFreeSCreateVnodeReq(&createReq);
|
||||||
|
|
|
@ -74,12 +74,6 @@ int32_t vmOpenVnode(SVnodesMgmt *pMgmt, SWrapperCfg *pCfg, SVnode *pImpl) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
// sync integration
|
|
||||||
vnodeSyncSetQ(pImpl, NULL);
|
|
||||||
vnodeSyncSetRpc(pImpl, NULL);
|
|
||||||
int32_t ret = vnodeSyncStart(pImpl);
|
|
||||||
assert(ret == 0);
|
|
||||||
|
|
||||||
taosWLockLatch(&pMgmt->latch);
|
taosWLockLatch(&pMgmt->latch);
|
||||||
int32_t code = taosHashPut(pMgmt->hash, &pVnode->vgId, sizeof(int32_t), &pVnode, sizeof(SVnodeObj *));
|
int32_t code = taosHashPut(pMgmt->hash, &pVnode->vgId, sizeof(int32_t), &pVnode, sizeof(SVnodeObj *));
|
||||||
taosWUnLockLatch(&pMgmt->latch);
|
taosWUnLockLatch(&pMgmt->latch);
|
||||||
|
@ -153,6 +147,7 @@ static void *vmOpenVnodeFunc(void *param) {
|
||||||
pThread->failed++;
|
pThread->failed++;
|
||||||
} else {
|
} else {
|
||||||
vmOpenVnode(pMgmt, pCfg, pImpl);
|
vmOpenVnode(pMgmt, pCfg, pImpl);
|
||||||
|
//vnodeStart(pImpl);
|
||||||
dDebug("vgId:%d, is opened by thread:%d", pCfg->vgId, pThread->threadIndex);
|
dDebug("vgId:%d, is opened by thread:%d", pCfg->vgId, pThread->threadIndex);
|
||||||
pThread->opened++;
|
pThread->opened++;
|
||||||
}
|
}
|
||||||
|
@ -364,10 +359,52 @@ static int32_t vmRequire(SMgmtWrapper *pWrapper, bool *required) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static int32_t vmStart(SMgmtWrapper *pWrapper) {
|
||||||
|
dDebug("vnode-mgmt start to run");
|
||||||
|
SVnodesMgmt *pMgmt = pWrapper->pMgmt;
|
||||||
|
|
||||||
|
taosRLockLatch(&pMgmt->latch);
|
||||||
|
|
||||||
|
void *pIter = taosHashIterate(pMgmt->hash, NULL);
|
||||||
|
while (pIter) {
|
||||||
|
SVnodeObj **ppVnode = pIter;
|
||||||
|
if (ppVnode == NULL || *ppVnode == NULL) continue;
|
||||||
|
|
||||||
|
SVnodeObj *pVnode = *ppVnode;
|
||||||
|
vnodeStart(pVnode->pImpl);
|
||||||
|
pIter = taosHashIterate(pMgmt->hash, pIter);
|
||||||
|
}
|
||||||
|
|
||||||
|
taosRUnLockLatch(&pMgmt->latch);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
static void vmStop(SMgmtWrapper *pWrapper) {
|
||||||
|
#if 0
|
||||||
|
dDebug("vnode-mgmt start to stop");
|
||||||
|
SVnodesMgmt *pMgmt = pWrapper->pMgmt;
|
||||||
|
taosRLockLatch(&pMgmt->latch);
|
||||||
|
|
||||||
|
void *pIter = taosHashIterate(pMgmt->hash, NULL);
|
||||||
|
while (pIter) {
|
||||||
|
SVnodeObj **ppVnode = pIter;
|
||||||
|
if (ppVnode == NULL || *ppVnode == NULL) continue;
|
||||||
|
|
||||||
|
SVnodeObj *pVnode = *ppVnode;
|
||||||
|
vnodeStop(pVnode->pImpl);
|
||||||
|
pIter = taosHashIterate(pMgmt->hash, pIter);
|
||||||
|
}
|
||||||
|
|
||||||
|
taosRUnLockLatch(&pMgmt->latch);
|
||||||
|
#endif
|
||||||
|
}
|
||||||
|
|
||||||
void vmSetMgmtFp(SMgmtWrapper *pWrapper) {
|
void vmSetMgmtFp(SMgmtWrapper *pWrapper) {
|
||||||
SMgmtFp mgmtFp = {0};
|
SMgmtFp mgmtFp = {0};
|
||||||
mgmtFp.openFp = vmInit;
|
mgmtFp.openFp = vmInit;
|
||||||
mgmtFp.closeFp = vmCleanup;
|
mgmtFp.closeFp = vmCleanup;
|
||||||
|
mgmtFp.startFp = vmStart;
|
||||||
|
mgmtFp.stopFp = vmStop;
|
||||||
mgmtFp.requiredFp = vmRequire;
|
mgmtFp.requiredFp = vmRequire;
|
||||||
|
|
||||||
vmInitMsgHandle(pWrapper);
|
vmInitMsgHandle(pWrapper);
|
||||||
|
@ -396,4 +433,4 @@ void vmGetVnodeLoads(SMgmtWrapper *pWrapper, SMonVloadInfo *pInfo) {
|
||||||
}
|
}
|
||||||
|
|
||||||
taosRUnLockLatch(&pMgmt->latch);
|
taosRUnLockLatch(&pMgmt->latch);
|
||||||
}
|
}
|
||||||
|
|
|
@ -476,33 +476,37 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) {
|
||||||
int32_t mndSchedInitSubEp(SMnode* pMnode, const SMqTopicObj* pTopic, SMqSubscribeObj* pSub) {
|
int32_t mndSchedInitSubEp(SMnode* pMnode, const SMqTopicObj* pTopic, SMqSubscribeObj* pSub) {
|
||||||
SSdb* pSdb = pMnode->pSdb;
|
SSdb* pSdb = pMnode->pSdb;
|
||||||
SVgObj* pVgroup = NULL;
|
SVgObj* pVgroup = NULL;
|
||||||
SQueryPlan* pPlan = qStringToQueryPlan(pTopic->physicalPlan);
|
SQueryPlan* pPlan = NULL;
|
||||||
if (pPlan == NULL) {
|
SSubplan* plan = NULL;
|
||||||
terrno = TSDB_CODE_QRY_INVALID_INPUT;
|
if (pTopic->subType == TOPIC_SUB_TYPE__TABLE) {
|
||||||
return -1;
|
pPlan = qStringToQueryPlan(pTopic->physicalPlan);
|
||||||
|
if (pPlan == NULL) {
|
||||||
|
terrno = TSDB_CODE_QRY_INVALID_INPUT;
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
ASSERT(pSub->vgNum == -1);
|
||||||
|
|
||||||
|
pSub->vgNum = 0;
|
||||||
|
|
||||||
|
int32_t levelNum = LIST_LENGTH(pPlan->pSubplans);
|
||||||
|
if (levelNum != 1) {
|
||||||
|
qDestroyQueryPlan(pPlan);
|
||||||
|
terrno = TSDB_CODE_MND_UNSUPPORTED_TOPIC;
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
SNodeListNode* inner = nodesListGetNode(pPlan->pSubplans, 0);
|
||||||
|
|
||||||
|
int32_t opNum = LIST_LENGTH(inner->pNodeList);
|
||||||
|
if (opNum != 1) {
|
||||||
|
qDestroyQueryPlan(pPlan);
|
||||||
|
terrno = TSDB_CODE_MND_UNSUPPORTED_TOPIC;
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
plan = nodesListGetNode(inner->pNodeList, 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
ASSERT(pSub->vgNum == -1);
|
|
||||||
|
|
||||||
pSub->vgNum = 0;
|
|
||||||
|
|
||||||
int32_t levelNum = LIST_LENGTH(pPlan->pSubplans);
|
|
||||||
if (levelNum != 1) {
|
|
||||||
qDestroyQueryPlan(pPlan);
|
|
||||||
terrno = TSDB_CODE_MND_UNSUPPORTED_TOPIC;
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
SNodeListNode* inner = nodesListGetNode(pPlan->pSubplans, 0);
|
|
||||||
|
|
||||||
int32_t opNum = LIST_LENGTH(inner->pNodeList);
|
|
||||||
if (opNum != 1) {
|
|
||||||
qDestroyQueryPlan(pPlan);
|
|
||||||
terrno = TSDB_CODE_MND_UNSUPPORTED_TOPIC;
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
SSubplan* plan = nodesListGetNode(inner->pNodeList, 0);
|
|
||||||
|
|
||||||
int64_t unexistKey = -1;
|
int64_t unexistKey = -1;
|
||||||
SMqConsumerEpInSub* pEpInSub = taosHashGet(pSub->consumerHash, &unexistKey, sizeof(int64_t));
|
SMqConsumerEpInSub* pEpInSub = taosHashGet(pSub->consumerHash, &unexistKey, sizeof(int64_t));
|
||||||
ASSERT(pEpInSub);
|
ASSERT(pEpInSub);
|
||||||
|
@ -519,38 +523,35 @@ int32_t mndSchedInitSubEp(SMnode* pMnode, const SMqTopicObj* pTopic, SMqSubscrib
|
||||||
}
|
}
|
||||||
|
|
||||||
pSub->vgNum++;
|
pSub->vgNum++;
|
||||||
plan->execNode.nodeId = pVgroup->vgId;
|
|
||||||
plan->execNode.epSet = mndGetVgroupEpset(pMnode, pVgroup);
|
|
||||||
|
|
||||||
SMqVgEp* pVgEp = taosMemoryMalloc(sizeof(SMqVgEp));
|
SMqVgEp* pVgEp = taosMemoryMalloc(sizeof(SMqVgEp));
|
||||||
pVgEp->epSet = plan->execNode.epSet;
|
pVgEp->epSet = mndGetVgroupEpset(pMnode, pVgroup);
|
||||||
pVgEp->vgId = plan->execNode.nodeId;
|
pVgEp->vgId = pVgroup->vgId;
|
||||||
|
taosArrayPush(pEpInSub->vgs, &pVgEp);
|
||||||
#if 0
|
|
||||||
SMqConsumerEp consumerEp = {0};
|
|
||||||
consumerEp.status = 0;
|
|
||||||
consumerEp.consumerId = -1;
|
|
||||||
consumerEp.epSet = plan->execNode.epSet;
|
|
||||||
consumerEp.vgId = plan->execNode.nodeId;
|
|
||||||
#endif
|
|
||||||
|
|
||||||
mDebug("init subscribption %s, assign vg: %d", pSub->key, pVgEp->vgId);
|
mDebug("init subscribption %s, assign vg: %d", pSub->key, pVgEp->vgId);
|
||||||
|
|
||||||
int32_t msgLen;
|
if (pTopic->subType == TOPIC_SUB_TYPE__TABLE) {
|
||||||
if (qSubPlanToString(plan, &pVgEp->qmsg, &msgLen) < 0) {
|
int32_t msgLen;
|
||||||
sdbRelease(pSdb, pVgroup);
|
|
||||||
qDestroyQueryPlan(pPlan);
|
plan->execNode.epSet = pVgEp->epSet;
|
||||||
terrno = TSDB_CODE_QRY_INVALID_INPUT;
|
plan->execNode.nodeId = pVgEp->vgId;
|
||||||
return -1;
|
|
||||||
|
if (qSubPlanToString(plan, &pVgEp->qmsg, &msgLen) < 0) {
|
||||||
|
sdbRelease(pSdb, pVgroup);
|
||||||
|
qDestroyQueryPlan(pPlan);
|
||||||
|
terrno = TSDB_CODE_QRY_INVALID_INPUT;
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
pVgEp->qmsg = strdup("");
|
||||||
}
|
}
|
||||||
taosArrayPush(pEpInSub->vgs, &pVgEp);
|
|
||||||
|
|
||||||
ASSERT(taosHashGetSize(pSub->consumerHash) == 1);
|
ASSERT(taosHashGetSize(pSub->consumerHash) == 1);
|
||||||
|
|
||||||
/*taosArrayPush(pSub->unassignedVg, &consumerEp);*/
|
/*taosArrayPush(pSub->unassignedVg, &consumerEp);*/
|
||||||
}
|
}
|
||||||
|
|
||||||
ASSERT(pEpInSub->vgs->size > 0);
|
|
||||||
pEpInSub = taosHashGet(pSub->consumerHash, &unexistKey, sizeof(int64_t));
|
pEpInSub = taosHashGet(pSub->consumerHash, &unexistKey, sizeof(int64_t));
|
||||||
|
|
||||||
ASSERT(pEpInSub->vgs->size > 0);
|
ASSERT(pEpInSub->vgs->size > 0);
|
||||||
|
|
|
@ -282,10 +282,10 @@ static int32_t mndCreateTopic(SMnode *pMnode, SNodeMsg *pReq, SCMCreateTopicReq
|
||||||
topicObj.version = 1;
|
topicObj.version = 1;
|
||||||
topicObj.sql = strdup(pCreate->sql);
|
topicObj.sql = strdup(pCreate->sql);
|
||||||
topicObj.sqlLen = strlen(pCreate->sql) + 1;
|
topicObj.sqlLen = strlen(pCreate->sql) + 1;
|
||||||
topicObj.ast = strdup(pCreate->ast);
|
|
||||||
topicObj.astLen = strlen(pCreate->ast) + 1;
|
|
||||||
|
|
||||||
if (pCreate->ast && pCreate->ast[0]) {
|
if (pCreate->ast && pCreate->ast[0]) {
|
||||||
|
topicObj.ast = strdup(pCreate->ast);
|
||||||
|
topicObj.astLen = strlen(pCreate->ast) + 1;
|
||||||
topicObj.subType = TOPIC_SUB_TYPE__TABLE;
|
topicObj.subType = TOPIC_SUB_TYPE__TABLE;
|
||||||
topicObj.withTbName = 0;
|
topicObj.withTbName = 0;
|
||||||
topicObj.withSchema = 0;
|
topicObj.withSchema = 0;
|
||||||
|
@ -314,6 +314,9 @@ static int32_t mndCreateTopic(SMnode *pMnode, SNodeMsg *pReq, SCMCreateTopicReq
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
|
topicObj.ast = strdup("");
|
||||||
|
topicObj.astLen = 1;
|
||||||
|
topicObj.physicalPlan = strdup("");
|
||||||
topicObj.subType = TOPIC_SUB_TYPE__DB;
|
topicObj.subType = TOPIC_SUB_TYPE__DB;
|
||||||
topicObj.withTbName = 1;
|
topicObj.withTbName = 1;
|
||||||
topicObj.withSchema = 1;
|
topicObj.withSchema = 1;
|
||||||
|
|
|
@ -61,6 +61,9 @@ int32_t vnodeSync(SVnode *pVnode);
|
||||||
int32_t vnodeGetLoad(SVnode *pVnode, SVnodeLoad *pLoad);
|
int32_t vnodeGetLoad(SVnode *pVnode, SVnodeLoad *pLoad);
|
||||||
int vnodeValidateTableHash(SVnodeCfg *pVnodeOptions, char *tableFName);
|
int vnodeValidateTableHash(SVnodeCfg *pVnodeOptions, char *tableFName);
|
||||||
|
|
||||||
|
int32_t vnodeStart(SVnode *pVnode);
|
||||||
|
void vnodeStop(SVnode *pVnode);
|
||||||
|
|
||||||
int64_t vnodeGetSyncHandle(SVnode *pVnode);
|
int64_t vnodeGetSyncHandle(SVnode *pVnode);
|
||||||
void vnodeGetSnapshot(SVnode *pVnode, SSnapshot *pSnapshot);
|
void vnodeGetSnapshot(SVnode *pVnode, SSnapshot *pSnapshot);
|
||||||
|
|
||||||
|
@ -171,11 +174,6 @@ typedef struct {
|
||||||
uint64_t uid;
|
uint64_t uid;
|
||||||
} STableKeyInfo;
|
} STableKeyInfo;
|
||||||
|
|
||||||
// sync integration
|
|
||||||
void vnodeSyncSetQ(SVnode *pVnode, void *qHandle);
|
|
||||||
void vnodeSyncSetRpc(SVnode *pVnode, void *rpcHandle);
|
|
||||||
int32_t vnodeSyncStart(SVnode *pVnode);
|
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
|
@ -410,8 +410,10 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
|
||||||
|
|
||||||
SMqDataBlkRsp rsp = {0};
|
SMqDataBlkRsp rsp = {0};
|
||||||
rsp.reqOffset = pReq->currentOffset;
|
rsp.reqOffset = pReq->currentOffset;
|
||||||
|
rsp.withSchema = pExec->withSchema;
|
||||||
rsp.blockData = taosArrayInit(0, sizeof(void*));
|
rsp.blockData = taosArrayInit(0, sizeof(void*));
|
||||||
rsp.blockDataLen = taosArrayInit(0, sizeof(int32_t));
|
rsp.blockDataLen = taosArrayInit(0, sizeof(int32_t));
|
||||||
|
rsp.blockSchema = taosArrayInit(0, sizeof(void*));
|
||||||
|
|
||||||
while (1) {
|
while (1) {
|
||||||
consumerEpoch = atomic_load_32(&pExec->epoch);
|
consumerEpoch = atomic_load_32(&pExec->epoch);
|
||||||
|
@ -511,6 +513,12 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
|
||||||
ASSERT(actualLen <= dataStrLen);
|
ASSERT(actualLen <= dataStrLen);
|
||||||
taosArrayPush(rsp.blockDataLen, &actualLen);
|
taosArrayPush(rsp.blockDataLen, &actualLen);
|
||||||
taosArrayPush(rsp.blockData, &buf);
|
taosArrayPush(rsp.blockData, &buf);
|
||||||
|
|
||||||
|
if (pExec->withSchema) {
|
||||||
|
SSchemaWrapper* pSW = tCloneSSchemaWrapper(pExec->pExecReader[workerId]->pSchemaWrapper);
|
||||||
|
taosArrayPush(rsp.blockSchema, &pSW);
|
||||||
|
}
|
||||||
|
|
||||||
rsp.blockNum++;
|
rsp.blockNum++;
|
||||||
}
|
}
|
||||||
// db subscribe
|
// db subscribe
|
||||||
|
@ -539,6 +547,10 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
|
||||||
ASSERT(actualLen <= dataStrLen);
|
ASSERT(actualLen <= dataStrLen);
|
||||||
taosArrayPush(rsp.blockDataLen, &actualLen);
|
taosArrayPush(rsp.blockDataLen, &actualLen);
|
||||||
taosArrayPush(rsp.blockData, &buf);
|
taosArrayPush(rsp.blockData, &buf);
|
||||||
|
|
||||||
|
SSchemaWrapper* pSW = tCloneSSchemaWrapper(pExec->pExecReader[workerId]->pSchemaWrapper);
|
||||||
|
taosArrayPush(rsp.blockSchema, &pSW);
|
||||||
|
|
||||||
rsp.blockNum++;
|
rsp.blockNum++;
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
|
@ -585,6 +597,8 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
|
||||||
// TODO destroy
|
// TODO destroy
|
||||||
taosArrayDestroy(rsp.blockData);
|
taosArrayDestroy(rsp.blockData);
|
||||||
taosArrayDestroy(rsp.blockDataLen);
|
taosArrayDestroy(rsp.blockDataLen);
|
||||||
|
taosArrayDestroyP(rsp.blockSchema, (FDelete)tDeleteSSchemaWrapper);
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -826,12 +840,16 @@ int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) {
|
||||||
pExec->pWalReader = walOpenReadHandle(pTq->pVnode->pWal);
|
pExec->pWalReader = walOpenReadHandle(pTq->pVnode->pWal);
|
||||||
for (int32_t i = 0; i < 5; i++) {
|
for (int32_t i = 0; i < 5; i++) {
|
||||||
pExec->pExecReader[i] = tqInitSubmitMsgScanner(pTq->pVnode->pMeta);
|
pExec->pExecReader[i] = tqInitSubmitMsgScanner(pTq->pVnode->pMeta);
|
||||||
SReadHandle handle = {
|
if (pExec->subType == TOPIC_SUB_TYPE__TABLE) {
|
||||||
.reader = pExec->pExecReader[i],
|
SReadHandle handle = {
|
||||||
.meta = pTq->pVnode->pMeta,
|
.reader = pExec->pExecReader[i],
|
||||||
};
|
.meta = pTq->pVnode->pMeta,
|
||||||
pExec->task[i] = qCreateStreamExecTaskInfo(pExec->qmsg, &handle);
|
};
|
||||||
ASSERT(pExec->task[i]);
|
pExec->task[i] = qCreateStreamExecTaskInfo(pExec->qmsg, &handle);
|
||||||
|
ASSERT(pExec->task[i]);
|
||||||
|
} else {
|
||||||
|
pExec->task[i] = NULL;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
taosHashPut(pTq->execs, req.subKey, strlen(req.subKey), pExec, sizeof(STqExec));
|
taosHashPut(pTq->execs, req.subKey, strlen(req.subKey), pExec, sizeof(STqExec));
|
||||||
return 0;
|
return 0;
|
||||||
|
|
|
@ -65,7 +65,9 @@ bool tqNextDataBlock(STqReadHandle* pHandle) {
|
||||||
|
|
||||||
/*pHandle->pBlock->uid = htobe64(pHandle->pBlock->uid);*/
|
/*pHandle->pBlock->uid = htobe64(pHandle->pBlock->uid);*/
|
||||||
/*if (pHandle->tbUid == pHandle->pBlock->uid) {*/
|
/*if (pHandle->tbUid == pHandle->pBlock->uid) {*/
|
||||||
ASSERT(pHandle->tbIdHash);
|
if (pHandle->tbIdHash == NULL) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
void* ret = taosHashGet(pHandle->tbIdHash, &pHandle->msgIter.uid, sizeof(int64_t));
|
void* ret = taosHashGet(pHandle->tbIdHash, &pHandle->msgIter.uid, sizeof(int64_t));
|
||||||
if (ret != NULL) {
|
if (ret != NULL) {
|
||||||
/*printf("retrieve one tb %ld\n", pHandle->pBlock->uid);*/
|
/*printf("retrieve one tb %ld\n", pHandle->pBlock->uid);*/
|
||||||
|
@ -109,26 +111,15 @@ int32_t tqRetrieveDataBlock(SArray** ppCols, STqReadHandle* pHandle, uint64_t* p
|
||||||
*pNumOfRows = pHandle->pBlock->numOfRows;
|
*pNumOfRows = pHandle->pBlock->numOfRows;
|
||||||
int32_t colNumNeed = taosArrayGetSize(pHandle->pColIdList);
|
int32_t colNumNeed = taosArrayGetSize(pHandle->pColIdList);
|
||||||
|
|
||||||
if (colNumNeed > pSchemaWrapper->nCols) {
|
if (colNumNeed == 0) {
|
||||||
colNumNeed = pSchemaWrapper->nCols;
|
*ppCols = taosArrayInit(pSchemaWrapper->nCols, sizeof(SColumnInfoData));
|
||||||
}
|
if (*ppCols == NULL) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
*ppCols = taosArrayInit(colNumNeed, sizeof(SColumnInfoData));
|
int32_t colMeta = 0;
|
||||||
if (*ppCols == NULL) {
|
while (colMeta < pSchemaWrapper->nCols) {
|
||||||
return -1;
|
SSchema* pColSchema = &pSchemaWrapper->pSchema[colMeta];
|
||||||
}
|
|
||||||
|
|
||||||
int32_t colMeta = 0;
|
|
||||||
int32_t colNeed = 0;
|
|
||||||
while (colMeta < pSchemaWrapper->nCols && colNeed < colNumNeed) {
|
|
||||||
SSchema* pColSchema = &pSchemaWrapper->pSchema[colMeta];
|
|
||||||
col_id_t colIdSchema = pColSchema->colId;
|
|
||||||
col_id_t colIdNeed = *(col_id_t*)taosArrayGet(pHandle->pColIdList, colNeed);
|
|
||||||
if (colIdSchema < colIdNeed) {
|
|
||||||
colMeta++;
|
|
||||||
} else if (colIdSchema > colIdNeed) {
|
|
||||||
colNeed++;
|
|
||||||
} else {
|
|
||||||
SColumnInfoData colInfo = {0};
|
SColumnInfoData colInfo = {0};
|
||||||
colInfo.info.bytes = pColSchema->bytes;
|
colInfo.info.bytes = pColSchema->bytes;
|
||||||
colInfo.info.colId = pColSchema->colId;
|
colInfo.info.colId = pColSchema->colId;
|
||||||
|
@ -139,7 +130,40 @@ int32_t tqRetrieveDataBlock(SArray** ppCols, STqReadHandle* pHandle, uint64_t* p
|
||||||
}
|
}
|
||||||
taosArrayPush(*ppCols, &colInfo);
|
taosArrayPush(*ppCols, &colInfo);
|
||||||
colMeta++;
|
colMeta++;
|
||||||
colNeed++;
|
}
|
||||||
|
} else {
|
||||||
|
if (colNumNeed > pSchemaWrapper->nCols) {
|
||||||
|
colNumNeed = pSchemaWrapper->nCols;
|
||||||
|
}
|
||||||
|
|
||||||
|
*ppCols = taosArrayInit(colNumNeed, sizeof(SColumnInfoData));
|
||||||
|
if (*ppCols == NULL) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t colMeta = 0;
|
||||||
|
int32_t colNeed = 0;
|
||||||
|
while (colMeta < pSchemaWrapper->nCols && colNeed < colNumNeed) {
|
||||||
|
SSchema* pColSchema = &pSchemaWrapper->pSchema[colMeta];
|
||||||
|
col_id_t colIdSchema = pColSchema->colId;
|
||||||
|
col_id_t colIdNeed = *(col_id_t*)taosArrayGet(pHandle->pColIdList, colNeed);
|
||||||
|
if (colIdSchema < colIdNeed) {
|
||||||
|
colMeta++;
|
||||||
|
} else if (colIdSchema > colIdNeed) {
|
||||||
|
colNeed++;
|
||||||
|
} else {
|
||||||
|
SColumnInfoData colInfo = {0};
|
||||||
|
colInfo.info.bytes = pColSchema->bytes;
|
||||||
|
colInfo.info.colId = pColSchema->colId;
|
||||||
|
colInfo.info.type = pColSchema->type;
|
||||||
|
|
||||||
|
if (colInfoDataEnsureCapacity(&colInfo, 0, *pNumOfRows) < 0) {
|
||||||
|
goto FAIL;
|
||||||
|
}
|
||||||
|
taosArrayPush(*ppCols, &colInfo);
|
||||||
|
colMeta++;
|
||||||
|
colNeed++;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -14,6 +14,7 @@
|
||||||
*/
|
*/
|
||||||
|
|
||||||
#include "vnodeInt.h"
|
#include "vnodeInt.h"
|
||||||
|
#include "vnodeSync.h"
|
||||||
|
|
||||||
int vnodeCreate(const char *path, SVnodeCfg *pCfg, STfs *pTfs) {
|
int vnodeCreate(const char *path, SVnodeCfg *pCfg, STfs *pTfs) {
|
||||||
SVnodeInfo info = {0};
|
SVnodeInfo info = {0};
|
||||||
|
@ -171,6 +172,16 @@ void vnodeClose(SVnode *pVnode) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// start the sync timer after the queue is ready
|
||||||
|
int32_t vnodeStart(SVnode *pVnode) {
|
||||||
|
vnodeSyncSetQ(pVnode, NULL);
|
||||||
|
vnodeSyncSetRpc(pVnode, NULL);
|
||||||
|
vnodeSyncStart(pVnode);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
void vnodeStop(SVnode *pVnode) {}
|
||||||
|
|
||||||
int64_t vnodeGetSyncHandle(SVnode *pVnode) { return pVnode->sync; }
|
int64_t vnodeGetSyncHandle(SVnode *pVnode) { return pVnode->sync; }
|
||||||
|
|
||||||
void vnodeGetSnapshot(SVnode *pVnode, SSnapshot *pSnapshot) { pSnapshot->lastApplyIndex = pVnode->state.committed; }
|
void vnodeGetSnapshot(SVnode *pVnode, SSnapshot *pSnapshot) { pSnapshot->lastApplyIndex = pVnode->state.committed; }
|
|
@ -268,7 +268,6 @@ SSDataBlock* createResDataBlock(SDataBlockDescNode* pNode) {
|
||||||
int32_t numOfCols = LIST_LENGTH(pNode->pSlots);
|
int32_t numOfCols = LIST_LENGTH(pNode->pSlots);
|
||||||
|
|
||||||
SSDataBlock* pBlock = taosMemoryCalloc(1, sizeof(SSDataBlock));
|
SSDataBlock* pBlock = taosMemoryCalloc(1, sizeof(SSDataBlock));
|
||||||
pBlock->info.numOfCols = numOfCols;
|
|
||||||
pBlock->pDataBlock = taosArrayInit(numOfCols, sizeof(SColumnInfoData));
|
pBlock->pDataBlock = taosArrayInit(numOfCols, sizeof(SColumnInfoData));
|
||||||
|
|
||||||
pBlock->info.blockId = pNode->dataBlockId;
|
pBlock->info.blockId = pNode->dataBlockId;
|
||||||
|
@ -294,6 +293,7 @@ SSDataBlock* createResDataBlock(SDataBlockDescNode* pNode) {
|
||||||
taosArrayPush(pBlock->pDataBlock, &idata);
|
taosArrayPush(pBlock->pDataBlock, &idata);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pBlock->info.numOfCols = taosArrayGetSize(pBlock->pDataBlock);
|
||||||
return pBlock;
|
return pBlock;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1032,6 +1032,8 @@ static int32_t doCreateConstantValColumnInfo(SInputColumnInfoData* pInput, SFunc
|
||||||
pColInfo->info.bytes = tDataTypes[type].bytes;
|
pColInfo->info.bytes = tDataTypes[type].bytes;
|
||||||
|
|
||||||
pInput->pData[paramIndex] = pColInfo;
|
pInput->pData[paramIndex] = pColInfo;
|
||||||
|
} else {
|
||||||
|
pColInfo = pInput->pData[paramIndex];
|
||||||
}
|
}
|
||||||
|
|
||||||
ASSERT(!IS_VAR_DATA_TYPE(type));
|
ASSERT(!IS_VAR_DATA_TYPE(type));
|
||||||
|
|
|
@ -355,7 +355,7 @@ static int32_t translateToIso8601(SFunctionNode* pFunc, char* pErrBuf, int32_t l
|
||||||
return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName);
|
return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName);
|
||||||
}
|
}
|
||||||
|
|
||||||
pFunc->node.resType = (SDataType) { .bytes = 24, .type = TSDB_DATA_TYPE_BINARY};
|
pFunc->node.resType = (SDataType) { .bytes = 64, .type = TSDB_DATA_TYPE_BINARY};
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -489,7 +489,7 @@ EDealRes sclRewriteFunction(SNode** pNode, SScalarCtx *ctx) {
|
||||||
if (fmIsUserDefinedFunc(node->funcId)) {
|
if (fmIsUserDefinedFunc(node->funcId)) {
|
||||||
return DEAL_RES_CONTINUE;
|
return DEAL_RES_CONTINUE;
|
||||||
}
|
}
|
||||||
|
|
||||||
FOREACH(tnode, node->pParameterList) {
|
FOREACH(tnode, node->pParameterList) {
|
||||||
if (!SCL_IS_CONST_NODE(tnode)) {
|
if (!SCL_IS_CONST_NODE(tnode)) {
|
||||||
return DEAL_RES_CONTINUE;
|
return DEAL_RES_CONTINUE;
|
||||||
|
@ -517,8 +517,8 @@ EDealRes sclRewriteFunction(SNode** pNode, SScalarCtx *ctx) {
|
||||||
res->node.resType = node->node.resType;
|
res->node.resType = node->node.resType;
|
||||||
int32_t type = output.columnData->info.type;
|
int32_t type = output.columnData->info.type;
|
||||||
if (IS_VAR_DATA_TYPE(type)) {
|
if (IS_VAR_DATA_TYPE(type)) {
|
||||||
res->datum.p = output.columnData->pData;
|
res->datum.p = taosMemoryCalloc(res->node.resType.bytes + VARSTR_HEADER_SIZE + 1, 1);
|
||||||
output.columnData->pData = NULL;
|
memcpy(res->datum.p, output.columnData->pData, varDataTLen(output.columnData->pData));
|
||||||
} else {
|
} else {
|
||||||
memcpy(nodesGetValueFromNode(res), output.columnData->pData, tDataTypes[type].bytes);
|
memcpy(nodesGetValueFromNode(res), output.columnData->pData, tDataTypes[type].bytes);
|
||||||
}
|
}
|
||||||
|
|
|
@ -403,6 +403,9 @@ TAOS_DEFINE_ERROR(TSDB_CODE_SYN_INVALID_CHECKSUM, "Invalid msg checksum"
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_SYN_INVALID_MSGLEN, "Invalid msg length")
|
TAOS_DEFINE_ERROR(TSDB_CODE_SYN_INVALID_MSGLEN, "Invalid msg length")
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_SYN_INVALID_MSGTYPE, "Invalid msg type")
|
TAOS_DEFINE_ERROR(TSDB_CODE_SYN_INVALID_MSGTYPE, "Invalid msg type")
|
||||||
|
|
||||||
|
TAOS_DEFINE_ERROR(TSDB_CODE_SYN_NOT_LEADER, "Sync not leader")
|
||||||
|
TAOS_DEFINE_ERROR(TSDB_CODE_SYN_INTERNAL_ERROR, "Sync internal error")
|
||||||
|
|
||||||
// wal
|
// wal
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_WAL_APP_ERROR, "Unexpected generic error in wal")
|
TAOS_DEFINE_ERROR(TSDB_CODE_WAL_APP_ERROR, "Unexpected generic error in wal")
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_WAL_FILE_CORRUPTED, "WAL file is corrupted")
|
TAOS_DEFINE_ERROR(TSDB_CODE_WAL_FILE_CORRUPTED, "WAL file is corrupted")
|
||||||
|
|
|
@ -395,7 +395,7 @@ void shellClearScreen(int32_t ecmd_pos, int32_t cursor_pos) {
|
||||||
void shellShowOnScreen(SShellCmd *cmd) {
|
void shellShowOnScreen(SShellCmd *cmd) {
|
||||||
struct winsize w;
|
struct winsize w;
|
||||||
if (ioctl(0, TIOCGWINSZ, &w) < 0 || w.ws_col == 0 || w.ws_row == 0) {
|
if (ioctl(0, TIOCGWINSZ, &w) < 0 || w.ws_col == 0 || w.ws_row == 0) {
|
||||||
fprintf(stderr, "No stream device\n");
|
// fprintf(stderr, "No stream device\n");
|
||||||
w.ws_col = 120;
|
w.ws_col = 120;
|
||||||
w.ws_row = 30;
|
w.ws_row = 30;
|
||||||
}
|
}
|
||||||
|
|
|
@ -750,7 +750,7 @@ void shellReadHistory() {
|
||||||
|
|
||||||
void shellWriteHistory() {
|
void shellWriteHistory() {
|
||||||
SShellHistory *pHistory = &shell.history;
|
SShellHistory *pHistory = &shell.history;
|
||||||
TdFilePtr pFile = taosOpenFile(pHistory->file, TD_FILE_WRITE | TD_FILE_STREAM);
|
TdFilePtr pFile = taosOpenFile(pHistory->file, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_STREAM | TD_FILE_APPEND);
|
||||||
if (pFile == NULL) return;
|
if (pFile == NULL) return;
|
||||||
|
|
||||||
for (int32_t i = pHistory->hstart; i != pHistory->hend;) {
|
for (int32_t i = pHistory->hstart; i != pHistory->hend;) {
|
||||||
|
|
Loading…
Reference in New Issue