Merge branch '3.0' into feature/TD-13066-3.0
This commit is contained in:
commit
2a98a719e3
|
@ -32,6 +32,11 @@ static void msg_process(TAOS_RES* msg) {
|
|||
int32_t numOfFields = taos_field_count(msg);
|
||||
taos_print_row(buf, row, fields, numOfFields);
|
||||
printf("%s\n", buf);
|
||||
|
||||
const char* tbName = tmq_get_table_name(msg);
|
||||
if (tbName) {
|
||||
printf("from tb: %s\n", tbName);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -101,8 +106,8 @@ int32_t create_topic() {
|
|||
}
|
||||
taos_free_result(pRes);
|
||||
|
||||
pRes = taos_query(pConn, "create topic topic_ctb_column as abc1");
|
||||
/*pRes = taos_query(pConn, "create topic topic_ctb_column as select ts, c1, c2, c3 from ct1");*/
|
||||
/*pRes = taos_query(pConn, "create topic topic_ctb_column as abc1");*/
|
||||
pRes = taos_query(pConn, "create topic topic_ctb_column as select ts, c1, c2, c3 from st1");
|
||||
if (taos_errno(pRes) != 0) {
|
||||
printf("failed to create topic topic_ctb_column, reason:%s\n", taos_errstr(pRes));
|
||||
return -1;
|
||||
|
@ -161,6 +166,7 @@ tmq_t* build_consumer() {
|
|||
tmq_conf_set(conf, "td.connect.user", "root");
|
||||
tmq_conf_set(conf, "td.connect.pass", "taosdata");
|
||||
/*tmq_conf_set(conf, "td.connect.db", "abc1");*/
|
||||
tmq_conf_set(conf, "msg.with.table.name", "true");
|
||||
tmq_conf_set_offset_commit_cb(conf, tmq_commit_cb_print, NULL);
|
||||
tmq_t* tmq = tmq_consumer_new(conf, NULL, 0);
|
||||
assert(tmq);
|
||||
|
|
|
@ -257,10 +257,7 @@ DLL_EXPORT void tmq_conf_set_offset_commit_cb(tmq_conf_t *conf, tmq_co
|
|||
|
||||
DLL_EXPORT const char *tmq_get_topic_name(TAOS_RES *res);
|
||||
DLL_EXPORT int32_t tmq_get_vgroup_id(TAOS_RES *res);
|
||||
// TODO
|
||||
#if 0
|
||||
DLL_EXPORT char *tmq_get_table_name(TAOS_RES *res);
|
||||
#endif
|
||||
DLL_EXPORT const char *tmq_get_table_name(TAOS_RES *res);
|
||||
|
||||
#if 0
|
||||
DLL_EXPORT int64_t tmq_get_request_offset(tmq_message_t *message);
|
||||
|
|
|
@ -252,6 +252,7 @@ STSRow* tGetSubmitBlkNext(SSubmitBlkIter* pIter);
|
|||
int32_t tPrintFixedSchemaSubmitReq(const SSubmitReq* pReq, STSchema* pSchema);
|
||||
|
||||
typedef struct {
|
||||
int32_t code;
|
||||
int8_t hashMeta;
|
||||
int64_t uid;
|
||||
char* tblFName;
|
||||
|
@ -271,7 +272,7 @@ typedef struct {
|
|||
|
||||
int32_t tEncodeSSubmitRsp(SEncoder* pEncoder, const SSubmitRsp* pRsp);
|
||||
int32_t tDecodeSSubmitRsp(SDecoder* pDecoder, SSubmitRsp* pRsp);
|
||||
void tFreeSSubmitRsp(SSubmitRsp *pRsp);
|
||||
void tFreeSSubmitRsp(SSubmitRsp* pRsp);
|
||||
|
||||
#define COL_SMA_ON ((int8_t)0x1)
|
||||
#define COL_IDX_ON ((int8_t)0x2)
|
||||
|
@ -2380,6 +2381,7 @@ typedef struct {
|
|||
typedef struct {
|
||||
SMsgHead head;
|
||||
char subKey[TSDB_SUBSCRIBE_KEY_LEN];
|
||||
int8_t withTbName;
|
||||
int32_t epoch;
|
||||
uint64_t reqId;
|
||||
int64_t consumerId;
|
||||
|
@ -2489,6 +2491,10 @@ static FORCE_INLINE int32_t tEncodeSMqDataBlkRsp(void** buf, const SMqDataBlkRsp
|
|||
SSchemaWrapper* pSW = (SSchemaWrapper*)taosArrayGetP(pRsp->blockSchema, i);
|
||||
tlen += taosEncodeSSchemaWrapper(buf, pSW);
|
||||
}
|
||||
if (pRsp->withTbName) {
|
||||
char* tbName = (char*)taosArrayGetP(pRsp->blockTbName, i);
|
||||
tlen += taosEncodeString(buf, tbName);
|
||||
}
|
||||
}
|
||||
}
|
||||
return tlen;
|
||||
|
@ -2501,6 +2507,7 @@ static FORCE_INLINE void* tDecodeSMqDataBlkRsp(const void* buf, SMqDataBlkRsp* p
|
|||
buf = taosDecodeFixedI32(buf, &pRsp->blockNum);
|
||||
pRsp->blockData = taosArrayInit(pRsp->blockNum, sizeof(void*));
|
||||
pRsp->blockDataLen = taosArrayInit(pRsp->blockNum, sizeof(void*));
|
||||
pRsp->blockTbName = taosArrayInit(pRsp->blockNum, sizeof(void*));
|
||||
pRsp->blockSchema = taosArrayInit(pRsp->blockNum, sizeof(void*));
|
||||
if (pRsp->blockNum != 0) {
|
||||
buf = taosDecodeFixedI8(buf, &pRsp->withTbName);
|
||||
|
@ -2519,6 +2526,11 @@ static FORCE_INLINE void* tDecodeSMqDataBlkRsp(const void* buf, SMqDataBlkRsp* p
|
|||
buf = taosDecodeSSchemaWrapper(buf, pSW);
|
||||
taosArrayPush(pRsp->blockSchema, &pSW);
|
||||
}
|
||||
if (pRsp->withTbName) {
|
||||
char* name = NULL;
|
||||
buf = taosDecodeString(buf, &name);
|
||||
taosArrayPush(pRsp->blockTbName, &name);
|
||||
}
|
||||
}
|
||||
}
|
||||
return (void*)buf;
|
||||
|
|
|
@ -217,6 +217,7 @@ enum {
|
|||
TD_DEF_MSG_TYPE(TDMT_VND_SYNC_UNKNOWN, "vnode-sync-unknown", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_VND_SYNC_COMMON_RESPONSE, "vnode-sync-common-response", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_VND_SYNC_APPLY_MSG, "vnode-sync-apply-msg", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_VND_SYNC_CONFIG_CHANGE, "vnode-sync-config-change", NULL, NULL)
|
||||
|
||||
TD_DEF_MSG_TYPE(TDMT_VND_SYNC_VNODE, "vnode-sync-vnode", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_VND_ALTER_VNODE, "vnode-alter-vnode", NULL, NULL)
|
||||
|
|
|
@ -39,15 +39,6 @@ extern "C" {
|
|||
//======================================================================================
|
||||
//begin API to taosd and qworker
|
||||
|
||||
enum {
|
||||
UDFC_CODE_STOPPING = -1,
|
||||
UDFC_CODE_PIPE_READ_ERR = -2,
|
||||
UDFC_CODE_CONNECT_PIPE_ERR = -3,
|
||||
UDFC_CODE_LOAD_UDF_FAILURE = -4,
|
||||
UDFC_CODE_INVALID_STATE = -5,
|
||||
UDFC_CODE_NO_PIPE = -6,
|
||||
};
|
||||
|
||||
typedef void *UdfcFuncHandle;
|
||||
|
||||
/**
|
||||
|
@ -89,6 +80,7 @@ typedef struct SUdfColumnData {
|
|||
|
||||
typedef struct SUdfColumn {
|
||||
SUdfColumnMeta colMeta;
|
||||
bool hasNull;
|
||||
SUdfColumnData colData;
|
||||
} SUdfColumn;
|
||||
|
||||
|
@ -232,6 +224,7 @@ static FORCE_INLINE void udfColDataSetNull(SUdfColumn* pColumn, int32_t row) {
|
|||
} else {
|
||||
udfColDataSetNull_f(pColumn, row);
|
||||
}
|
||||
pColumn->hasNull = true;
|
||||
}
|
||||
|
||||
static FORCE_INLINE int32_t udfColDataSet(SUdfColumn* pColumn, uint32_t currentRow, const char* pData, bool isNull) {
|
||||
|
|
|
@ -645,7 +645,15 @@ int32_t* taosGetErrno();
|
|||
#define TSDB_CODE_FUNC_FUNTION_PARA_NUM TAOS_DEF_ERROR_CODE(0, 0x2801)
|
||||
#define TSDB_CODE_FUNC_FUNTION_PARA_TYPE TAOS_DEF_ERROR_CODE(0, 0x2802)
|
||||
#define TSDB_CODE_FUNC_FUNTION_PARA_VALUE TAOS_DEF_ERROR_CODE(0, 0x2803)
|
||||
#define TSDB_CODE_FUNC_INVALID_FUNTION TAOS_DEF_ERROR_CODE(0, 0x2604)
|
||||
#define TSDB_CODE_FUNC_INVALID_FUNTION TAOS_DEF_ERROR_CODE(0, 0x2804)
|
||||
|
||||
//udf
|
||||
#define TSDB_CODE_UDF_STOPPING TAOS_DEF_ERROR_CODE(0, 0x2901)
|
||||
#define TSDB_CODE_UDF_PIPE_READ_ERR TAOS_DEF_ERROR_CODE(0, 0x2902)
|
||||
#define TSDB_CODE_UDF_PIPE_CONNECT_ERR TAOS_DEF_ERROR_CODE(0, 0x2903)
|
||||
#define TSDB_CODE_UDF_PIPE_NO_PIPE TAOS_DEF_ERROR_CODE(0, 0x2904)
|
||||
#define TSDB_CODE_UDF_LOAD_UDF_FAILURE TAOS_DEF_ERROR_CODE(0, 0x2905)
|
||||
#define TSDB_CODE_UDF_INVALID_STATE TAOS_DEF_ERROR_CODE(0, 0x2906)
|
||||
|
||||
#define TSDB_CODE_SML_INVALID_PROTOCOL_TYPE TAOS_DEF_ERROR_CODE(0, 0x3000)
|
||||
#define TSDB_CODE_SML_INVALID_PRECISION_TYPE TAOS_DEF_ERROR_CODE(0, 0x3001)
|
||||
|
|
|
@ -59,6 +59,21 @@ static FORCE_INLINE void *taosDecodeFixedI8(const void *buf, int8_t *value) {
|
|||
|
||||
static FORCE_INLINE void *taosSkipFixedLen(const void *buf, size_t len) { return POINTER_SHIFT(buf, len); }
|
||||
|
||||
// --- Bool
|
||||
|
||||
static FORCE_INLINE int32_t taosEncodeFixedBool(void **buf, bool value) {
|
||||
if (buf != NULL) {
|
||||
((int8_t *)(*buf))[0] = value ? 1 : 0;
|
||||
*buf = POINTER_SHIFT(*buf, sizeof(int8_t));
|
||||
}
|
||||
return (int32_t)sizeof(int8_t);
|
||||
}
|
||||
|
||||
static FORCE_INLINE void *taosDecodeFixedBool(const void *buf, bool *value) {
|
||||
*value = ((int8_t *)buf)[0] == 0 ? false : true;
|
||||
return POINTER_SHIFT(buf, sizeof(int8_t));
|
||||
}
|
||||
|
||||
// ---- Fixed U16
|
||||
static FORCE_INLINE int32_t taosEncodeFixedU16(void **buf, uint16_t value) {
|
||||
if (buf != NULL) {
|
||||
|
|
|
@ -57,16 +57,17 @@ struct tmq_topic_vgroup_list_t {
|
|||
};
|
||||
|
||||
struct tmq_conf_t {
|
||||
char clientId[256];
|
||||
char groupId[TSDB_CGROUP_LEN];
|
||||
int8_t autoCommit;
|
||||
int8_t resetOffset;
|
||||
uint16_t port;
|
||||
int32_t autoCommitInterval;
|
||||
char* ip;
|
||||
char* user;
|
||||
char* pass;
|
||||
char* db;
|
||||
char clientId[256];
|
||||
char groupId[TSDB_CGROUP_LEN];
|
||||
int8_t autoCommit;
|
||||
int8_t resetOffset;
|
||||
int8_t withTbName;
|
||||
uint16_t port;
|
||||
int32_t autoCommitInterval;
|
||||
char* ip;
|
||||
char* user;
|
||||
char* pass;
|
||||
/*char* db;*/
|
||||
tmq_commit_cb* commitCb;
|
||||
void* commitCbUserParam;
|
||||
};
|
||||
|
@ -75,6 +76,7 @@ struct tmq_t {
|
|||
// conf
|
||||
char groupId[TSDB_CGROUP_LEN];
|
||||
char clientId[256];
|
||||
int8_t withTbName;
|
||||
int8_t autoCommit;
|
||||
int32_t autoCommitInterval;
|
||||
int32_t resetOffsetCfg;
|
||||
|
@ -187,6 +189,7 @@ typedef struct {
|
|||
|
||||
tmq_conf_t* tmq_conf_new() {
|
||||
tmq_conf_t* conf = taosMemoryCalloc(1, sizeof(tmq_conf_t));
|
||||
conf->withTbName = -1;
|
||||
conf->autoCommit = true;
|
||||
conf->autoCommitInterval = 5000;
|
||||
conf->resetOffset = TMQ_CONF__RESET_OFFSET__EARLIEAST;
|
||||
|
@ -240,6 +243,18 @@ tmq_conf_res_t tmq_conf_set(tmq_conf_t* conf, const char* key, const char* value
|
|||
}
|
||||
}
|
||||
|
||||
if (strcmp(key, "msg.with.table.name") == 0) {
|
||||
if (strcmp(value, "true") == 0) {
|
||||
conf->withTbName = 1;
|
||||
} else if (strcmp(value, "false") == 0) {
|
||||
conf->withTbName = 0;
|
||||
} else if (strcmp(value, "none") == 0) {
|
||||
conf->withTbName = -1;
|
||||
} else {
|
||||
return TMQ_CONF_INVALID;
|
||||
}
|
||||
}
|
||||
|
||||
if (strcmp(key, "td.connect.ip") == 0) {
|
||||
conf->ip = strdup(value);
|
||||
return TMQ_CONF_OK;
|
||||
|
@ -257,7 +272,7 @@ tmq_conf_res_t tmq_conf_set(tmq_conf_t* conf, const char* key, const char* value
|
|||
return TMQ_CONF_OK;
|
||||
}
|
||||
if (strcmp(key, "td.connect.db") == 0) {
|
||||
conf->db = strdup(value);
|
||||
/*conf->db = strdup(value);*/
|
||||
return TMQ_CONF_OK;
|
||||
}
|
||||
|
||||
|
@ -485,6 +500,7 @@ tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) {
|
|||
// set conf
|
||||
strcpy(pTmq->clientId, conf->clientId);
|
||||
strcpy(pTmq->groupId, conf->groupId);
|
||||
pTmq->withTbName = conf->withTbName;
|
||||
pTmq->autoCommit = conf->autoCommit;
|
||||
pTmq->autoCommitInterval = conf->autoCommitInterval;
|
||||
pTmq->commitCb = conf->commitCb;
|
||||
|
@ -1104,6 +1120,7 @@ SMqPollReq* tmqBuildConsumeReqImpl(tmq_t* tmq, int64_t waitTime, SMqClientTopic*
|
|||
pReq->subKey[tlen] = TMQ_SEPARATOR;
|
||||
strcpy(pReq->subKey + tlen + 1, pTopic->topicName);
|
||||
|
||||
pReq->withTbName = tmq->withTbName;
|
||||
pReq->waitTime = waitTime;
|
||||
pReq->consumerId = tmq->consumerId;
|
||||
pReq->epoch = tmq->epoch;
|
||||
|
@ -1346,3 +1363,16 @@ int32_t tmq_get_vgroup_id(TAOS_RES* res) {
|
|||
return -1;
|
||||
}
|
||||
}
|
||||
|
||||
const char* tmq_get_table_name(TAOS_RES* res) {
|
||||
if (TD_RES_TMQ(res)) {
|
||||
SMqRspObj* pRspObj = (SMqRspObj*)res;
|
||||
if (!pRspObj->rsp.withTbName || pRspObj->rsp.blockTbName == NULL || pRspObj->resIter < 0 ||
|
||||
pRspObj->resIter >= pRspObj->rsp.blockNum) {
|
||||
return NULL;
|
||||
}
|
||||
const char* name = taosArrayGetP(pRspObj->rsp.blockTbName, pRspObj->resIter);
|
||||
return name;
|
||||
}
|
||||
return NULL;
|
||||
}
|
||||
|
|
|
@ -1311,6 +1311,7 @@ int32_t tEncodeDataBlock(void** buf, const SSDataBlock* pBlock) {
|
|||
tlen += taosEncodeFixedI16(buf, pColData->info.colId);
|
||||
tlen += taosEncodeFixedI16(buf, pColData->info.type);
|
||||
tlen += taosEncodeFixedI32(buf, pColData->info.bytes);
|
||||
tlen += taosEncodeFixedBool(buf, pColData->hasNull);
|
||||
|
||||
if (IS_VAR_DATA_TYPE(pColData->info.type)) {
|
||||
tlen += taosEncodeBinary(buf, pColData->varmeta.offset, sizeof(int32_t) * rows);
|
||||
|
@ -1340,6 +1341,7 @@ void* tDecodeDataBlock(const void* buf, SSDataBlock* pBlock) {
|
|||
buf = taosDecodeFixedI16(buf, &data.info.colId);
|
||||
buf = taosDecodeFixedI16(buf, &data.info.type);
|
||||
buf = taosDecodeFixedI32(buf, &data.info.bytes);
|
||||
buf = taosDecodeFixedBool(buf, &data.hasNull);
|
||||
|
||||
if (IS_VAR_DATA_TYPE(data.info.type)) {
|
||||
buf = taosDecodeBinary(buf, (void**)&data.varmeta.offset, pBlock->info.rows * sizeof(int32_t));
|
||||
|
|
|
@ -4032,6 +4032,7 @@ int32_t tDecodeSVSubmitReq(SDecoder *pCoder, SVSubmitReq *pReq) {
|
|||
static int32_t tEncodeSSubmitBlkRsp(SEncoder *pEncoder, const SSubmitBlkRsp *pBlock) {
|
||||
if (tStartEncode(pEncoder) < 0) return -1;
|
||||
|
||||
if (tEncodeI32(pEncoder, pBlock->code) < 0) return -1;
|
||||
if (tEncodeI8(pEncoder, pBlock->hashMeta) < 0) return -1;
|
||||
if (pBlock->hashMeta) {
|
||||
if (tEncodeI64(pEncoder, pBlock->uid) < 0) return -1;
|
||||
|
@ -4047,10 +4048,11 @@ static int32_t tEncodeSSubmitBlkRsp(SEncoder *pEncoder, const SSubmitBlkRsp *pBl
|
|||
static int32_t tDecodeSSubmitBlkRsp(SDecoder *pDecoder, SSubmitBlkRsp *pBlock) {
|
||||
if (tStartDecode(pDecoder) < 0) return -1;
|
||||
|
||||
if (tDecodeI32(pDecoder, &pBlock->code) < 0) return -1;
|
||||
if (tDecodeI8(pDecoder, &pBlock->hashMeta) < 0) return -1;
|
||||
if (pBlock->hashMeta) {
|
||||
if (tDecodeI64(pDecoder, &pBlock->uid) < 0) return -1;
|
||||
pBlock->tblFName= taosMemoryCalloc(TSDB_TABLE_FNAME_LEN, 1);
|
||||
pBlock->tblFName = taosMemoryCalloc(TSDB_TABLE_FNAME_LEN, 1);
|
||||
if (NULL == pBlock->tblFName) return -1;
|
||||
if (tDecodeCStrTo(pDecoder, pBlock->tblFName) < 0) return -1;
|
||||
}
|
||||
|
@ -4089,7 +4091,7 @@ int32_t tDecodeSSubmitRsp(SDecoder *pDecoder, SSubmitRsp *pRsp) {
|
|||
if (tDecodeSSubmitBlkRsp(pDecoder, pRsp->pBlocks + iBlock) < 0) return -1;
|
||||
}
|
||||
|
||||
tEndDecode(pDecoder);
|
||||
tEndDecode(pDecoder);
|
||||
tDecoderClear(pDecoder);
|
||||
return 0;
|
||||
}
|
||||
|
@ -4108,4 +4110,3 @@ void tFreeSSubmitRsp(SSubmitRsp *pRsp) {
|
|||
|
||||
taosMemoryFree(pRsp);
|
||||
}
|
||||
|
||||
|
|
|
@ -318,6 +318,7 @@ static int32_t mndStbActionUpdate(SSdb *pSdb, SStbObj *pOld, SStbObj *pNew) {
|
|||
pOld->updateTime = pNew->updateTime;
|
||||
pOld->version = pNew->version;
|
||||
pOld->nextColId = pNew->nextColId;
|
||||
pOld->ttl = pNew->ttl;
|
||||
pOld->numOfColumns = pNew->numOfColumns;
|
||||
pOld->numOfTags = pNew->numOfTags;
|
||||
memcpy(pOld->pColumns, pNew->pColumns, pOld->numOfColumns * sizeof(SSchema));
|
||||
|
@ -832,7 +833,7 @@ static int32_t mndProcessVCreateStbRsp(SNodeMsg *pRsp) {
|
|||
}
|
||||
|
||||
static int32_t mndCheckAlterStbReq(SMAlterStbReq *pAlter) {
|
||||
if (pAlter->commentLen != 0) return 0;
|
||||
if (pAlter->commentLen != 0 || pAlter->ttl != 0) return 0;
|
||||
|
||||
if (pAlter->numOfFields < 1 || pAlter->numOfFields != (int32_t)taosArrayGetSize(pAlter->pFields)) {
|
||||
terrno = TSDB_CODE_MND_INVALID_STB_OPTION;
|
||||
|
@ -883,7 +884,8 @@ static int32_t mndAllocStbSchemas(const SStbObj *pOld, SStbObj *pNew) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
static int32_t mndUpdateStbComment(const SStbObj *pOld, SStbObj *pNew, char *pComment, int32_t commentLen) {
|
||||
static int32_t mndUpdateStbCommentAndTTL(const SStbObj *pOld, SStbObj *pNew, char *pComment, int32_t commentLen,
|
||||
int32_t ttl) {
|
||||
if (commentLen > 0) {
|
||||
pNew->commentLen = commentLen;
|
||||
pNew->comment = taosMemoryCalloc(1, commentLen);
|
||||
|
@ -893,6 +895,9 @@ static int32_t mndUpdateStbComment(const SStbObj *pOld, SStbObj *pNew, char *pCo
|
|||
}
|
||||
memcpy(pNew->comment, pComment, commentLen);
|
||||
}
|
||||
if (ttl >= 0) {
|
||||
pNew->ttl = ttl;
|
||||
}
|
||||
|
||||
if (mndAllocStbSchemas(pOld, pNew) != 0) {
|
||||
return -1;
|
||||
|
@ -1232,7 +1237,7 @@ static int32_t mndAlterStb(SMnode *pMnode, SNodeMsg *pReq, const SMAlterStbReq *
|
|||
code = mndAlterStbColumnBytes(pOld, &stbObj, pField0);
|
||||
break;
|
||||
case TSDB_ALTER_TABLE_UPDATE_OPTIONS:
|
||||
code = mndUpdateStbComment(pOld, &stbObj, pAlter->comment, pAlter->commentLen);
|
||||
code = mndUpdateStbCommentAndTTL(pOld, &stbObj, pAlter->comment, pAlter->commentLen, pAlter->ttl);
|
||||
break;
|
||||
default:
|
||||
terrno = TSDB_CODE_OPS_NOT_SUPPORT;
|
||||
|
@ -1723,7 +1728,7 @@ static int32_t mndRetrieveStb(SNodeMsg *pReq, SShowObj *pShow, SSDataBlock *pBlo
|
|||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||
colDataAppend(pColInfo, numOfRows, (const char *)&pStb->updateTime, false); // number of tables
|
||||
|
||||
char *p = taosMemoryMalloc(pStb->commentLen + VARSTR_HEADER_SIZE); // check malloc failures
|
||||
char *p = taosMemoryCalloc(1, pStb->commentLen + 1 + VARSTR_HEADER_SIZE); // check malloc failures
|
||||
if (p != NULL) {
|
||||
if (pStb->commentLen != 0) {
|
||||
STR_TO_VARSTR(p, pStb->comment);
|
||||
|
|
|
@ -297,8 +297,8 @@ static int32_t mndCreateTopic(SMnode *pMnode, SNodeMsg *pReq, SCMCreateTopicReq
|
|||
topicObj.ast = strdup(pCreate->ast);
|
||||
topicObj.astLen = strlen(pCreate->ast) + 1;
|
||||
topicObj.subType = TOPIC_SUB_TYPE__TABLE;
|
||||
topicObj.withTbName = 0;
|
||||
topicObj.withSchema = 0;
|
||||
topicObj.withTbName = pCreate->withTbName;
|
||||
topicObj.withSchema = pCreate->withSchema;
|
||||
|
||||
SNode *pAst = NULL;
|
||||
if (nodesStringToNode(pCreate->ast, &pAst) != 0) {
|
||||
|
|
|
@ -116,10 +116,10 @@ int64_t metaSmaCursorNext(SMSmaCursor* pSmaCur);
|
|||
// SMetaDB
|
||||
int metaOpenDB(SMeta* pMeta);
|
||||
void metaCloseDB(SMeta* pMeta);
|
||||
int metaSaveTableToDB(SMeta* pMeta, STbCfg* pTbCfg, STbDdlH* pHandle);
|
||||
int metaRemoveTableFromDb(SMeta* pMeta, tb_uid_t uid);
|
||||
int metaSaveSmaToDB(SMeta* pMeta, STSma* pTbCfg);
|
||||
int metaRemoveSmaFromDb(SMeta* pMeta, int64_t indexUid);
|
||||
// int metaSaveTableToDB(SMeta* pMeta, STbCfg* pTbCfg, STbDdlH* pHandle);
|
||||
int metaRemoveTableFromDb(SMeta* pMeta, tb_uid_t uid);
|
||||
int metaSaveSmaToDB(SMeta* pMeta, STSma* pTbCfg);
|
||||
int metaRemoveSmaFromDb(SMeta* pMeta, int64_t indexUid);
|
||||
#endif
|
||||
|
||||
#endif
|
||||
|
@ -128,4 +128,4 @@ int metaRemoveSmaFromDb(SMeta* pMeta, int64_t indexUid);
|
|||
}
|
||||
#endif
|
||||
|
||||
#endif /*_TD_VNODE_META_H_*/
|
||||
#endif /*_TD_VNODE_META_H_*/
|
||||
|
|
|
@ -158,7 +158,9 @@ SSchemaWrapper *metaGetTableSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver, boo
|
|||
skmDbKey.sver = sver;
|
||||
pKey = &skmDbKey;
|
||||
kLen = sizeof(skmDbKey);
|
||||
metaRLock(pMeta);
|
||||
ret = tdbDbGet(pMeta->pSkmDb, pKey, kLen, &pVal, &vLen);
|
||||
metaULock(pMeta);
|
||||
if (ret < 0) {
|
||||
return NULL;
|
||||
}
|
||||
|
@ -181,6 +183,7 @@ SSchemaWrapper *metaGetTableSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver, boo
|
|||
}
|
||||
|
||||
struct SMCtbCursor {
|
||||
SMeta *pMeta;
|
||||
TDBC *pCur;
|
||||
tb_uid_t suid;
|
||||
void *pKey;
|
||||
|
@ -200,9 +203,13 @@ SMCtbCursor *metaOpenCtbCursor(SMeta *pMeta, tb_uid_t uid) {
|
|||
return NULL;
|
||||
}
|
||||
|
||||
pCtbCur->pMeta = pMeta;
|
||||
pCtbCur->suid = uid;
|
||||
metaRLock(pMeta);
|
||||
|
||||
ret = tdbDbcOpen(pMeta->pCtbIdx, &pCtbCur->pCur, NULL);
|
||||
if (ret < 0) {
|
||||
metaULock(pMeta);
|
||||
taosMemoryFree(pCtbCur);
|
||||
return NULL;
|
||||
}
|
||||
|
@ -220,6 +227,7 @@ SMCtbCursor *metaOpenCtbCursor(SMeta *pMeta, tb_uid_t uid) {
|
|||
|
||||
void metaCloseCtbCurosr(SMCtbCursor *pCtbCur) {
|
||||
if (pCtbCur) {
|
||||
if (pCtbCur->pMeta) metaULock(pCtbCur->pMeta);
|
||||
if (pCtbCur->pCur) {
|
||||
tdbDbcClose(pCtbCur->pCur);
|
||||
|
||||
|
@ -269,7 +277,7 @@ STSchema *metaGetTbTSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver) {
|
|||
|
||||
pSW = metaGetTableSchema(pMeta, quid, sver, 0);
|
||||
if (!pSW) return NULL;
|
||||
|
||||
|
||||
tdInitTSchemaBuilder(&sb, 0);
|
||||
for (int i = 0; i < pSW->nCols; i++) {
|
||||
pSchema = pSW->pSchema + i;
|
||||
|
|
|
@ -427,9 +427,17 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
|
|||
SMqDataBlkRsp rsp = {0};
|
||||
rsp.reqOffset = pReq->currentOffset;
|
||||
rsp.withSchema = pExec->withSchema;
|
||||
|
||||
rsp.blockData = taosArrayInit(0, sizeof(void*));
|
||||
rsp.blockDataLen = taosArrayInit(0, sizeof(int32_t));
|
||||
rsp.blockSchema = taosArrayInit(0, sizeof(void*));
|
||||
rsp.blockTbName = taosArrayInit(0, sizeof(void*));
|
||||
|
||||
int8_t withTbName = pExec->withTbName;
|
||||
if (pReq->withTbName != -1) {
|
||||
withTbName = pReq->withTbName;
|
||||
}
|
||||
rsp.withTbName = withTbName;
|
||||
|
||||
while (1) {
|
||||
consumerEpoch = atomic_load_32(&pExec->epoch);
|
||||
|
@ -535,6 +543,18 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
|
|||
taosArrayPush(rsp.blockSchema, &pSW);
|
||||
}
|
||||
|
||||
if (withTbName) {
|
||||
SMetaReader mr = {0};
|
||||
metaReaderInit(&mr, pTq->pVnode->pMeta, 0);
|
||||
int64_t uid = pExec->pExecReader[workerId]->msgIter.uid;
|
||||
if (metaGetTableEntryByUid(&mr, uid) < 0) {
|
||||
ASSERT(0);
|
||||
}
|
||||
char* tbName = strdup(mr.me.name);
|
||||
taosArrayPush(rsp.blockTbName, &tbName);
|
||||
metaReaderClear(&mr);
|
||||
}
|
||||
|
||||
rsp.blockNum++;
|
||||
}
|
||||
// db subscribe
|
||||
|
@ -563,6 +583,16 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
|
|||
ASSERT(actualLen <= dataStrLen);
|
||||
taosArrayPush(rsp.blockDataLen, &actualLen);
|
||||
taosArrayPush(rsp.blockData, &buf);
|
||||
if (withTbName) {
|
||||
SMetaReader mr = {0};
|
||||
metaReaderInit(&mr, pTq->pVnode->pMeta, 0);
|
||||
if (metaGetTableEntryByUid(&mr, block.info.uid) < 0) {
|
||||
ASSERT(0);
|
||||
}
|
||||
char* tbName = strdup(mr.me.name);
|
||||
taosArrayPush(rsp.blockTbName, &tbName);
|
||||
metaReaderClear(&mr);
|
||||
}
|
||||
|
||||
SSchemaWrapper* pSW = tCloneSSchemaWrapper(pExec->pExecReader[workerId]->pSchemaWrapper);
|
||||
taosArrayPush(rsp.blockSchema, &pSW);
|
||||
|
@ -614,6 +644,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
|
|||
taosArrayDestroy(rsp.blockData);
|
||||
taosArrayDestroy(rsp.blockDataLen);
|
||||
taosArrayDestroyP(rsp.blockSchema, (FDelete)tDeleteSSchemaWrapper);
|
||||
taosArrayDestroyP(rsp.blockTbName, (FDelete)taosMemoryFree);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
|
|
@ -486,8 +486,10 @@ static void tsdbDestroyCommitIters(SCommitH *pCommith) {
|
|||
|
||||
for (int i = 1; i < pCommith->niters; i++) {
|
||||
tSkipListDestroyIter(pCommith->iters[i].pIter);
|
||||
tdFreeSchema(pCommith->iters[i].pTable->pSchema);
|
||||
taosMemoryFree(pCommith->iters[i].pTable);
|
||||
if (pCommith->iters[i].pTable) {
|
||||
tdFreeSchema(pCommith->iters[i].pTable->pSchema);
|
||||
taosMemoryFreeClear(pCommith->iters[i].pTable);
|
||||
}
|
||||
}
|
||||
|
||||
taosMemoryFree(pCommith->iters);
|
||||
|
|
|
@ -62,6 +62,16 @@ int tsdbMemTableCreate(STsdb *pTsdb, STsdbMemTable **ppMemTable) {
|
|||
void tsdbMemTableDestroy(STsdb *pTsdb, STsdbMemTable *pMemTable) {
|
||||
if (pMemTable) {
|
||||
taosHashCleanup(pMemTable->pHashIdx);
|
||||
SSkipListIterator *pIter = tSkipListCreateIter(pMemTable->pSlIdx);
|
||||
SSkipListNode *pNode = NULL;
|
||||
STbData *pTbData = NULL;
|
||||
for (;;) {
|
||||
if (!tSkipListIterNext(pIter)) break;
|
||||
pNode = tSkipListIterGet(pIter);
|
||||
pTbData = (STbData *)pNode->pData;
|
||||
tsdbFreeTbData(pTbData);
|
||||
}
|
||||
tSkipListDestroyIter(pIter);
|
||||
tSkipListDestroy(pMemTable->pSlIdx);
|
||||
taosMemoryFree(pMemTable);
|
||||
}
|
||||
|
@ -300,6 +310,17 @@ int tsdbInsertTableData(STsdb *pTsdb, SSubmitMsgIter *pMsgIter, SSubmitBlk *pBlo
|
|||
TSKEY keyMax;
|
||||
SSubmitBlk *pBlkCopy;
|
||||
|
||||
// check if table exists
|
||||
SMetaReader mr = {0};
|
||||
SMetaEntry me = {0};
|
||||
metaReaderInit(&mr, pTsdb->pVnode->pMeta, 0);
|
||||
if (metaGetTableEntryByUid(&mr, pMsgIter->uid) < 0) {
|
||||
metaReaderClear(&mr);
|
||||
terrno = TSDB_CODE_PAR_TABLE_NOT_EXIST;
|
||||
return -1;
|
||||
}
|
||||
metaReaderClear(&mr);
|
||||
|
||||
// create container is nedd
|
||||
tptr = taosHashGet(pMemTable->pHashIdx, &(pMsgIter->uid), sizeof(pMsgIter->uid));
|
||||
if (tptr == NULL) {
|
||||
|
|
|
@ -502,7 +502,7 @@ _exit:
|
|||
return 0;
|
||||
}
|
||||
|
||||
static int vnodeDebugPrintSingleSubmitMsg(SMeta *pMeta, SSubmitBlk *pBlock, SSubmitMsgIter *msgIter, const char *tags) {
|
||||
static int vnodeDebugPrintSingleSubmitMsg(SMeta *pMeta, SSubmitBlk *pBlock, SSubmitMsgIter *msgIter, const char *tags) {
|
||||
SSubmitBlkIter blkIter = {0};
|
||||
STSchema *pSchema = NULL;
|
||||
tb_uid_t suid = 0;
|
||||
|
@ -544,7 +544,7 @@ static int vnodeDebugPrintSubmitMsg(SVnode *pVnode, SSubmitReq *pMsg, const char
|
|||
while (true) {
|
||||
if (tGetSubmitMsgNext(&msgIter, &pBlock) < 0) return -1;
|
||||
if (pBlock == NULL) break;
|
||||
|
||||
|
||||
vnodeDebugPrintSingleSubmitMsg(pMeta, pBlock, &msgIter, tags);
|
||||
}
|
||||
|
||||
|
@ -601,7 +601,7 @@ static int vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq, in
|
|||
|
||||
if (metaCreateTable(pVnode->pMeta, version, &createTbReq) < 0) {
|
||||
if (terrno != TSDB_CODE_TDB_TABLE_ALREADY_EXIST) {
|
||||
pRsp->code = terrno;
|
||||
submitBlkRsp.code = terrno;
|
||||
tDecoderClear(&decoder);
|
||||
goto _exit;
|
||||
}
|
||||
|
@ -623,8 +623,7 @@ static int vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq, in
|
|||
}
|
||||
|
||||
if (tsdbInsertTableData(pVnode->pTsdb, &msgIter, pBlock, &submitBlkRsp) < 0) {
|
||||
pRsp->code = terrno;
|
||||
goto _exit;
|
||||
submitBlkRsp.code = terrno;
|
||||
}
|
||||
|
||||
submitRsp.numOfRows += submitBlkRsp.numOfRows;
|
||||
|
|
|
@ -72,6 +72,7 @@ int32_t vnodeSendMsg(void *rpcHandle, const SEpSet *pEpSet, SRpcMsg *pMsg) {
|
|||
int32_t ret = 0;
|
||||
SMsgCb *pMsgCb = rpcHandle;
|
||||
if (pMsgCb->queueFps[SYNC_QUEUE] != NULL) {
|
||||
pMsg->noResp = 1;
|
||||
tmsgSendReq(rpcHandle, pEpSet, pMsg);
|
||||
} else {
|
||||
vError("vnodeSendMsg queue is NULL, SYNC_QUEUE:%d", SYNC_QUEUE);
|
||||
|
|
|
@ -616,10 +616,10 @@ int32_t appendDownstream(SOperatorInfo* p, SOperatorInfo** pDownstream, int32_t
|
|||
int32_t initAggInfo(SOptrBasicInfo* pBasicInfo, SAggSupporter* pAggSup, SExprInfo* pExprInfo, int32_t numOfCols,
|
||||
SSDataBlock* pResultBlock, size_t keyBufSize, const char* pkey);
|
||||
void initResultSizeInfo(SOperatorInfo* pOperator, int32_t numOfRows);
|
||||
void doBuildResultDatablock(SOptrBasicInfo *pbInfo, SGroupResInfo* pGroupResInfo, SExprInfo* pExprInfo, SDiskbasedBuf* pBuf);
|
||||
void doBuildResultDatablock(SExecTaskInfo *taskInfo, SOptrBasicInfo *pbInfo, SGroupResInfo* pGroupResInfo, SExprInfo* pExprInfo, SDiskbasedBuf* pBuf);
|
||||
|
||||
void finalizeMultiTupleQueryResult(int32_t numOfOutput, SDiskbasedBuf* pBuf, SResultRowInfo* pResultRowInfo, int32_t* rowCellInfoOffset);
|
||||
void doApplyFunctions(SqlFunctionCtx* pCtx, STimeWindow* pWin, SColumnInfoData* pTimeWindowData, int32_t offset,
|
||||
void doApplyFunctions(SExecTaskInfo* taskInfo, SqlFunctionCtx* pCtx, STimeWindow* pWin, SColumnInfoData* pTimeWindowData, int32_t offset,
|
||||
int32_t forwardStep, TSKEY* tsCol, int32_t numOfTotal, int32_t numOfOutput, int32_t order);
|
||||
int32_t setGroupResultOutputBuf(SOptrBasicInfo* binfo, int32_t numOfCols, char* pData, int16_t type, int16_t bytes,
|
||||
int32_t groupId, SDiskbasedBuf* pBuf, SExecTaskInfo* pTaskInfo, SAggSupporter* pAggSup);
|
||||
|
|
|
@ -155,7 +155,7 @@ SOperatorFpSet createOperatorFpSet(__optr_open_fn_t openFn, __optr_fn_t nextFn,
|
|||
|
||||
void operatorDummyCloseFn(void* param, int32_t numOfCols) {}
|
||||
|
||||
static int32_t doCopyToSDataBlock(SSDataBlock* pBlock, SExprInfo* pExprInfo, SDiskbasedBuf* pBuf,
|
||||
static int32_t doCopyToSDataBlock(SExecTaskInfo *taskInfo, SSDataBlock* pBlock, SExprInfo* pExprInfo, SDiskbasedBuf* pBuf,
|
||||
SGroupResInfo* pGroupResInfo, int32_t orderType, int32_t* rowCellOffset,
|
||||
SqlFunctionCtx* pCtx);
|
||||
|
||||
|
@ -579,7 +579,7 @@ void initExecTimeWindowInfo(SColumnInfoData* pColData, STimeWindow* pQueryWindow
|
|||
colDataAppendInt64(pColData, 4, &pQueryWindow->ekey);
|
||||
}
|
||||
|
||||
void doApplyFunctions(SqlFunctionCtx* pCtx, STimeWindow* pWin, SColumnInfoData* pTimeWindowData, int32_t offset,
|
||||
void doApplyFunctions(SExecTaskInfo* taskInfo, SqlFunctionCtx* pCtx, STimeWindow* pWin, SColumnInfoData* pTimeWindowData, int32_t offset,
|
||||
int32_t forwardStep, TSKEY* tsCol, int32_t numOfTotal, int32_t numOfOutput, int32_t order) {
|
||||
for (int32_t k = 0; k < numOfOutput; ++k) {
|
||||
pCtx[k].startTs = pWin->skey;
|
||||
|
@ -618,9 +618,14 @@ void doApplyFunctions(SqlFunctionCtx* pCtx, STimeWindow* pWin, SColumnInfoData*
|
|||
pEntryInfo->numOfRes = 1;
|
||||
continue;
|
||||
}
|
||||
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
if (functionNeedToExecute(&pCtx[k]) && pCtx[k].fpSet.process != NULL) {
|
||||
pCtx[k].fpSet.process(&pCtx[k]);
|
||||
code = pCtx[k].fpSet.process(&pCtx[k]);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
qError("%s apply functions error, code: %s", GET_TASKID(taskInfo), tstrerror(code));
|
||||
taskInfo->code = code;
|
||||
longjmp(taskInfo->env, code);
|
||||
}
|
||||
}
|
||||
|
||||
// restore it
|
||||
|
@ -806,7 +811,13 @@ static void doAggregateImpl(SOperatorInfo* pOperator, TSKEY startTs, SqlFunction
|
|||
// this can be set during create the struct
|
||||
// todo add a dummy funtion to avoid process check
|
||||
if (pCtx[k].fpSet.process != NULL) {
|
||||
pCtx[k].fpSet.process(&pCtx[k]);
|
||||
int32_t code = pCtx[k].fpSet.process(&pCtx[k]);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
qError("%s call aggregate function error happens, code : %s",
|
||||
GET_TASKID(pOperator->pTaskInfo), tstrerror(code));
|
||||
pOperator->pTaskInfo->code = code;
|
||||
longjmp(pOperator->pTaskInfo->env, code);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -2176,7 +2187,7 @@ void setExecutionContext(int32_t numOfOutput, uint64_t groupId, SExecTaskInfo* p
|
|||
* @param pQInfo
|
||||
* @param result
|
||||
*/
|
||||
int32_t doCopyToSDataBlock(SSDataBlock* pBlock, SExprInfo* pExprInfo, SDiskbasedBuf* pBuf, SGroupResInfo* pGroupResInfo,
|
||||
int32_t doCopyToSDataBlock(SExecTaskInfo* taskInfo, SSDataBlock* pBlock, SExprInfo* pExprInfo, SDiskbasedBuf* pBuf, SGroupResInfo* pGroupResInfo,
|
||||
int32_t orderType, int32_t* rowCellOffset, SqlFunctionCtx* pCtx) {
|
||||
int32_t numOfRows = getNumOfTotalRes(pGroupResInfo);
|
||||
int32_t numOfResult = pBlock->info.rows; // there are already exists result rows
|
||||
|
@ -2215,8 +2226,14 @@ int32_t doCopyToSDataBlock(SSDataBlock* pBlock, SExprInfo* pExprInfo, SDiskbased
|
|||
int32_t slotId = pExprInfo[j].base.resSchema.slotId;
|
||||
|
||||
pCtx[j].resultInfo = getResultCell(pRow, j, rowCellOffset);
|
||||
if (pCtx[j].fpSet.process) {
|
||||
pCtx[j].fpSet.finalize(&pCtx[j], pBlock);
|
||||
if (pCtx[j].fpSet.finalize) {
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
code = pCtx[j].fpSet.finalize(&pCtx[j], pBlock);
|
||||
if (TAOS_FAILED(code)) {
|
||||
qError("%s build result data block error, code %s", GET_TASKID(taskInfo), tstrerror(code));
|
||||
taskInfo->code = code;
|
||||
longjmp(taskInfo->env, code);
|
||||
}
|
||||
} else if (strcmp(pCtx[j].pExpr->pExpr->_function.functionName, "_select_value") == 0) {
|
||||
// do nothing, todo refactor
|
||||
} else {
|
||||
|
@ -2243,7 +2260,7 @@ int32_t doCopyToSDataBlock(SSDataBlock* pBlock, SExprInfo* pExprInfo, SDiskbased
|
|||
return 0;
|
||||
}
|
||||
|
||||
void doBuildResultDatablock(SOptrBasicInfo* pbInfo, SGroupResInfo* pGroupResInfo, SExprInfo* pExprInfo,
|
||||
void doBuildResultDatablock(SExecTaskInfo *taskInfo, SOptrBasicInfo* pbInfo, SGroupResInfo* pGroupResInfo, SExprInfo* pExprInfo,
|
||||
SDiskbasedBuf* pBuf) {
|
||||
assert(pGroupResInfo->currentGroup <= pGroupResInfo->totalGroup);
|
||||
|
||||
|
@ -2257,7 +2274,7 @@ void doBuildResultDatablock(SOptrBasicInfo* pbInfo, SGroupResInfo* pGroupResInfo
|
|||
}
|
||||
|
||||
int32_t orderType = TSDB_ORDER_ASC;
|
||||
doCopyToSDataBlock(pBlock, pExprInfo, pBuf, pGroupResInfo, orderType, rowCellOffset, pCtx);
|
||||
doCopyToSDataBlock(taskInfo, pBlock, pExprInfo, pBuf, pGroupResInfo, orderType, rowCellOffset, pCtx);
|
||||
|
||||
// add condition (pBlock->info.rows >= 1) just to runtime happy
|
||||
blockDataUpdateTsWindow(pBlock);
|
||||
|
@ -3749,7 +3766,7 @@ static SSDataBlock* getAggregateResult(SOperatorInfo* pOperator) {
|
|||
}
|
||||
|
||||
blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity);
|
||||
doBuildResultDatablock(pInfo, &pAggInfo->groupResInfo, pOperator->pExpr, pAggInfo->aggSup.pResultBuf);
|
||||
doBuildResultDatablock(pTaskInfo, pInfo, &pAggInfo->groupResInfo, pOperator->pExpr, pAggInfo->aggSup.pResultBuf);
|
||||
if (pInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pAggInfo->groupResInfo)) {
|
||||
doSetOperatorCompleted(pOperator);
|
||||
}
|
||||
|
|
|
@ -234,7 +234,7 @@ static void doHashGroupbyAgg(SOperatorInfo* pOperator, SSDataBlock* pBlock) {
|
|||
}
|
||||
|
||||
int32_t rowIndex = j - num;
|
||||
doApplyFunctions(pCtx, &w, NULL, rowIndex, num, NULL, pBlock->info.rows, pOperator->numOfExprs, TSDB_ORDER_ASC);
|
||||
doApplyFunctions(pTaskInfo, pCtx, &w, NULL, rowIndex, num, NULL, pBlock->info.rows, pOperator->numOfExprs, TSDB_ORDER_ASC);
|
||||
|
||||
// assign the group keys or user input constant values if required
|
||||
doAssignGroupKeys(pCtx, pOperator->numOfExprs, pBlock->info.rows, rowIndex);
|
||||
|
@ -252,7 +252,7 @@ static void doHashGroupbyAgg(SOperatorInfo* pOperator, SSDataBlock* pBlock) {
|
|||
}
|
||||
|
||||
int32_t rowIndex = pBlock->info.rows - num;
|
||||
doApplyFunctions(pCtx, &w, NULL, rowIndex, num, NULL, pBlock->info.rows, pOperator->numOfExprs, TSDB_ORDER_ASC);
|
||||
doApplyFunctions(pTaskInfo, pCtx, &w, NULL, rowIndex, num, NULL, pBlock->info.rows, pOperator->numOfExprs, TSDB_ORDER_ASC);
|
||||
doAssignGroupKeys(pCtx, pOperator->numOfExprs, pBlock->info.rows, rowIndex);
|
||||
}
|
||||
}
|
||||
|
@ -268,7 +268,7 @@ static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator) {
|
|||
SSDataBlock* pRes = pInfo->binfo.pRes;
|
||||
|
||||
if (pOperator->status == OP_RES_TO_RETURN) {
|
||||
doBuildResultDatablock(&pInfo->binfo, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf);
|
||||
doBuildResultDatablock(pTaskInfo, &pInfo->binfo, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf);
|
||||
if (pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pInfo->groupResInfo)) {
|
||||
pOperator->status = OP_EXEC_DONE;
|
||||
}
|
||||
|
@ -317,7 +317,7 @@ static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator) {
|
|||
initGroupedResultInfo(&pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable, false);
|
||||
|
||||
while(1) {
|
||||
doBuildResultDatablock(&pInfo->binfo, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf);
|
||||
doBuildResultDatablock(pTaskInfo, &pInfo->binfo, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf);
|
||||
doFilter(pInfo->pCondition, pRes);
|
||||
|
||||
bool hasRemain = hasRemainDataInCurrentGroup(&pInfo->groupResInfo);
|
||||
|
|
|
@ -703,7 +703,7 @@ static SArray* hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pRe
|
|||
pInfo->order, false);
|
||||
|
||||
updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &win, true);
|
||||
doApplyFunctions(pInfo->binfo.pCtx, &win, &pInfo->twAggSup.timeWindowData, startPos, forwardStep, tsCols,
|
||||
doApplyFunctions(pTaskInfo, pInfo->binfo.pCtx, &win, &pInfo->twAggSup.timeWindowData, startPos, forwardStep, tsCols,
|
||||
pSDataBlock->info.rows, numOfOutput, TSDB_ORDER_ASC);
|
||||
|
||||
STimeWindow nextWin = win;
|
||||
|
@ -740,7 +740,7 @@ static SArray* hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pRe
|
|||
pInfo->order, false);
|
||||
|
||||
updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &nextWin, true);
|
||||
doApplyFunctions(pInfo->binfo.pCtx, &nextWin, &pInfo->twAggSup.timeWindowData, startPos, forwardStep, tsCols,
|
||||
doApplyFunctions(pTaskInfo, pInfo->binfo.pCtx, &nextWin, &pInfo->twAggSup.timeWindowData, startPos, forwardStep, tsCols,
|
||||
pSDataBlock->info.rows, numOfOutput, TSDB_ORDER_ASC);
|
||||
}
|
||||
|
||||
|
@ -855,7 +855,7 @@ static void doStateWindowAggImpl(SOperatorInfo* pOperator, SStateWindowOperatorI
|
|||
}
|
||||
|
||||
updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &window, false);
|
||||
doApplyFunctions(pInfo->binfo.pCtx, &window, &pInfo->twAggSup.timeWindowData, pRowSup->startRowIndex,
|
||||
doApplyFunctions(pTaskInfo, pInfo->binfo.pCtx, &window, &pInfo->twAggSup.timeWindowData, pRowSup->startRowIndex,
|
||||
pRowSup->numOfRows, NULL, pBlock->info.rows, numOfOutput, TSDB_ORDER_ASC);
|
||||
|
||||
// here we start a new session window
|
||||
|
@ -874,7 +874,7 @@ static void doStateWindowAggImpl(SOperatorInfo* pOperator, SStateWindowOperatorI
|
|||
}
|
||||
|
||||
updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pRowSup->win, false);
|
||||
doApplyFunctions(pInfo->binfo.pCtx, &pRowSup->win, &pInfo->twAggSup.timeWindowData, pRowSup->startRowIndex,
|
||||
doApplyFunctions(pTaskInfo, pInfo->binfo.pCtx, &pRowSup->win, &pInfo->twAggSup.timeWindowData, pRowSup->startRowIndex,
|
||||
pRowSup->numOfRows, NULL, pBlock->info.rows, numOfOutput, TSDB_ORDER_ASC);
|
||||
}
|
||||
|
||||
|
@ -888,7 +888,7 @@ static SSDataBlock* doStateWindowAgg(SOperatorInfo* pOperator) {
|
|||
SOptrBasicInfo* pBInfo = &pInfo->binfo;
|
||||
|
||||
if (pOperator->status == OP_RES_TO_RETURN) {
|
||||
doBuildResultDatablock(pBInfo, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf);
|
||||
doBuildResultDatablock(pTaskInfo, pBInfo, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf);
|
||||
if (pBInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pInfo->groupResInfo)) {
|
||||
doSetOperatorCompleted(pOperator);
|
||||
return NULL;
|
||||
|
@ -921,7 +921,7 @@ static SSDataBlock* doStateWindowAgg(SOperatorInfo* pOperator) {
|
|||
|
||||
initGroupedResultInfo(&pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable, true);
|
||||
blockDataEnsureCapacity(pBInfo->pRes, pOperator->resultInfo.capacity);
|
||||
doBuildResultDatablock(pBInfo, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf);
|
||||
doBuildResultDatablock(pTaskInfo, pBInfo, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf);
|
||||
if (pBInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pInfo->groupResInfo)) {
|
||||
doSetOperatorCompleted(pOperator);
|
||||
}
|
||||
|
@ -948,7 +948,7 @@ static SSDataBlock* doBuildIntervalResult(SOperatorInfo* pOperator) {
|
|||
}
|
||||
|
||||
blockDataEnsureCapacity(pBlock, pOperator->resultInfo.capacity);
|
||||
doBuildResultDatablock(&pInfo->binfo, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf);
|
||||
doBuildResultDatablock(pTaskInfo, &pInfo->binfo, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf);
|
||||
|
||||
if (pBlock->info.rows == 0 || !hasRemainDataInCurrentGroup(&pInfo->groupResInfo)) {
|
||||
doSetOperatorCompleted(pOperator);
|
||||
|
@ -998,7 +998,7 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
|
|||
}
|
||||
|
||||
if (pOperator->status == OP_RES_TO_RETURN) {
|
||||
doBuildResultDatablock(&pInfo->binfo, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf);
|
||||
doBuildResultDatablock(pOperator->pTaskInfo, &pInfo->binfo, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf);
|
||||
if (pInfo->binfo.pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pInfo->groupResInfo)) {
|
||||
pOperator->status = OP_EXEC_DONE;
|
||||
}
|
||||
|
@ -1035,7 +1035,7 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
|
|||
|
||||
initMultiResInfoFromArrayList(&pInfo->groupResInfo, pUpdated);
|
||||
blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity);
|
||||
doBuildResultDatablock(&pInfo->binfo, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf);
|
||||
doBuildResultDatablock(pOperator->pTaskInfo, &pInfo->binfo, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf);
|
||||
|
||||
// TODO: remove for stream
|
||||
/*ASSERT(pInfo->binfo.pRes->info.rows > 0);*/
|
||||
|
@ -1233,7 +1233,7 @@ static void doSessionWindowAggImpl(SOperatorInfo* pOperator, SSessionAggOperator
|
|||
|
||||
// pInfo->numOfRows data belong to the current session window
|
||||
updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &window, false);
|
||||
doApplyFunctions(pInfo->binfo.pCtx, &window, &pInfo->twAggSup.timeWindowData, pRowSup->startRowIndex,
|
||||
doApplyFunctions(pTaskInfo, pInfo->binfo.pCtx, &window, &pInfo->twAggSup.timeWindowData, pRowSup->startRowIndex,
|
||||
pRowSup->numOfRows, NULL, pBlock->info.rows, numOfOutput, TSDB_ORDER_ASC);
|
||||
|
||||
// here we start a new session window
|
||||
|
@ -1252,7 +1252,7 @@ static void doSessionWindowAggImpl(SOperatorInfo* pOperator, SSessionAggOperator
|
|||
}
|
||||
|
||||
updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pRowSup->win, false);
|
||||
doApplyFunctions(pInfo->binfo.pCtx, &pRowSup->win, &pInfo->twAggSup.timeWindowData, pRowSup->startRowIndex,
|
||||
doApplyFunctions(pTaskInfo, pInfo->binfo.pCtx, &pRowSup->win, &pInfo->twAggSup.timeWindowData, pRowSup->startRowIndex,
|
||||
pRowSup->numOfRows, NULL, pBlock->info.rows, numOfOutput, TSDB_ORDER_ASC);
|
||||
}
|
||||
|
||||
|
@ -1265,7 +1265,7 @@ static SSDataBlock* doSessionWindowAgg(SOperatorInfo* pOperator) {
|
|||
SOptrBasicInfo* pBInfo = &pInfo->binfo;
|
||||
|
||||
if (pOperator->status == OP_RES_TO_RETURN) {
|
||||
doBuildResultDatablock(pBInfo, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf);
|
||||
doBuildResultDatablock(pOperator->pTaskInfo, pBInfo, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf);
|
||||
if (pBInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pInfo->groupResInfo)) {
|
||||
doSetOperatorCompleted(pOperator);
|
||||
return NULL;
|
||||
|
@ -1298,7 +1298,7 @@ static SSDataBlock* doSessionWindowAgg(SOperatorInfo* pOperator) {
|
|||
|
||||
initGroupedResultInfo(&pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable, true);
|
||||
blockDataEnsureCapacity(pBInfo->pRes, pOperator->resultInfo.capacity);
|
||||
doBuildResultDatablock(pBInfo, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf);
|
||||
doBuildResultDatablock(pOperator->pTaskInfo, pBInfo, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf);
|
||||
if (pBInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pInfo->groupResInfo)) {
|
||||
doSetOperatorCompleted(pOperator);
|
||||
}
|
||||
|
|
|
@ -95,6 +95,9 @@ bool stateFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo);
|
|||
int32_t stateCountFunction(SqlFunctionCtx* pCtx);
|
||||
int32_t stateDurationFunction(SqlFunctionCtx* pCtx);
|
||||
|
||||
bool getCsumFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
|
||||
int32_t csumFunction(SqlFunctionCtx* pCtx);
|
||||
|
||||
bool getSelectivityFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv);
|
||||
|
||||
#ifdef __cplusplus
|
||||
|
|
|
@ -308,6 +308,37 @@ static int32_t translateStateDuration(SFunctionNode* pFunc, char* pErrBuf, int32
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t translateCsum(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
|
||||
if (1 != LIST_LENGTH(pFunc->pParameterList)) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
SNode* pPara = nodesListGetNode(pFunc->pParameterList, 0);
|
||||
if (QUERY_NODE_COLUMN != nodeType(pPara)) {
|
||||
return buildFuncErrMsg(pErrBuf, len, TSDB_CODE_FUNC_FUNTION_ERROR,
|
||||
"The input parameter of CSUM function can only be column");
|
||||
}
|
||||
|
||||
uint8_t colType = ((SExprNode*)nodesListGetNode(pFunc->pParameterList, 0))->resType.type;
|
||||
uint8_t resType;
|
||||
if (!IS_NUMERIC_TYPE(colType)) {
|
||||
return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName);
|
||||
} else {
|
||||
if (IS_SIGNED_NUMERIC_TYPE(colType)) {
|
||||
resType = TSDB_DATA_TYPE_BIGINT;
|
||||
} else if (IS_UNSIGNED_NUMERIC_TYPE(colType)) {
|
||||
resType = TSDB_DATA_TYPE_UBIGINT;
|
||||
} else if (IS_FLOAT_TYPE(colType)) {
|
||||
resType = TSDB_DATA_TYPE_DOUBLE;
|
||||
} else {
|
||||
ASSERT(0);
|
||||
}
|
||||
}
|
||||
|
||||
pFunc->node.resType = (SDataType) { .bytes = tDataTypes[resType].bytes, .type = resType};
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t translateLastRow(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
|
||||
// todo
|
||||
return TSDB_CODE_SUCCESS;
|
||||
|
@ -742,6 +773,16 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
|
|||
.processFunc = stateDurationFunction,
|
||||
.finalizeFunc = NULL
|
||||
},
|
||||
{
|
||||
.name = "csum",
|
||||
.type = FUNCTION_TYPE_CSUM,
|
||||
.classification = FUNC_MGT_NONSTANDARD_SQL_FUNC | FUNC_MGT_TIMELINE_FUNC,
|
||||
.translateFunc = translateCsum,
|
||||
.getEnvFunc = getCsumFuncEnv,
|
||||
.initFunc = functionSetup,
|
||||
.processFunc = csumFunction,
|
||||
.finalizeFunc = NULL
|
||||
},
|
||||
{
|
||||
.name = "abs",
|
||||
.type = FUNCTION_TYPE_ABS,
|
||||
|
|
|
@ -2818,7 +2818,6 @@ int32_t stateCountFunction(SqlFunctionCtx* pCtx) {
|
|||
SInputColumnInfoData* pInput = &pCtx->input;
|
||||
|
||||
SColumnInfoData* pInputCol = pInput->pData[0];
|
||||
SColumnInfoData* pTsOutput = pCtx->pTsOutput;
|
||||
|
||||
int32_t numOfElems = 0;
|
||||
SColumnInfoData* pOutput = (SColumnInfoData*)pCtx->pOutput;
|
||||
|
@ -2856,7 +2855,6 @@ int32_t stateDurationFunction(SqlFunctionCtx* pCtx) {
|
|||
TSKEY* tsList = (int64_t*)pInput->pPTS->pData;
|
||||
|
||||
SColumnInfoData* pInputCol = pInput->pData[0];
|
||||
SColumnInfoData* pTsOutput = pCtx->pTsOutput;
|
||||
|
||||
int32_t numOfElems = 0;
|
||||
SColumnInfoData* pOutput = (SColumnInfoData*)pCtx->pOutput;
|
||||
|
@ -2896,3 +2894,58 @@ int32_t stateDurationFunction(SqlFunctionCtx* pCtx) {
|
|||
|
||||
return numOfElems;
|
||||
}
|
||||
|
||||
bool getCsumFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv) {
|
||||
pEnv->calcMemSize = sizeof(SSumRes);
|
||||
return true;
|
||||
}
|
||||
|
||||
int32_t csumFunction(SqlFunctionCtx* pCtx) {
|
||||
SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
|
||||
SSumRes* pSumRes = GET_ROWCELL_INTERBUF(pResInfo);
|
||||
|
||||
SInputColumnInfoData* pInput = &pCtx->input;
|
||||
TSKEY* tsList = (int64_t*)pInput->pPTS->pData;
|
||||
|
||||
SColumnInfoData* pInputCol = pInput->pData[0];
|
||||
SColumnInfoData* pTsOutput = pCtx->pTsOutput;
|
||||
SColumnInfoData* pOutput = (SColumnInfoData*)pCtx->pOutput;
|
||||
|
||||
int32_t numOfElems = 0;
|
||||
int32_t type = pInputCol->info.type;
|
||||
int32_t startOffset = pCtx->offset;
|
||||
for (int32_t i = pInput->startRowIndex; i < pInput->numOfRows + pInput->startRowIndex; i += 1) {
|
||||
int32_t pos = startOffset + numOfElems;
|
||||
if (colDataIsNull_f(pInputCol->nullbitmap, i)) {
|
||||
//colDataAppendNULL(pOutput, i);
|
||||
continue;
|
||||
}
|
||||
|
||||
char* data = colDataGetData(pInputCol, i);
|
||||
if (IS_SIGNED_NUMERIC_TYPE(type)) {
|
||||
int64_t v;
|
||||
GET_TYPED_DATA(v, int64_t, type, data);
|
||||
pSumRes->isum += v;
|
||||
colDataAppend(pOutput, pos, (char *)&pSumRes->isum, false);
|
||||
} else if (IS_UNSIGNED_NUMERIC_TYPE(type)) {
|
||||
uint64_t v;
|
||||
GET_TYPED_DATA(v, uint64_t, type, data);
|
||||
pSumRes->usum += v;
|
||||
colDataAppend(pOutput, pos, (char *)&pSumRes->usum, false);
|
||||
} else if (IS_FLOAT_TYPE(type)) {
|
||||
double v;
|
||||
GET_TYPED_DATA(v, double, type, data);
|
||||
pSumRes->dsum += v;
|
||||
colDataAppend(pOutput, pos, (char *)&pSumRes->dsum, false);
|
||||
}
|
||||
|
||||
//TODO: remove this after pTsOutput is handled
|
||||
if (pTsOutput != NULL) {
|
||||
colDataAppendInt64(pTsOutput, pos, &tsList[i]);
|
||||
}
|
||||
|
||||
numOfElems++;
|
||||
}
|
||||
|
||||
return numOfElems;
|
||||
}
|
||||
|
|
|
@ -695,6 +695,7 @@ int32_t convertDataBlockToUdfDataBlock(SSDataBlock *block, SUdfDataBlock *udfBlo
|
|||
udfCol->colMeta.scale = col->info.scale;
|
||||
udfCol->colMeta.precision = col->info.precision;
|
||||
udfCol->colData.numOfRows = udfBlock->numOfRows;
|
||||
udfCol->hasNull = col->hasNull;
|
||||
if (IS_VAR_DATA_TYPE(udfCol->colMeta.type)) {
|
||||
udfCol->colData.varLenCol.varOffsetsLen = sizeof(int32_t) * udfBlock->numOfRows;
|
||||
udfCol->colData.varLenCol.varOffsets = taosMemoryMalloc(udfCol->colData.varLenCol.varOffsetsLen);
|
||||
|
@ -731,6 +732,7 @@ int32_t convertUdfColumnToDataBlock(SUdfColumn *udfCol, SSDataBlock *block) {
|
|||
col->info.bytes = meta->bytes;
|
||||
col->info.scale = meta->scale;
|
||||
col->info.type = meta->type;
|
||||
col->hasNull = udfCol->hasNull;
|
||||
SUdfColumnData *data = &udfCol->colData;
|
||||
|
||||
if (!IS_VAR_DATA_TYPE(meta->type)) {
|
||||
|
@ -929,7 +931,7 @@ void udfcUvHandleError(SClientUvConn *conn) {
|
|||
while (!QUEUE_EMPTY(&conn->taskQueue)) {
|
||||
QUEUE* h = QUEUE_HEAD(&conn->taskQueue);
|
||||
SClientUvTaskNode *task = QUEUE_DATA(h, SClientUvTaskNode, connTaskQueue);
|
||||
task->errCode = UDFC_CODE_PIPE_READ_ERR;
|
||||
task->errCode = TSDB_CODE_UDF_PIPE_READ_ERR;
|
||||
QUEUE_REMOVE(&task->connTaskQueue);
|
||||
QUEUE_REMOVE(&task->procTaskQueue);
|
||||
uv_sem_post(&task->taskSem);
|
||||
|
@ -1117,7 +1119,7 @@ void cleanUpUvTasks(SUdfdProxy *udfc) {
|
|||
QUEUE_REMOVE(h);
|
||||
SClientUvTaskNode *task = QUEUE_DATA(h, SClientUvTaskNode, recvTaskQueue);
|
||||
if (udfc->gUdfcState == UDFC_STATE_STOPPING) {
|
||||
task->errCode = UDFC_CODE_STOPPING;
|
||||
task->errCode = TSDB_CODE_UDF_STOPPING;
|
||||
}
|
||||
uv_sem_post(&task->taskSem);
|
||||
}
|
||||
|
@ -1127,7 +1129,7 @@ void cleanUpUvTasks(SUdfdProxy *udfc) {
|
|||
QUEUE_REMOVE(h);
|
||||
SClientUvTaskNode *task = QUEUE_DATA(h, SClientUvTaskNode, procTaskQueue);
|
||||
if (udfc->gUdfcState == UDFC_STATE_STOPPING) {
|
||||
task->errCode = UDFC_CODE_STOPPING;
|
||||
task->errCode = TSDB_CODE_UDF_STOPPING;
|
||||
}
|
||||
uv_sem_post(&task->taskSem);
|
||||
}
|
||||
|
@ -1211,7 +1213,7 @@ int32_t udfcRunUdfUvTask(SClientUdfTask *task, int8_t uvTaskType) {
|
|||
int32_t setupUdf(char udfName[], UdfcFuncHandle *funcHandle) {
|
||||
fnInfo("udfc setup udf. udfName: %s", udfName);
|
||||
if (gUdfdProxy.gUdfcState != UDFC_STATE_READY) {
|
||||
return UDFC_CODE_INVALID_STATE;
|
||||
return TSDB_CODE_UDF_INVALID_STATE;
|
||||
}
|
||||
SClientUdfTask *task = taosMemoryCalloc(1,sizeof(SClientUdfTask));
|
||||
task->errCode = 0;
|
||||
|
@ -1225,7 +1227,7 @@ int32_t setupUdf(char udfName[], UdfcFuncHandle *funcHandle) {
|
|||
int32_t errCode = udfcRunUdfUvTask(task, UV_TASK_CONNECT);
|
||||
if (errCode != 0) {
|
||||
fnError("failed to connect to pipe. udfName: %s, pipe: %s", udfName, (&gUdfdProxy)->udfdPipeName);
|
||||
return UDFC_CODE_CONNECT_PIPE_ERR;
|
||||
return TSDB_CODE_UDF_PIPE_CONNECT_ERR;
|
||||
}
|
||||
|
||||
udfcRunUdfUvTask(task, UV_TASK_REQ_RSP);
|
||||
|
@ -1252,7 +1254,7 @@ int32_t callUdf(UdfcFuncHandle handle, int8_t callType, SSDataBlock *input, SUdf
|
|||
SClientUdfUvSession *session = (SClientUdfUvSession *) handle;
|
||||
if (session->udfUvPipe == NULL) {
|
||||
fnError("No pipe to udfd");
|
||||
return UDFC_CODE_NO_PIPE;
|
||||
return TSDB_CODE_UDF_PIPE_NO_PIPE;
|
||||
}
|
||||
SClientUdfTask *task = taosMemoryCalloc(1, sizeof(SClientUdfTask));
|
||||
task->errCode = 0;
|
||||
|
@ -1372,7 +1374,7 @@ int32_t teardownUdf(UdfcFuncHandle handle) {
|
|||
SClientUdfUvSession *session = (SClientUdfUvSession *) handle;
|
||||
if (session->udfUvPipe == NULL) {
|
||||
fnError("pipe to udfd does not exist");
|
||||
return UDFC_CODE_NO_PIPE;
|
||||
return TSDB_CODE_UDF_PIPE_NO_PIPE;
|
||||
}
|
||||
|
||||
SClientUdfTask *task = taosMemoryCalloc(1, sizeof(SClientUdfTask));
|
||||
|
|
|
@ -102,7 +102,7 @@ int32_t udfdLoadUdf(char *udfName, SUdf *udf) {
|
|||
int err = uv_dlopen(udf->path, &udf->lib);
|
||||
if (err != 0) {
|
||||
fnError("can not load library %s. error: %s", udf->path, uv_strerror(err));
|
||||
return UDFC_CODE_LOAD_UDF_FAILURE;
|
||||
return TSDB_CODE_UDF_LOAD_UDF_FAILURE;
|
||||
}
|
||||
|
||||
char initFuncName[TSDB_FUNC_NAME_LEN+5] = {0};
|
||||
|
|
|
@ -271,6 +271,7 @@ int32_t syncNodeSendMsgByInfo(const SNodeInfo* nodeInfo, SSyncNode* pSyncNode, S
|
|||
cJSON* syncNode2Json(const SSyncNode* pSyncNode);
|
||||
char* syncNode2Str(const SSyncNode* pSyncNode);
|
||||
char* syncNode2SimpleStr(const SSyncNode* pSyncNode);
|
||||
void syncNodeUpdateConfig(SSyncNode* pSyncNode, SSyncCfg *newConfig);
|
||||
|
||||
SSyncNode* syncNodeAcquire(int64_t rid);
|
||||
void syncNodeRelease(SSyncNode* pNode);
|
||||
|
|
|
@ -57,6 +57,11 @@ SyncIndex syncUtilMinIndex(SyncIndex a, SyncIndex b);
|
|||
SyncIndex syncUtilMaxIndex(SyncIndex a, SyncIndex b);
|
||||
void syncUtilMsgHtoN(void* msg);
|
||||
void syncUtilMsgNtoH(void* msg);
|
||||
bool syncUtilIsData(tmsg_t msgType);
|
||||
bool syncUtilUserPreCommit(tmsg_t msgType);
|
||||
bool syncUtilUserCommit(tmsg_t msgType);
|
||||
bool syncUtilUserRollback(tmsg_t msgType);
|
||||
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
|
|
|
@ -19,6 +19,7 @@
|
|||
#include "syncRaftStore.h"
|
||||
#include "syncUtil.h"
|
||||
#include "syncVoteMgr.h"
|
||||
#include "syncRaftCfg.h"
|
||||
|
||||
// TLA+ Spec
|
||||
// HandleAppendEntriesRequest(i, j, m) ==
|
||||
|
@ -199,7 +200,8 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) {
|
|||
SSyncRaftEntry* pRollBackEntry = logStoreGetEntry(ths->pLogStore, index);
|
||||
assert(pRollBackEntry != NULL);
|
||||
|
||||
if (pRollBackEntry->msgType != TDMT_VND_SYNC_NOOP) {
|
||||
//if (pRollBackEntry->msgType != TDMT_VND_SYNC_NOOP) {
|
||||
if (syncUtilUserRollback(pRollBackEntry->msgType)) {
|
||||
SRpcMsg rpcMsg;
|
||||
syncEntry2OriginalRpc(pRollBackEntry, &rpcMsg);
|
||||
|
||||
|
@ -227,7 +229,8 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) {
|
|||
SRpcMsg rpcMsg;
|
||||
syncEntry2OriginalRpc(pAppendEntry, &rpcMsg);
|
||||
if (ths->pFsm != NULL) {
|
||||
if (ths->pFsm->FpPreCommitCb != NULL && pAppendEntry->originalRpcType != TDMT_VND_SYNC_NOOP) {
|
||||
//if (ths->pFsm->FpPreCommitCb != NULL && pAppendEntry->originalRpcType != TDMT_VND_SYNC_NOOP) {
|
||||
if (ths->pFsm->FpPreCommitCb != NULL && syncUtilUserPreCommit(pAppendEntry->originalRpcType)) {
|
||||
SFsmCbMeta cbMeta;
|
||||
cbMeta.index = pAppendEntry->index;
|
||||
cbMeta.isWeak = pAppendEntry->isWeak;
|
||||
|
@ -258,7 +261,8 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) {
|
|||
SRpcMsg rpcMsg;
|
||||
syncEntry2OriginalRpc(pAppendEntry, &rpcMsg);
|
||||
if (ths->pFsm != NULL) {
|
||||
if (ths->pFsm->FpPreCommitCb != NULL && pAppendEntry->originalRpcType != TDMT_VND_SYNC_NOOP) {
|
||||
//if (ths->pFsm->FpPreCommitCb != NULL && pAppendEntry->originalRpcType != TDMT_VND_SYNC_NOOP) {
|
||||
if (ths->pFsm->FpPreCommitCb != NULL && syncUtilUserPreCommit(pAppendEntry->originalRpcType)) {
|
||||
SFsmCbMeta cbMeta;
|
||||
cbMeta.index = pAppendEntry->index;
|
||||
cbMeta.isWeak = pAppendEntry->isWeak;
|
||||
|
@ -320,7 +324,8 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) {
|
|||
SRpcMsg rpcMsg;
|
||||
syncEntry2OriginalRpc(pEntry, &rpcMsg);
|
||||
|
||||
if (ths->pFsm->FpCommitCb != NULL && pEntry->originalRpcType != TDMT_VND_SYNC_NOOP) {
|
||||
//if (ths->pFsm->FpCommitCb != NULL && pEntry->originalRpcType != TDMT_VND_SYNC_NOOP) {
|
||||
if (ths->pFsm->FpCommitCb != NULL && syncUtilUserCommit(pEntry->originalRpcType)) {
|
||||
SFsmCbMeta cbMeta;
|
||||
cbMeta.index = pEntry->index;
|
||||
cbMeta.isWeak = pEntry->isWeak;
|
||||
|
@ -330,6 +335,15 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) {
|
|||
ths->pFsm->FpCommitCb(ths->pFsm, &rpcMsg, cbMeta);
|
||||
}
|
||||
|
||||
// config change
|
||||
if (pEntry->originalRpcType == TDMT_VND_SYNC_CONFIG_CHANGE) {
|
||||
SSyncCfg newSyncCfg;
|
||||
int32_t ret = syncCfgFromStr(rpcMsg.pCont, &newSyncCfg);
|
||||
ASSERT(ret == 0);
|
||||
|
||||
syncNodeUpdateConfig(ths, &newSyncCfg);
|
||||
}
|
||||
|
||||
rpcFreeCont(rpcMsg.pCont);
|
||||
syncEntryDestory(pEntry);
|
||||
}
|
||||
|
|
|
@ -19,6 +19,7 @@
|
|||
#include "syncRaftLog.h"
|
||||
#include "syncRaftStore.h"
|
||||
#include "syncUtil.h"
|
||||
#include "syncRaftCfg.h"
|
||||
|
||||
// \* Leader i advances its commitIndex.
|
||||
// \* This is done as a separate step from handling AppendEntries responses,
|
||||
|
@ -101,7 +102,8 @@ void syncMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) {
|
|||
SRpcMsg rpcMsg;
|
||||
syncEntry2OriginalRpc(pEntry, &rpcMsg);
|
||||
|
||||
if (pSyncNode->pFsm->FpCommitCb != NULL && pEntry->originalRpcType != TDMT_VND_SYNC_NOOP) {
|
||||
//if (pSyncNode->pFsm->FpCommitCb != NULL && pEntry->originalRpcType != TDMT_VND_SYNC_NOOP) {
|
||||
if (pSyncNode->pFsm->FpCommitCb != NULL && syncUtilUserCommit(pEntry->originalRpcType)) {
|
||||
SFsmCbMeta cbMeta;
|
||||
cbMeta.index = pEntry->index;
|
||||
cbMeta.isWeak = pEntry->isWeak;
|
||||
|
@ -111,6 +113,15 @@ void syncMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) {
|
|||
pSyncNode->pFsm->FpCommitCb(pSyncNode->pFsm, &rpcMsg, cbMeta);
|
||||
}
|
||||
|
||||
// config change
|
||||
if (pEntry->originalRpcType == TDMT_VND_SYNC_CONFIG_CHANGE) {
|
||||
SSyncCfg newSyncCfg;
|
||||
int32_t ret = syncCfgFromStr(rpcMsg.pCont, &newSyncCfg);
|
||||
ASSERT(ret == 0);
|
||||
|
||||
syncNodeUpdateConfig(pSyncNode, &newSyncCfg);
|
||||
}
|
||||
|
||||
rpcFreeCont(rpcMsg.pCont);
|
||||
syncEntryDestory(pEntry);
|
||||
}
|
||||
|
|
|
@ -116,6 +116,15 @@ void syncStop(int64_t rid) {
|
|||
|
||||
int32_t syncReconfig(int64_t rid, const SSyncCfg* pSyncCfg) {
|
||||
int32_t ret = 0;
|
||||
char *configChange = syncCfg2Str((SSyncCfg*)pSyncCfg);
|
||||
SRpcMsg rpcMsg = {0};
|
||||
rpcMsg.msgType = TDMT_VND_SYNC_CONFIG_CHANGE;
|
||||
rpcMsg.noResp = 1;
|
||||
rpcMsg.contLen = strlen(configChange) + 1;
|
||||
rpcMsg.pCont = rpcMallocCont(rpcMsg.contLen);
|
||||
snprintf(rpcMsg.pCont, rpcMsg.contLen, "%s", configChange);
|
||||
taosMemoryFree(configChange);
|
||||
ret = syncPropose(rid, &rpcMsg, false);
|
||||
return ret;
|
||||
}
|
||||
|
||||
|
@ -849,6 +858,35 @@ char* syncNode2SimpleStr(const SSyncNode* pSyncNode) {
|
|||
return s;
|
||||
}
|
||||
|
||||
void syncNodeUpdateConfig(SSyncNode* pSyncNode, SSyncCfg *newConfig) {
|
||||
pSyncNode->pRaftCfg->cfg = *newConfig;
|
||||
int32_t ret = raftCfgPersist(pSyncNode->pRaftCfg);
|
||||
ASSERT(ret == 0);
|
||||
|
||||
// init internal
|
||||
pSyncNode->myNodeInfo = pSyncNode->pRaftCfg->cfg.nodeInfo[pSyncNode->pRaftCfg->cfg.myIndex];
|
||||
syncUtilnodeInfo2raftId(&pSyncNode->myNodeInfo, pSyncNode->vgId, &pSyncNode->myRaftId);
|
||||
|
||||
// init peersNum, peers, peersId
|
||||
pSyncNode->peersNum = pSyncNode->pRaftCfg->cfg.replicaNum - 1;
|
||||
int j = 0;
|
||||
for (int i = 0; i < pSyncNode->pRaftCfg->cfg.replicaNum; ++i) {
|
||||
if (i != pSyncNode->pRaftCfg->cfg.myIndex) {
|
||||
pSyncNode->peersNodeInfo[j] = pSyncNode->pRaftCfg->cfg.nodeInfo[i];
|
||||
j++;
|
||||
}
|
||||
}
|
||||
for (int i = 0; i < pSyncNode->peersNum; ++i) {
|
||||
syncUtilnodeInfo2raftId(&pSyncNode->peersNodeInfo[i], pSyncNode->vgId, &pSyncNode->peersId[i]);
|
||||
}
|
||||
|
||||
// init replicaNum, replicasId
|
||||
pSyncNode->replicaNum = pSyncNode->pRaftCfg->cfg.replicaNum;
|
||||
for (int i = 0; i < pSyncNode->pRaftCfg->cfg.replicaNum; ++i) {
|
||||
syncUtilnodeInfo2raftId(&pSyncNode->pRaftCfg->cfg.nodeInfo[i], pSyncNode->vgId, &pSyncNode->replicasId[i]);
|
||||
}
|
||||
}
|
||||
|
||||
SSyncNode* syncNodeAcquire(int64_t rid) {
|
||||
SSyncNode* pNode = taosAcquireRef(tsNodeRefId, rid);
|
||||
if (pNode == NULL) {
|
||||
|
@ -1207,7 +1245,8 @@ int32_t syncNodeOnClientRequestCb(SSyncNode* ths, SyncClientRequest* pMsg) {
|
|||
syncEntry2OriginalRpc(pEntry, &rpcMsg);
|
||||
|
||||
if (ths->pFsm != NULL) {
|
||||
if (ths->pFsm->FpPreCommitCb != NULL && pEntry->originalRpcType != TDMT_VND_SYNC_NOOP) {
|
||||
//if (ths->pFsm->FpPreCommitCb != NULL && pEntry->originalRpcType != TDMT_VND_SYNC_NOOP) {
|
||||
if (ths->pFsm->FpPreCommitCb != NULL && syncUtilUserPreCommit(pEntry->originalRpcType)) {
|
||||
SFsmCbMeta cbMeta;
|
||||
cbMeta.index = pEntry->index;
|
||||
cbMeta.isWeak = pEntry->isWeak;
|
||||
|
@ -1228,7 +1267,8 @@ int32_t syncNodeOnClientRequestCb(SSyncNode* ths, SyncClientRequest* pMsg) {
|
|||
syncEntry2OriginalRpc(pEntry, &rpcMsg);
|
||||
|
||||
if (ths->pFsm != NULL) {
|
||||
if (ths->pFsm->FpPreCommitCb != NULL && pEntry->originalRpcType != TDMT_VND_SYNC_NOOP) {
|
||||
//if (ths->pFsm->FpPreCommitCb != NULL && pEntry->originalRpcType != TDMT_VND_SYNC_NOOP) {
|
||||
if (ths->pFsm->FpPreCommitCb != NULL && syncUtilUserPreCommit(pEntry->originalRpcType)) {
|
||||
SFsmCbMeta cbMeta;
|
||||
cbMeta.index = pEntry->index;
|
||||
cbMeta.isWeak = pEntry->isWeak;
|
||||
|
|
|
@ -212,4 +212,32 @@ void syncUtilMsgNtoH(void* msg) {
|
|||
SMsgHead* pHead = msg;
|
||||
pHead->contLen = ntohl(pHead->contLen);
|
||||
pHead->vgId = ntohl(pHead->vgId);
|
||||
}
|
||||
|
||||
bool syncUtilIsData(tmsg_t msgType) {
|
||||
if (msgType == TDMT_VND_SYNC_NOOP || msgType == TDMT_VND_SYNC_CONFIG_CHANGE) {
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
bool syncUtilUserPreCommit(tmsg_t msgType) {
|
||||
if (msgType != TDMT_VND_SYNC_NOOP && msgType != TDMT_VND_SYNC_CONFIG_CHANGE) {
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
bool syncUtilUserCommit(tmsg_t msgType) {
|
||||
if (msgType != TDMT_VND_SYNC_NOOP && msgType != TDMT_VND_SYNC_CONFIG_CHANGE) {
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
bool syncUtilUserRollback(tmsg_t msgType) {
|
||||
if (msgType != TDMT_VND_SYNC_NOOP && msgType != TDMT_VND_SYNC_CONFIG_CHANGE) {
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
|
@ -114,6 +114,7 @@ int tdbBtreeOpen(int keyLen, int valLen, SPager *pPager, tdb_cmpr_fn_t kcmpr, SB
|
|||
|
||||
int tdbBtreeClose(SBTree *pBt) {
|
||||
if (pBt) {
|
||||
tdbFree(pBt->pBuf);
|
||||
tdbOsFree(pBt);
|
||||
}
|
||||
return 0;
|
||||
|
|
|
@ -242,7 +242,7 @@ static void tdbPCacheRemovePageFromHash(SPCache *pCache, SPage *pPage) {
|
|||
int h;
|
||||
|
||||
h = tdbPCachePageHash(&(pPage->pgid));
|
||||
for (ppPage = &(pCache->pgHash[h % pCache->nHash]); *ppPage != pPage; ppPage = &((*ppPage)->pHashNext))
|
||||
for (ppPage = &(pCache->pgHash[h % pCache->nHash]); (*ppPage) && *ppPage != pPage; ppPage = &((*ppPage)->pHashNext))
|
||||
;
|
||||
ASSERT(*ppPage == pPage);
|
||||
*ppPage = pPage->pHashNext;
|
||||
|
|
|
@ -65,11 +65,18 @@ static int32_t walReadSeekFilePos(SWalReadHandle *pRead, int64_t fileFirstVer, i
|
|||
ret = taosLSeekFile(pIdxTFile, offset, SEEK_SET);
|
||||
if (ret < 0) {
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
wError("failed to seek idx file, ver %ld, pos: %ld, since %s", ver, offset, terrstr());
|
||||
return -1;
|
||||
}
|
||||
SWalIdxEntry entry;
|
||||
if (taosReadFile(pIdxTFile, &entry, sizeof(SWalIdxEntry)) != sizeof(SWalIdxEntry)) {
|
||||
terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
|
||||
if ((ret = taosReadFile(pIdxTFile, &entry, sizeof(SWalIdxEntry))) != sizeof(SWalIdxEntry)) {
|
||||
if (ret < 0) {
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
wError("failed to read idx file, since %s", terrstr());
|
||||
} else {
|
||||
terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
|
||||
wError("read idx file incompletely, read bytes %d, bytes should be %lu", ret, sizeof(SWalIdxEntry));
|
||||
}
|
||||
return -1;
|
||||
}
|
||||
|
||||
|
@ -77,6 +84,7 @@ static int32_t walReadSeekFilePos(SWalReadHandle *pRead, int64_t fileFirstVer, i
|
|||
ret = taosLSeekFile(pLogTFile, entry.offset, SEEK_SET);
|
||||
if (ret < 0) {
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
wError("failed to seek log file, ver %ld, pos: %ld, since %s", ver, entry.offset, terrstr());
|
||||
return -1;
|
||||
}
|
||||
return ret;
|
||||
|
@ -92,6 +100,8 @@ static int32_t walReadChangeFile(SWalReadHandle *pRead, int64_t fileFirstVer) {
|
|||
TdFilePtr pLogTFile = taosOpenFile(fnameStr, TD_FILE_READ);
|
||||
if (pLogTFile == NULL) {
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
terrno = TSDB_CODE_WAL_INVALID_VER;
|
||||
wError("cannot open file %s, since %s", fnameStr, terrstr());
|
||||
return -1;
|
||||
}
|
||||
|
||||
|
@ -99,6 +109,7 @@ static int32_t walReadChangeFile(SWalReadHandle *pRead, int64_t fileFirstVer) {
|
|||
TdFilePtr pIdxTFile = taosOpenFile(fnameStr, TD_FILE_READ);
|
||||
if (pIdxTFile == NULL) {
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
wError("cannot open file %s, since %s", fnameStr, terrstr());
|
||||
return -1;
|
||||
}
|
||||
|
||||
|
@ -113,6 +124,7 @@ static int32_t walReadSeekVer(SWalReadHandle *pRead, int64_t ver) {
|
|||
return 0;
|
||||
}
|
||||
if (ver > pWal->vers.lastVer || ver < pWal->vers.firstVer) {
|
||||
wError("invalid version: % " PRId64 ", first ver %ld, last ver %ld", ver, pWal->vers.firstVer, pWal->vers.lastVer);
|
||||
terrno = TSDB_CODE_WAL_INVALID_VER;
|
||||
return -1;
|
||||
}
|
||||
|
@ -125,11 +137,13 @@ static int32_t walReadSeekVer(SWalReadHandle *pRead, int64_t ver) {
|
|||
SWalFileInfo *pRet = taosArraySearch(pWal->fileInfoSet, &tmpInfo, compareWalFileInfo, TD_LE);
|
||||
ASSERT(pRet != NULL);
|
||||
if (pRead->curFileFirstVer != pRet->firstVer) {
|
||||
// error code set inner
|
||||
if (walReadChangeFile(pRead, pRet->firstVer) < 0) {
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
|
||||
// error code set inner
|
||||
if (walReadSeekFilePos(pRead, pRet->firstVer, ver) < 0) {
|
||||
return -1;
|
||||
}
|
||||
|
@ -154,9 +168,7 @@ int32_t walFetchHead(SWalReadHandle *pRead, int64_t ver, SWalHead *pHead) {
|
|||
if (code < 0) return -1;
|
||||
}
|
||||
|
||||
if (!taosValidFile(pRead->pReadLogTFile)) {
|
||||
return -1;
|
||||
}
|
||||
ASSERT(taosValidFile(pRead->pReadLogTFile) == true);
|
||||
|
||||
code = taosReadFile(pRead->pReadLogTFile, pHead, sizeof(SWalHead));
|
||||
if (code != sizeof(SWalHead)) {
|
||||
|
@ -249,16 +261,19 @@ int32_t walReadWithHandle(SWalReadHandle *pRead, int64_t ver) {
|
|||
// TODO: check wal life
|
||||
if (pRead->curVersion != ver) {
|
||||
if (walReadSeekVer(pRead, ver) < 0) {
|
||||
wError("unexpected wal log version: % " PRId64 ", since %s", ver, terrstr());
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
|
||||
if (!taosValidFile(pRead->pReadLogTFile)) {
|
||||
return -1;
|
||||
}
|
||||
ASSERT(taosValidFile(pRead->pReadLogTFile) == true);
|
||||
|
||||
code = taosReadFile(pRead->pReadLogTFile, pRead->pHead, sizeof(SWalHead));
|
||||
if (code != sizeof(SWalHead)) {
|
||||
if (code < 0)
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
else
|
||||
terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
|
||||
return -1;
|
||||
}
|
||||
|
||||
|
|
|
@ -225,6 +225,7 @@ int walRoll(SWal *pWal) {
|
|||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
return -1;
|
||||
}
|
||||
// terrno set inner
|
||||
code = walRollFileInfo(pWal);
|
||||
if (code != 0) {
|
||||
return -1;
|
||||
|
|
|
@ -454,6 +454,14 @@ TAOS_DEFINE_ERROR(TSDB_CODE_PAR_PERMISSION_DENIED, "Permission denied")
|
|||
//planner
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_PLAN_INTERNAL_ERROR, "planner internal error")
|
||||
|
||||
//udf
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_UDF_STOPPING, "udf is stopping")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_UDF_PIPE_READ_ERR, "udf pipe read error")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_UDF_PIPE_CONNECT_ERR, "udf pipe connect error")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_UDF_PIPE_NO_PIPE, "udf no pipe")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_UDF_LOAD_UDF_FAILURE, "udf load failure")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_UDF_INVALID_STATE, "udf invalid state")
|
||||
|
||||
//schemaless
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_SML_INVALID_PROTOCOL_TYPE, "Invalid line protocol type")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_SML_INVALID_PRECISION_TYPE, "Invalid timestamp precision type")
|
||||
|
|
|
@ -150,6 +150,9 @@ class TDSql:
|
|||
raise Exception(repr(e))
|
||||
return (self.queryRows, timeout)
|
||||
|
||||
def getRows(self):
|
||||
return self.queryRows
|
||||
|
||||
def checkRows(self, expectRows):
|
||||
if self.queryRows == expectRows:
|
||||
tdLog.info("sql:%s, queryRows:%d == expect:%d" % (self.sql, self.queryRows, expectRows))
|
||||
|
|
|
@ -159,6 +159,7 @@ sql alter table db.stb rename tag t1 tx
|
|||
|
||||
print ========== alter common
|
||||
sql alter table db.stb comment 'abcde' ;
|
||||
sql alter table db.stb ttl 10 ;
|
||||
|
||||
sql show db.stables;
|
||||
if $data[0][6] != abcde then
|
||||
|
|
|
@ -346,8 +346,10 @@ class TDTestCase:
|
|||
return
|
||||
|
||||
def test_case3(self):
|
||||
self.taosBenchCreate("127.0.0.1","no","db1", "stb1", 1, 8, 1*10000)
|
||||
# self.taosBenchCreate("test209","no","db2", "stb2", 1, 8, 1*10000)
|
||||
|
||||
# self.taosBenchCreate("chenhaoran02","no","db1", "stb1", 1, 8, 1*10000)
|
||||
self.taosBenchCreate("chenhaoran02","no","db1", "stb1", 1, 8, 1*1000)
|
||||
|
||||
# self.taosBenchCreate("db1", "stb1", 4, 5, 100*10000)
|
||||
# self.taosBenchCreate("db1", "stb1", 1, 5, 100*10000)
|
||||
|
|
|
@ -29,8 +29,8 @@
|
|||
"batch_create_tbl_num": 50000,
|
||||
"data_source": "rand",
|
||||
"insert_mode": "taosc",
|
||||
"insert_rows": 0,
|
||||
"interlace_rows": 0,
|
||||
"insert_rows": 10,
|
||||
"interlace_rows": 100000,
|
||||
"insert_interval": 0,
|
||||
"max_sql_len": 10000000,
|
||||
"disorder_ratio": 0,
|
||||
|
|
Loading…
Reference in New Issue