Merge branch '3.0' of github.com:taosdata/TDengine into 3.0
This commit is contained in:
commit
bb9e6b9b0b
|
@ -1167,7 +1167,7 @@ typedef struct {
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
char name[TSDB_TOPIC_FNAME_LEN];
|
char name[TSDB_TOPIC_FNAME_LEN];
|
||||||
char outputTbName[TSDB_TABLE_NAME_LEN];
|
char outputSTbName[TSDB_TABLE_FNAME_LEN];
|
||||||
int8_t igExists;
|
int8_t igExists;
|
||||||
char* sql;
|
char* sql;
|
||||||
char* ast;
|
char* ast;
|
||||||
|
@ -1975,7 +1975,7 @@ typedef struct {
|
||||||
int32_t tagsFilterLen; // strlen + 1
|
int32_t tagsFilterLen; // strlen + 1
|
||||||
int32_t sqlLen; // strlen + 1
|
int32_t sqlLen; // strlen + 1
|
||||||
int32_t astLen; // strlen + 1
|
int32_t astLen; // strlen + 1
|
||||||
char* expr;
|
char* expr;
|
||||||
char* tagsFilter;
|
char* tagsFilter;
|
||||||
char* sql;
|
char* sql;
|
||||||
char* ast;
|
char* ast;
|
||||||
|
@ -1997,9 +1997,9 @@ typedef struct {
|
||||||
int8_t version; // for compatibility(default 0)
|
int8_t version; // for compatibility(default 0)
|
||||||
int8_t intervalUnit; // MACRO: TIME_UNIT_XXX
|
int8_t intervalUnit; // MACRO: TIME_UNIT_XXX
|
||||||
int8_t slidingUnit; // MACRO: TIME_UNIT_XXX
|
int8_t slidingUnit; // MACRO: TIME_UNIT_XXX
|
||||||
int8_t timezoneInt; // sma data expired if timezone changes.
|
int8_t timezoneInt; // sma data expired if timezone changes.
|
||||||
char indexName[TSDB_INDEX_NAME_LEN];
|
char indexName[TSDB_INDEX_NAME_LEN];
|
||||||
char timezone[TD_TIMEZONE_LEN];
|
char timezone[TD_TIMEZONE_LEN];
|
||||||
int32_t exprLen;
|
int32_t exprLen;
|
||||||
int32_t tagsFilterLen;
|
int32_t tagsFilterLen;
|
||||||
int64_t indexUid;
|
int64_t indexUid;
|
||||||
|
@ -2371,6 +2371,26 @@ enum {
|
||||||
STREAM_NEXT_OP_DST__SND,
|
STREAM_NEXT_OP_DST__SND,
|
||||||
};
|
};
|
||||||
|
|
||||||
|
enum {
|
||||||
|
STREAM_SOURCE_TYPE__NONE = 1,
|
||||||
|
STREAM_SOURCE_TYPE__SUPER,
|
||||||
|
STREAM_SOURCE_TYPE__CHILD,
|
||||||
|
STREAM_SOURCE_TYPE__NORMAL,
|
||||||
|
};
|
||||||
|
|
||||||
|
enum {
|
||||||
|
STREAM_SINK_TYPE__NONE = 1,
|
||||||
|
STREAM_SINK_TYPE__INPLACE,
|
||||||
|
STREAM_SINK_TYPE__ASSIGNED,
|
||||||
|
STREAM_SINK_TYPE__MULTIPLE,
|
||||||
|
STREAM_SINK_TYPE__TEMPORARY,
|
||||||
|
};
|
||||||
|
|
||||||
|
enum {
|
||||||
|
STREAM_TYPE__NORMAL = 1,
|
||||||
|
STREAM_TYPE__SMA,
|
||||||
|
};
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
void* inputHandle;
|
void* inputHandle;
|
||||||
void* executor;
|
void* executor;
|
||||||
|
@ -2381,28 +2401,33 @@ typedef struct {
|
||||||
int32_t taskId;
|
int32_t taskId;
|
||||||
int32_t level;
|
int32_t level;
|
||||||
int8_t status;
|
int8_t status;
|
||||||
int8_t pipeSource;
|
|
||||||
int8_t pipeSink;
|
|
||||||
int8_t numOfRunners;
|
|
||||||
int8_t parallelizable;
|
int8_t parallelizable;
|
||||||
int8_t nextOpDst; // vnode or snode
|
|
||||||
|
// vnode or snode
|
||||||
|
int8_t nextOpDst;
|
||||||
|
|
||||||
|
int8_t sourceType;
|
||||||
|
int8_t sinkType;
|
||||||
|
|
||||||
|
// for sink type assigned
|
||||||
|
int32_t sinkVgId;
|
||||||
SEpSet NextOpEp;
|
SEpSet NextOpEp;
|
||||||
char* qmsg;
|
|
||||||
// not applied to encoder and decoder
|
// executor meta info
|
||||||
|
char* qmsg;
|
||||||
|
|
||||||
|
// followings are not applied to encoder and decoder
|
||||||
|
int8_t numOfRunners;
|
||||||
SStreamRunner runner[8];
|
SStreamRunner runner[8];
|
||||||
// void* executor;
|
|
||||||
// void* stateStore;
|
|
||||||
// storage handle
|
|
||||||
} SStreamTask;
|
} SStreamTask;
|
||||||
|
|
||||||
static FORCE_INLINE SStreamTask* streamTaskNew(int64_t streamId, int32_t level) {
|
static FORCE_INLINE SStreamTask* streamTaskNew(int64_t streamId) {
|
||||||
SStreamTask* pTask = (SStreamTask*)calloc(1, sizeof(SStreamTask));
|
SStreamTask* pTask = (SStreamTask*)calloc(1, sizeof(SStreamTask));
|
||||||
if (pTask == NULL) {
|
if (pTask == NULL) {
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
pTask->taskId = tGenIdPI32();
|
pTask->taskId = tGenIdPI32();
|
||||||
pTask->streamId = streamId;
|
pTask->streamId = streamId;
|
||||||
pTask->level = level;
|
|
||||||
pTask->status = STREAM_TASK_STATUS__RUNNING;
|
pTask->status = STREAM_TASK_STATUS__RUNNING;
|
||||||
pTask->qmsg = NULL;
|
pTask->qmsg = NULL;
|
||||||
return pTask;
|
return pTask;
|
||||||
|
@ -2435,7 +2460,7 @@ typedef struct {
|
||||||
int64_t streamId;
|
int64_t streamId;
|
||||||
int64_t version;
|
int64_t version;
|
||||||
SArray* res; // SArray<SSDataBlock>
|
SArray* res; // SArray<SSDataBlock>
|
||||||
} SStreamSmaSinkReq;
|
} SStreamSinkReq;
|
||||||
|
|
||||||
#pragma pack(pop)
|
#pragma pack(pop)
|
||||||
|
|
||||||
|
|
|
@ -17,7 +17,9 @@
|
||||||
#define _TD_MND_H_
|
#define _TD_MND_H_
|
||||||
|
|
||||||
#include "monitor.h"
|
#include "monitor.h"
|
||||||
|
#include "tmsg.h"
|
||||||
#include "tmsgcb.h"
|
#include "tmsgcb.h"
|
||||||
|
#include "trpc.h"
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
extern "C" {
|
extern "C" {
|
||||||
|
|
|
@ -505,7 +505,7 @@ TAOS_RES* tmq_create_stream(TAOS* taos, const char* streamName, const char* tbNa
|
||||||
.sql = (char*)sql,
|
.sql = (char*)sql,
|
||||||
};
|
};
|
||||||
tNameExtractFullName(&name, req.name);
|
tNameExtractFullName(&name, req.name);
|
||||||
strcpy(req.outputTbName, tbName);
|
strcpy(req.outputSTbName, tbName);
|
||||||
|
|
||||||
int tlen = tSerializeSCMCreateStreamReq(NULL, 0, &req);
|
int tlen = tSerializeSCMCreateStreamReq(NULL, 0, &req);
|
||||||
void* buf = malloc(tlen);
|
void* buf = malloc(tlen);
|
||||||
|
|
|
@ -314,7 +314,7 @@ int32_t tSerializeSVCreateTbReq(void **buf, SVCreateTbReq *pReq) {
|
||||||
for (col_id_t i = 0; i < pReq->stbCfg.nBSmaCols; ++i) {
|
for (col_id_t i = 0; i < pReq->stbCfg.nBSmaCols; ++i) {
|
||||||
tlen += taosEncodeFixedI16(buf, pReq->stbCfg.pBSmaCols[i]);
|
tlen += taosEncodeFixedI16(buf, pReq->stbCfg.pBSmaCols[i]);
|
||||||
}
|
}
|
||||||
if(pReq->rollup && pReq->stbCfg.pRSmaParam) {
|
if (pReq->rollup && pReq->stbCfg.pRSmaParam) {
|
||||||
SRSmaParam *param = pReq->stbCfg.pRSmaParam;
|
SRSmaParam *param = pReq->stbCfg.pRSmaParam;
|
||||||
tlen += taosEncodeFixedU32(buf, (uint32_t)param->xFilesFactor);
|
tlen += taosEncodeFixedU32(buf, (uint32_t)param->xFilesFactor);
|
||||||
tlen += taosEncodeFixedI8(buf, param->delayUnit);
|
tlen += taosEncodeFixedI8(buf, param->delayUnit);
|
||||||
|
@ -341,7 +341,7 @@ int32_t tSerializeSVCreateTbReq(void **buf, SVCreateTbReq *pReq) {
|
||||||
for (col_id_t i = 0; i < pReq->ntbCfg.nBSmaCols; ++i) {
|
for (col_id_t i = 0; i < pReq->ntbCfg.nBSmaCols; ++i) {
|
||||||
tlen += taosEncodeFixedI16(buf, pReq->ntbCfg.pBSmaCols[i]);
|
tlen += taosEncodeFixedI16(buf, pReq->ntbCfg.pBSmaCols[i]);
|
||||||
}
|
}
|
||||||
if(pReq->rollup && pReq->ntbCfg.pRSmaParam) {
|
if (pReq->rollup && pReq->ntbCfg.pRSmaParam) {
|
||||||
SRSmaParam *param = pReq->ntbCfg.pRSmaParam;
|
SRSmaParam *param = pReq->ntbCfg.pRSmaParam;
|
||||||
tlen += taosEncodeFixedU32(buf, (uint32_t)param->xFilesFactor);
|
tlen += taosEncodeFixedU32(buf, (uint32_t)param->xFilesFactor);
|
||||||
tlen += taosEncodeFixedI8(buf, param->delayUnit);
|
tlen += taosEncodeFixedI8(buf, param->delayUnit);
|
||||||
|
@ -427,7 +427,7 @@ void *tDeserializeSVCreateTbReq(void *buf, SVCreateTbReq *pReq) {
|
||||||
buf = taosDecodeStringTo(buf, pReq->ntbCfg.pSchema[i].name);
|
buf = taosDecodeStringTo(buf, pReq->ntbCfg.pSchema[i].name);
|
||||||
}
|
}
|
||||||
buf = taosDecodeFixedI16(buf, &(pReq->ntbCfg.nBSmaCols));
|
buf = taosDecodeFixedI16(buf, &(pReq->ntbCfg.nBSmaCols));
|
||||||
if(pReq->ntbCfg.nBSmaCols > 0) {
|
if (pReq->ntbCfg.nBSmaCols > 0) {
|
||||||
pReq->ntbCfg.pBSmaCols = (col_id_t *)malloc(pReq->ntbCfg.nBSmaCols * sizeof(col_id_t));
|
pReq->ntbCfg.pBSmaCols = (col_id_t *)malloc(pReq->ntbCfg.nBSmaCols * sizeof(col_id_t));
|
||||||
for (col_id_t i = 0; i < pReq->ntbCfg.nBSmaCols; ++i) {
|
for (col_id_t i = 0; i < pReq->ntbCfg.nBSmaCols; ++i) {
|
||||||
buf = taosDecodeFixedI16(buf, pReq->ntbCfg.pBSmaCols + i);
|
buf = taosDecodeFixedI16(buf, pReq->ntbCfg.pBSmaCols + i);
|
||||||
|
@ -435,10 +435,10 @@ void *tDeserializeSVCreateTbReq(void *buf, SVCreateTbReq *pReq) {
|
||||||
} else {
|
} else {
|
||||||
pReq->ntbCfg.pBSmaCols = NULL;
|
pReq->ntbCfg.pBSmaCols = NULL;
|
||||||
}
|
}
|
||||||
if(pReq->rollup) {
|
if (pReq->rollup) {
|
||||||
pReq->ntbCfg.pRSmaParam = (SRSmaParam *)malloc(sizeof(SRSmaParam));
|
pReq->ntbCfg.pRSmaParam = (SRSmaParam *)malloc(sizeof(SRSmaParam));
|
||||||
SRSmaParam *param = pReq->ntbCfg.pRSmaParam;
|
SRSmaParam *param = pReq->ntbCfg.pRSmaParam;
|
||||||
buf = taosDecodeFixedU32(buf, (uint32_t*)¶m->xFilesFactor);
|
buf = taosDecodeFixedU32(buf, (uint32_t *)¶m->xFilesFactor);
|
||||||
buf = taosDecodeFixedI8(buf, ¶m->delayUnit);
|
buf = taosDecodeFixedI8(buf, ¶m->delayUnit);
|
||||||
buf = taosDecodeFixedI8(buf, ¶m->nFuncIds);
|
buf = taosDecodeFixedI8(buf, ¶m->nFuncIds);
|
||||||
if (param->nFuncIds > 0) {
|
if (param->nFuncIds > 0) {
|
||||||
|
@ -3045,7 +3045,7 @@ int32_t tSerializeSCMCreateStreamReq(void *buf, int32_t bufLen, const SCMCreateS
|
||||||
|
|
||||||
if (tStartEncode(&encoder) < 0) return -1;
|
if (tStartEncode(&encoder) < 0) return -1;
|
||||||
if (tEncodeCStr(&encoder, pReq->name) < 0) return -1;
|
if (tEncodeCStr(&encoder, pReq->name) < 0) return -1;
|
||||||
if (tEncodeCStr(&encoder, pReq->outputTbName) < 0) return -1;
|
if (tEncodeCStr(&encoder, pReq->outputSTbName) < 0) return -1;
|
||||||
if (tEncodeI8(&encoder, pReq->igExists) < 0) return -1;
|
if (tEncodeI8(&encoder, pReq->igExists) < 0) return -1;
|
||||||
if (tEncodeI32(&encoder, sqlLen) < 0) return -1;
|
if (tEncodeI32(&encoder, sqlLen) < 0) return -1;
|
||||||
if (tEncodeI32(&encoder, astLen) < 0) return -1;
|
if (tEncodeI32(&encoder, astLen) < 0) return -1;
|
||||||
|
@ -3068,7 +3068,7 @@ int32_t tDeserializeSCMCreateStreamReq(void *buf, int32_t bufLen, SCMCreateStrea
|
||||||
|
|
||||||
if (tStartDecode(&decoder) < 0) return -1;
|
if (tStartDecode(&decoder) < 0) return -1;
|
||||||
if (tDecodeCStrTo(&decoder, pReq->name) < 0) return -1;
|
if (tDecodeCStrTo(&decoder, pReq->name) < 0) return -1;
|
||||||
if (tDecodeCStrTo(&decoder, pReq->outputTbName) < 0) return -1;
|
if (tDecodeCStrTo(&decoder, pReq->outputSTbName) < 0) return -1;
|
||||||
if (tDecodeI8(&decoder, &pReq->igExists) < 0) return -1;
|
if (tDecodeI8(&decoder, &pReq->igExists) < 0) return -1;
|
||||||
if (tDecodeI32(&decoder, &sqlLen) < 0) return -1;
|
if (tDecodeI32(&decoder, &sqlLen) < 0) return -1;
|
||||||
if (tDecodeI32(&decoder, &astLen) < 0) return -1;
|
if (tDecodeI32(&decoder, &astLen) < 0) return -1;
|
||||||
|
@ -3101,12 +3101,14 @@ int32_t tEncodeSStreamTask(SCoder *pEncoder, const SStreamTask *pTask) {
|
||||||
if (tEncodeI32(pEncoder, pTask->taskId) < 0) return -1;
|
if (tEncodeI32(pEncoder, pTask->taskId) < 0) return -1;
|
||||||
if (tEncodeI32(pEncoder, pTask->level) < 0) return -1;
|
if (tEncodeI32(pEncoder, pTask->level) < 0) return -1;
|
||||||
if (tEncodeI8(pEncoder, pTask->status) < 0) return -1;
|
if (tEncodeI8(pEncoder, pTask->status) < 0) return -1;
|
||||||
if (tEncodeI8(pEncoder, pTask->pipeSource) < 0) return -1;
|
|
||||||
if (tEncodeI8(pEncoder, pTask->pipeSink) < 0) return -1;
|
|
||||||
if (tEncodeI8(pEncoder, pTask->parallelizable) < 0) return -1;
|
if (tEncodeI8(pEncoder, pTask->parallelizable) < 0) return -1;
|
||||||
if (tEncodeI8(pEncoder, pTask->nextOpDst) < 0) return -1;
|
if (tEncodeI8(pEncoder, pTask->nextOpDst) < 0) return -1;
|
||||||
// if (tEncodeI8(pEncoder, pTask->numOfRunners) < 0) return -1;
|
if (tEncodeI8(pEncoder, pTask->sourceType) < 0) return -1;
|
||||||
if (tEncodeSEpSet(pEncoder, &pTask->NextOpEp) < 0) return -1;
|
if (tEncodeI8(pEncoder, pTask->sinkType) < 0) return -1;
|
||||||
|
if (pTask->sinkType == STREAM_SINK_TYPE__ASSIGNED) {
|
||||||
|
if (tEncodeI32(pEncoder, pTask->sinkVgId) < 0) return -1;
|
||||||
|
if (tEncodeSEpSet(pEncoder, &pTask->NextOpEp) < 0) return -1;
|
||||||
|
}
|
||||||
if (tEncodeCStr(pEncoder, pTask->qmsg) < 0) return -1;
|
if (tEncodeCStr(pEncoder, pTask->qmsg) < 0) return -1;
|
||||||
/*tEndEncode(pEncoder);*/
|
/*tEndEncode(pEncoder);*/
|
||||||
return pEncoder->pos;
|
return pEncoder->pos;
|
||||||
|
@ -3118,12 +3120,14 @@ int32_t tDecodeSStreamTask(SCoder *pDecoder, SStreamTask *pTask) {
|
||||||
if (tDecodeI32(pDecoder, &pTask->taskId) < 0) return -1;
|
if (tDecodeI32(pDecoder, &pTask->taskId) < 0) return -1;
|
||||||
if (tDecodeI32(pDecoder, &pTask->level) < 0) return -1;
|
if (tDecodeI32(pDecoder, &pTask->level) < 0) return -1;
|
||||||
if (tDecodeI8(pDecoder, &pTask->status) < 0) return -1;
|
if (tDecodeI8(pDecoder, &pTask->status) < 0) return -1;
|
||||||
if (tDecodeI8(pDecoder, &pTask->pipeSource) < 0) return -1;
|
|
||||||
if (tDecodeI8(pDecoder, &pTask->pipeSink) < 0) return -1;
|
|
||||||
if (tDecodeI8(pDecoder, &pTask->parallelizable) < 0) return -1;
|
if (tDecodeI8(pDecoder, &pTask->parallelizable) < 0) return -1;
|
||||||
if (tDecodeI8(pDecoder, &pTask->nextOpDst) < 0) return -1;
|
if (tDecodeI8(pDecoder, &pTask->nextOpDst) < 0) return -1;
|
||||||
// if (tDecodeI8(pDecoder, &pTask->numOfRunners) < 0) return -1;
|
if (tDecodeI8(pDecoder, &pTask->sourceType) < 0) return -1;
|
||||||
if (tDecodeSEpSet(pDecoder, &pTask->NextOpEp) < 0) return -1;
|
if (tDecodeI8(pDecoder, &pTask->sinkType) < 0) return -1;
|
||||||
|
if (pTask->sinkType == STREAM_SINK_TYPE__ASSIGNED) {
|
||||||
|
if (tDecodeI32(pDecoder, &pTask->sinkVgId) < 0) return -1;
|
||||||
|
if (tDecodeSEpSet(pDecoder, &pTask->NextOpEp) < 0) return -1;
|
||||||
|
}
|
||||||
if (tDecodeCStrAlloc(pDecoder, &pTask->qmsg) < 0) return -1;
|
if (tDecodeCStrAlloc(pDecoder, &pTask->qmsg) < 0) return -1;
|
||||||
/*tEndDecode(pDecoder);*/
|
/*tEndDecode(pDecoder);*/
|
||||||
return 0;
|
return 0;
|
||||||
|
|
|
@ -720,6 +720,7 @@ static FORCE_INLINE void* tDecodeSMqConsumerObj(void* buf, SMqConsumerObj* pCons
|
||||||
typedef struct {
|
typedef struct {
|
||||||
char name[TSDB_TOPIC_FNAME_LEN];
|
char name[TSDB_TOPIC_FNAME_LEN];
|
||||||
char db[TSDB_DB_FNAME_LEN];
|
char db[TSDB_DB_FNAME_LEN];
|
||||||
|
char outputSTbName[TSDB_TABLE_FNAME_LEN];
|
||||||
int64_t createTime;
|
int64_t createTime;
|
||||||
int64_t updateTime;
|
int64_t updateTime;
|
||||||
int64_t uid;
|
int64_t uid;
|
||||||
|
@ -729,11 +730,12 @@ typedef struct {
|
||||||
SRWLatch lock;
|
SRWLatch lock;
|
||||||
int8_t status;
|
int8_t status;
|
||||||
// int32_t sqlLen;
|
// int32_t sqlLen;
|
||||||
|
int32_t sinkVgId; // 0 for automatic
|
||||||
char* sql;
|
char* sql;
|
||||||
char* logicalPlan;
|
char* logicalPlan;
|
||||||
char* physicalPlan;
|
char* physicalPlan;
|
||||||
SArray* tasks; // SArray<SArray<SStreamTask>>
|
SArray* tasks; // SArray<SArray<SStreamTask>>
|
||||||
SArray* outputName;
|
SArray* ColAlias;
|
||||||
} SStreamObj;
|
} SStreamObj;
|
||||||
|
|
||||||
int32_t tEncodeSStreamObj(SCoder* pEncoder, const SStreamObj* pObj);
|
int32_t tEncodeSStreamObj(SCoder* pEncoder, const SStreamObj* pObj);
|
||||||
|
|
|
@ -16,6 +16,7 @@
|
||||||
#include "mndDef.h"
|
#include "mndDef.h"
|
||||||
|
|
||||||
int32_t tEncodeSStreamObj(SCoder *pEncoder, const SStreamObj *pObj) {
|
int32_t tEncodeSStreamObj(SCoder *pEncoder, const SStreamObj *pObj) {
|
||||||
|
int32_t sz = 0;
|
||||||
int32_t outputNameSz = 0;
|
int32_t outputNameSz = 0;
|
||||||
if (tEncodeCStr(pEncoder, pObj->name) < 0) return -1;
|
if (tEncodeCStr(pEncoder, pObj->name) < 0) return -1;
|
||||||
if (tEncodeCStr(pEncoder, pObj->db) < 0) return -1;
|
if (tEncodeCStr(pEncoder, pObj->db) < 0) return -1;
|
||||||
|
@ -30,27 +31,26 @@ int32_t tEncodeSStreamObj(SCoder *pEncoder, const SStreamObj *pObj) {
|
||||||
if (tEncodeCStr(pEncoder, pObj->physicalPlan) < 0) return -1;
|
if (tEncodeCStr(pEncoder, pObj->physicalPlan) < 0) return -1;
|
||||||
// TODO encode tasks
|
// TODO encode tasks
|
||||||
if (pObj->tasks) {
|
if (pObj->tasks) {
|
||||||
int32_t sz = taosArrayGetSize(pObj->tasks);
|
sz = taosArrayGetSize(pObj->tasks);
|
||||||
tEncodeI32(pEncoder, sz);
|
}
|
||||||
for (int32_t i = 0; i < sz; i++) {
|
if (tEncodeI32(pEncoder, sz) < 0) return -1;
|
||||||
SArray *pArray = taosArrayGet(pObj->tasks, i);
|
|
||||||
int32_t innerSz = taosArrayGetSize(pArray);
|
for (int32_t i = 0; i < sz; i++) {
|
||||||
tEncodeI32(pEncoder, innerSz);
|
SArray *pArray = taosArrayGet(pObj->tasks, i);
|
||||||
for (int32_t j = 0; j < innerSz; j++) {
|
int32_t innerSz = taosArrayGetSize(pArray);
|
||||||
SStreamTask *pTask = taosArrayGet(pArray, j);
|
if (tEncodeI32(pEncoder, innerSz) < 0) return -1;
|
||||||
tEncodeSStreamTask(pEncoder, pTask);
|
for (int32_t j = 0; j < innerSz; j++) {
|
||||||
}
|
SStreamTask *pTask = taosArrayGet(pArray, j);
|
||||||
|
if (tEncodeSStreamTask(pEncoder, pTask) < 0) return -1;
|
||||||
}
|
}
|
||||||
} else {
|
|
||||||
tEncodeI32(pEncoder, 0);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pObj->outputName != NULL) {
|
if (pObj->ColAlias != NULL) {
|
||||||
outputNameSz = taosArrayGetSize(pObj->outputName);
|
outputNameSz = taosArrayGetSize(pObj->ColAlias);
|
||||||
}
|
}
|
||||||
if (tEncodeI32(pEncoder, outputNameSz) < 0) return -1;
|
if (tEncodeI32(pEncoder, outputNameSz) < 0) return -1;
|
||||||
for (int32_t i = 0; i < outputNameSz; i++) {
|
for (int32_t i = 0; i < outputNameSz; i++) {
|
||||||
char *name = taosArrayGetP(pObj->outputName, i);
|
char *name = taosArrayGetP(pObj->ColAlias, i);
|
||||||
if (tEncodeCStr(pEncoder, name) < 0) return -1;
|
if (tEncodeCStr(pEncoder, name) < 0) return -1;
|
||||||
}
|
}
|
||||||
return pEncoder->pos;
|
return pEncoder->pos;
|
||||||
|
@ -68,6 +68,7 @@ int32_t tDecodeSStreamObj(SCoder *pDecoder, SStreamObj *pObj) {
|
||||||
if (tDecodeCStrAlloc(pDecoder, &pObj->sql) < 0) return -1;
|
if (tDecodeCStrAlloc(pDecoder, &pObj->sql) < 0) return -1;
|
||||||
if (tDecodeCStrAlloc(pDecoder, &pObj->logicalPlan) < 0) return -1;
|
if (tDecodeCStrAlloc(pDecoder, &pObj->logicalPlan) < 0) return -1;
|
||||||
if (tDecodeCStrAlloc(pDecoder, &pObj->physicalPlan) < 0) return -1;
|
if (tDecodeCStrAlloc(pDecoder, &pObj->physicalPlan) < 0) return -1;
|
||||||
|
pObj->tasks = NULL;
|
||||||
int32_t sz;
|
int32_t sz;
|
||||||
if (tDecodeI32(pDecoder, &sz) < 0) return -1;
|
if (tDecodeI32(pDecoder, &sz) < 0) return -1;
|
||||||
if (sz != 0) {
|
if (sz != 0) {
|
||||||
|
@ -83,19 +84,19 @@ int32_t tDecodeSStreamObj(SCoder *pDecoder, SStreamObj *pObj) {
|
||||||
}
|
}
|
||||||
taosArrayPush(pObj->tasks, pArray);
|
taosArrayPush(pObj->tasks, pArray);
|
||||||
}
|
}
|
||||||
} else {
|
|
||||||
pObj->tasks = NULL;
|
|
||||||
}
|
}
|
||||||
int32_t outputNameSz;
|
int32_t outputNameSz;
|
||||||
if (tDecodeI32(pDecoder, &outputNameSz) < 0) return -1;
|
if (tDecodeI32(pDecoder, &outputNameSz) < 0) return -1;
|
||||||
pObj->outputName = taosArrayInit(outputNameSz, sizeof(void *));
|
if (outputNameSz != 0) {
|
||||||
if (pObj->outputName == NULL) {
|
pObj->ColAlias = taosArrayInit(outputNameSz, sizeof(void *));
|
||||||
return -1;
|
if (pObj->ColAlias == NULL) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
for (int32_t i = 0; i < outputNameSz; i++) {
|
for (int32_t i = 0; i < outputNameSz; i++) {
|
||||||
char *name;
|
char *name;
|
||||||
if (tDecodeCStrAlloc(pDecoder, &name) < 0) return -1;
|
if (tDecodeCStrAlloc(pDecoder, &name) < 0) return -1;
|
||||||
taosArrayPush(pObj->outputName, &name);
|
taosArrayPush(pObj->ColAlias, &name);
|
||||||
}
|
}
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
|
@ -58,7 +58,7 @@ int32_t mndPersistTaskDeployReq(STrans* pTrans, SStreamTask* pTask, const SEpSet
|
||||||
action.contLen = tlen;
|
action.contLen = tlen;
|
||||||
action.msgType = type;
|
action.msgType = type;
|
||||||
if (mndTransAppendRedoAction(pTrans, &action) != 0) {
|
if (mndTransAppendRedoAction(pTrans, &action) != 0) {
|
||||||
rpcFreeCont(buf);
|
free(buf);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
return 0;
|
return 0;
|
||||||
|
@ -131,13 +131,12 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) {
|
||||||
|
|
||||||
lastUsedVgId = pVgroup->vgId;
|
lastUsedVgId = pVgroup->vgId;
|
||||||
pStream->vgNum++;
|
pStream->vgNum++;
|
||||||
// send to vnode
|
|
||||||
|
|
||||||
SStreamTask* pTask = streamTaskNew(pStream->uid, level);
|
SStreamTask* pTask = streamTaskNew(pStream->uid);
|
||||||
pTask->pipeSource = 1;
|
pTask->level = level;
|
||||||
pTask->pipeSink = level == totLevel - 1 ? 1 : 0;
|
pTask->sourceType = 1;
|
||||||
|
pTask->sinkType = level == totLevel - 1 ? 1 : 0;
|
||||||
pTask->parallelizable = 1;
|
pTask->parallelizable = 1;
|
||||||
// TODO: set to
|
|
||||||
if (mndAssignTaskToVg(pMnode, pTrans, pTask, plan, pVgroup) < 0) {
|
if (mndAssignTaskToVg(pMnode, pTrans, pTask, plan, pVgroup) < 0) {
|
||||||
sdbRelease(pSdb, pVgroup);
|
sdbRelease(pSdb, pVgroup);
|
||||||
qDestroyQueryPlan(pPlan);
|
qDestroyQueryPlan(pPlan);
|
||||||
|
@ -146,13 +145,15 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) {
|
||||||
taosArrayPush(taskOneLevel, pTask);
|
taosArrayPush(taskOneLevel, pTask);
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
SStreamTask* pTask = streamTaskNew(pStream->uid, level);
|
SStreamTask* pTask = streamTaskNew(pStream->uid);
|
||||||
pTask->pipeSource = 0;
|
pTask->level = level;
|
||||||
pTask->pipeSink = level == totLevel - 1 ? 1 : 0;
|
pTask->sourceType = 0;
|
||||||
|
pTask->sinkType = level == totLevel - 1 ? 1 : 0;
|
||||||
pTask->parallelizable = plan->subplanType == SUBPLAN_TYPE_SCAN;
|
pTask->parallelizable = plan->subplanType == SUBPLAN_TYPE_SCAN;
|
||||||
pTask->nextOpDst = STREAM_NEXT_OP_DST__VND;
|
pTask->nextOpDst = STREAM_NEXT_OP_DST__VND;
|
||||||
|
|
||||||
if (tsStreamSchedV) {
|
SSnodeObj* pSnode = mndSchedFetchSnode(pMnode);
|
||||||
|
if (pSnode == NULL || tsStreamSchedV) {
|
||||||
ASSERT(lastUsedVgId != 0);
|
ASSERT(lastUsedVgId != 0);
|
||||||
SVgObj* pVg = mndAcquireVgroup(pMnode, lastUsedVgId);
|
SVgObj* pVg = mndAcquireVgroup(pMnode, lastUsedVgId);
|
||||||
if (mndAssignTaskToVg(pMnode, pTrans, pTask, plan, pVg) < 0) {
|
if (mndAssignTaskToVg(pMnode, pTrans, pTask, plan, pVg) < 0) {
|
||||||
|
@ -162,24 +163,19 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) {
|
||||||
}
|
}
|
||||||
sdbRelease(pSdb, pVg);
|
sdbRelease(pSdb, pVg);
|
||||||
} else {
|
} else {
|
||||||
SSnodeObj* pSnode = mndSchedFetchSnode(pMnode);
|
if (mndAssignTaskToSnode(pMnode, pTrans, pTask, plan, pSnode) < 0) {
|
||||||
if (pSnode != NULL) {
|
sdbRelease(pSdb, pSnode);
|
||||||
if (mndAssignTaskToSnode(pMnode, pTrans, pTask, plan, pSnode) < 0) {
|
qDestroyQueryPlan(pPlan);
|
||||||
sdbRelease(pSdb, pSnode);
|
return -1;
|
||||||
qDestroyQueryPlan(pPlan);
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
sdbRelease(pMnode->pSdb, pSnode);
|
|
||||||
} else {
|
|
||||||
// TODO: assign to one vg
|
|
||||||
ASSERT(0);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
sdbRelease(pMnode->pSdb, pSnode);
|
||||||
|
|
||||||
taosArrayPush(taskOneLevel, pTask);
|
taosArrayPush(taskOneLevel, pTask);
|
||||||
}
|
}
|
||||||
taosArrayPush(pStream->tasks, taskOneLevel);
|
taosArrayPush(pStream->tasks, taskOneLevel);
|
||||||
}
|
}
|
||||||
|
qDestroyQueryPlan(pPlan);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -271,6 +271,30 @@ static int32_t mndStbActionUpdate(SSdb *pSdb, SStbObj *pOld, SStbObj *pNew) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (pOld->numOfSmas < pNew->numOfSmas) {
|
||||||
|
void *pSmas = malloc(pNew->numOfSmas * sizeof(SSchema));
|
||||||
|
if (pSmas != NULL) {
|
||||||
|
free(pOld->pSmas);
|
||||||
|
pOld->pSmas = pSmas;
|
||||||
|
} else {
|
||||||
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
mTrace("stb:%s, failed to perform update action since %s", pOld->name, terrstr());
|
||||||
|
taosWUnLockLatch(&pOld->lock);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (pOld->commentLen < pNew->commentLen) {
|
||||||
|
void *comment = malloc(pNew->commentLen);
|
||||||
|
if (comment != NULL) {
|
||||||
|
free(pOld->comment);
|
||||||
|
pOld->comment = comment;
|
||||||
|
} else {
|
||||||
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
mTrace("stb:%s, failed to perform update action since %s", pOld->name, terrstr());
|
||||||
|
taosWUnLockLatch(&pOld->lock);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
pOld->updateTime = pNew->updateTime;
|
pOld->updateTime = pNew->updateTime;
|
||||||
pOld->version = pNew->version;
|
pOld->version = pNew->version;
|
||||||
pOld->nextColId = pNew->nextColId;
|
pOld->nextColId = pNew->nextColId;
|
||||||
|
@ -278,7 +302,9 @@ static int32_t mndStbActionUpdate(SSdb *pSdb, SStbObj *pOld, SStbObj *pNew) {
|
||||||
pOld->numOfTags = pNew->numOfTags;
|
pOld->numOfTags = pNew->numOfTags;
|
||||||
memcpy(pOld->pColumns, pNew->pColumns, pOld->numOfColumns * sizeof(SSchema));
|
memcpy(pOld->pColumns, pNew->pColumns, pOld->numOfColumns * sizeof(SSchema));
|
||||||
memcpy(pOld->pTags, pNew->pTags, pOld->numOfTags * sizeof(SSchema));
|
memcpy(pOld->pTags, pNew->pTags, pOld->numOfTags * sizeof(SSchema));
|
||||||
memcpy(pOld->comment, pNew->comment, TSDB_STB_COMMENT_LEN);
|
if (pNew->commentLen != 0) {
|
||||||
|
memcpy(pOld->comment, pNew->comment, TSDB_STB_COMMENT_LEN);
|
||||||
|
}
|
||||||
taosWUnLockLatch(&pOld->lock);
|
taosWUnLockLatch(&pOld->lock);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
|
@ -272,6 +272,7 @@ int32_t mndAddStreamToTrans(SMnode *pMnode, SStreamObj *pStream, const char *ast
|
||||||
if (nodesStringToNode(ast, &pAst) < 0) {
|
if (nodesStringToNode(ast, &pAst) < 0) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
#if 1
|
||||||
SArray *names = mndExtractNamesFromAst(pAst);
|
SArray *names = mndExtractNamesFromAst(pAst);
|
||||||
printf("|");
|
printf("|");
|
||||||
for (int i = 0; i < taosArrayGetSize(names); i++) {
|
for (int i = 0; i < taosArrayGetSize(names); i++) {
|
||||||
|
@ -279,7 +280,8 @@ int32_t mndAddStreamToTrans(SMnode *pMnode, SStreamObj *pStream, const char *ast
|
||||||
}
|
}
|
||||||
printf("\n=======================================================\n");
|
printf("\n=======================================================\n");
|
||||||
|
|
||||||
pStream->outputName = names;
|
pStream->ColAlias = names;
|
||||||
|
#endif
|
||||||
|
|
||||||
if (TSDB_CODE_SUCCESS != mndStreamGetPlanString(ast, &pStream->physicalPlan)) {
|
if (TSDB_CODE_SUCCESS != mndStreamGetPlanString(ast, &pStream->physicalPlan)) {
|
||||||
mError("topic:%s, failed to get plan since %s", pStream->name, terrstr());
|
mError("topic:%s, failed to get plan since %s", pStream->name, terrstr());
|
||||||
|
@ -290,6 +292,7 @@ int32_t mndAddStreamToTrans(SMnode *pMnode, SStreamObj *pStream, const char *ast
|
||||||
mError("stream:%ld, schedule stream since %s", pStream->uid, terrstr());
|
mError("stream:%ld, schedule stream since %s", pStream->uid, terrstr());
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
mDebug("trans:%d, used to create stream:%s", pTrans->id, pStream->name);
|
||||||
|
|
||||||
SSdbRaw *pRedoRaw = mndStreamActionEncode(pStream);
|
SSdbRaw *pRedoRaw = mndStreamActionEncode(pStream);
|
||||||
if (pRedoRaw == NULL || mndTransAppendRedolog(pTrans, pRedoRaw) != 0) {
|
if (pRedoRaw == NULL || mndTransAppendRedolog(pTrans, pRedoRaw) != 0) {
|
||||||
|
@ -307,6 +310,7 @@ static int32_t mndCreateStream(SMnode *pMnode, SNodeMsg *pReq, SCMCreateStreamRe
|
||||||
SStreamObj streamObj = {0};
|
SStreamObj streamObj = {0};
|
||||||
tstrncpy(streamObj.name, pCreate->name, TSDB_STREAM_FNAME_LEN);
|
tstrncpy(streamObj.name, pCreate->name, TSDB_STREAM_FNAME_LEN);
|
||||||
tstrncpy(streamObj.db, pDb->name, TSDB_DB_FNAME_LEN);
|
tstrncpy(streamObj.db, pDb->name, TSDB_DB_FNAME_LEN);
|
||||||
|
tstrncpy(streamObj.outputSTbName, pCreate->outputSTbName, TSDB_TABLE_FNAME_LEN);
|
||||||
streamObj.createTime = taosGetTimestampMs();
|
streamObj.createTime = taosGetTimestampMs();
|
||||||
streamObj.updateTime = streamObj.createTime;
|
streamObj.updateTime = streamObj.createTime;
|
||||||
streamObj.uid = mndGenerateUid(pCreate->name, strlen(pCreate->name));
|
streamObj.uid = mndGenerateUid(pCreate->name, strlen(pCreate->name));
|
||||||
|
|
|
@ -358,9 +358,7 @@ int32_t mndAlter(SMnode *pMnode, const SMnodeOpt *pOption) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t mndStart(SMnode *pMnode) {
|
int32_t mndStart(SMnode *pMnode) { return mndInitTimer(pMnode); }
|
||||||
return mndInitTimer(pMnode);
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t mndProcessMsg(SNodeMsg *pMsg) {
|
int32_t mndProcessMsg(SNodeMsg *pMsg) {
|
||||||
SMnode *pMnode = pMsg->pNode;
|
SMnode *pMnode = pMsg->pNode;
|
||||||
|
@ -415,11 +413,11 @@ int64_t mndGenerateUid(char *name, int32_t len) {
|
||||||
int32_t hashval = MurmurHash3_32(name, len);
|
int32_t hashval = MurmurHash3_32(name, len);
|
||||||
|
|
||||||
do {
|
do {
|
||||||
int64_t us = taosGetTimestampUs();
|
int64_t us = taosGetTimestampUs();
|
||||||
int64_t x = (us & 0x000000FFFFFFFFFF) << 24;
|
int64_t x = (us & 0x000000FFFFFFFFFF) << 24;
|
||||||
int64_t uuid = x + ((hashval & ((1ul << 16) - 1ul)) << 8) + (taosRand() & ((1ul << 8) - 1ul));
|
int64_t uuid = x + ((hashval & ((1ul << 16) - 1ul)) << 8) + (taosRand() & ((1ul << 8) - 1ul));
|
||||||
if (uuid) {
|
if (uuid) {
|
||||||
return abs(uuid);
|
return llabs(uuid);
|
||||||
}
|
}
|
||||||
} while (true);
|
} while (true);
|
||||||
}
|
}
|
||||||
|
|
|
@ -161,15 +161,16 @@ typedef struct {
|
||||||
struct STQ {
|
struct STQ {
|
||||||
// the collection of groups
|
// the collection of groups
|
||||||
// the handle of meta kvstore
|
// the handle of meta kvstore
|
||||||
|
bool writeTrigger;
|
||||||
char* path;
|
char* path;
|
||||||
STqCfg* tqConfig;
|
STqCfg* tqConfig;
|
||||||
STqMemRef tqMemRef;
|
STqMemRef tqMemRef;
|
||||||
STqMetaStore* tqMeta;
|
STqMetaStore* tqMeta;
|
||||||
STqPushMgr* tqPushMgr;
|
// STqPushMgr* tqPushMgr;
|
||||||
SHashObj* pStreamTasks;
|
SHashObj* pStreamTasks;
|
||||||
SVnode* pVnode;
|
SVnode* pVnode;
|
||||||
SWal* pWal;
|
SWal* pWal;
|
||||||
SMeta* pVnodeMeta;
|
SMeta* pVnodeMeta;
|
||||||
};
|
};
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
|
|
|
@ -55,6 +55,21 @@ typedef struct SVnodeMgr {
|
||||||
TD_DLIST(SVnodeTask) queue;
|
TD_DLIST(SVnodeTask) queue;
|
||||||
} SVnodeMgr;
|
} SVnodeMgr;
|
||||||
|
|
||||||
|
typedef struct {
|
||||||
|
int8_t streamType; // sma or other
|
||||||
|
int8_t dstType;
|
||||||
|
int16_t padding;
|
||||||
|
int32_t smaId;
|
||||||
|
int64_t tbUid;
|
||||||
|
int64_t lastReceivedVer;
|
||||||
|
int64_t lastCommittedVer;
|
||||||
|
} SStreamSinkInfo;
|
||||||
|
|
||||||
|
typedef struct {
|
||||||
|
SVnode* pVnode;
|
||||||
|
SHashObj* pHash; // streamId -> SStreamSinkInfo
|
||||||
|
} SSink;
|
||||||
|
|
||||||
extern SVnodeMgr vnodeMgr;
|
extern SVnodeMgr vnodeMgr;
|
||||||
|
|
||||||
// SVState
|
// SVState
|
||||||
|
@ -72,8 +87,9 @@ struct SVnode {
|
||||||
SVBufPool* pBufPool;
|
SVBufPool* pBufPool;
|
||||||
SMeta* pMeta;
|
SMeta* pMeta;
|
||||||
STsdb* pTsdb;
|
STsdb* pTsdb;
|
||||||
STQ* pTq;
|
|
||||||
SWal* pWal;
|
SWal* pWal;
|
||||||
|
STQ* pTq;
|
||||||
|
SSink* pSink;
|
||||||
tsem_t canCommit;
|
tsem_t canCommit;
|
||||||
SQHandle* pQuery;
|
SQHandle* pQuery;
|
||||||
SMsgCb msgCb;
|
SMsgCb msgCb;
|
||||||
|
|
|
@ -52,12 +52,14 @@ STQ* tqOpen(const char* path, SVnode* pVnode, SWal* pWal, SMeta* pVnodeMeta, STq
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#if 0
|
||||||
pTq->tqPushMgr = tqPushMgrOpen();
|
pTq->tqPushMgr = tqPushMgrOpen();
|
||||||
if (pTq->tqPushMgr == NULL) {
|
if (pTq->tqPushMgr == NULL) {
|
||||||
// free store
|
// free store
|
||||||
free(pTq);
|
free(pTq);
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
#endif
|
||||||
|
|
||||||
pTq->pStreamTasks = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
|
pTq->pStreamTasks = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
|
||||||
|
|
||||||
|
@ -559,7 +561,7 @@ int32_t tqProcessStreamTrigger(STQ* pTq, void* data, int32_t dataLen) {
|
||||||
pIter = taosHashIterate(pTq->pStreamTasks, pIter);
|
pIter = taosHashIterate(pTq->pStreamTasks, pIter);
|
||||||
if (pIter == NULL) break;
|
if (pIter == NULL) break;
|
||||||
SStreamTask* pTask = (SStreamTask*)pIter;
|
SStreamTask* pTask = (SStreamTask*)pIter;
|
||||||
if (!pTask->pipeSource) continue;
|
if (!pTask->sourceType) continue;
|
||||||
|
|
||||||
int32_t workerId = 0;
|
int32_t workerId = 0;
|
||||||
void* exec = pTask->runner[workerId].executor;
|
void* exec = pTask->runner[workerId].executor;
|
||||||
|
@ -576,7 +578,7 @@ int32_t tqProcessStreamTrigger(STQ* pTq, void* data, int32_t dataLen) {
|
||||||
}
|
}
|
||||||
taosArrayPush(pRes, output);
|
taosArrayPush(pRes, output);
|
||||||
}
|
}
|
||||||
if (pTask->pipeSink) {
|
if (pTask->sinkType) {
|
||||||
// write back
|
// write back
|
||||||
/*printf("reach end\n");*/
|
/*printf("reach end\n");*/
|
||||||
tqDebugShowSSData(pRes);
|
tqDebugShowSSData(pRes);
|
||||||
|
|
Loading…
Reference in New Issue