Merge branch '3.0' of https://github.com/taosdata/TDengine into feat/tsdb_snapshot
This commit is contained in:
commit
bf0b37390d
111
examples/c/tmq.c
111
examples/c/tmq.c
|
@ -18,6 +18,7 @@
|
|||
#include <string.h>
|
||||
#include <time.h>
|
||||
#include "taos.h"
|
||||
#include <stdlib.h>
|
||||
|
||||
static int running = 1;
|
||||
static void msg_process(TAOS_RES* msg) {
|
||||
|
@ -30,7 +31,11 @@ static void msg_process(TAOS_RES* msg) {
|
|||
void* meta;
|
||||
int32_t metaLen;
|
||||
tmq_get_raw_meta(msg, &meta, &metaLen);
|
||||
|
||||
char* result = tmq_get_json_meta(msg);
|
||||
if(result){
|
||||
printf("meta result: %s\n", result);
|
||||
free(result);
|
||||
}
|
||||
printf("meta, len is %d\n", metaLen);
|
||||
return;
|
||||
}
|
||||
|
@ -119,6 +124,104 @@ int32_t init_env() {
|
|||
}
|
||||
taos_free_result(pRes);
|
||||
|
||||
pRes = taos_query(pConn, "alter table st1 add column c4 bigint");
|
||||
if (taos_errno(pRes) != 0) {
|
||||
printf("failed to alter super table st1, reason:%s\n", taos_errstr(pRes));
|
||||
return -1;
|
||||
}
|
||||
taos_free_result(pRes);
|
||||
|
||||
pRes = taos_query(pConn, "alter table st1 modify column c3 binary(64)");
|
||||
if (taos_errno(pRes) != 0) {
|
||||
printf("failed to alter super table st1, reason:%s\n", taos_errstr(pRes));
|
||||
return -1;
|
||||
}
|
||||
taos_free_result(pRes);
|
||||
|
||||
pRes = taos_query(pConn, "alter table st1 add tag t2 binary(64)");
|
||||
if (taos_errno(pRes) != 0) {
|
||||
printf("failed to alter super table st1, reason:%s\n", taos_errstr(pRes));
|
||||
return -1;
|
||||
}
|
||||
taos_free_result(pRes);
|
||||
|
||||
pRes = taos_query(pConn, "alter table ct3 set tag t1=5000");
|
||||
if (taos_errno(pRes) != 0) {
|
||||
printf("failed to slter child table ct3, reason:%s\n", taos_errstr(pRes));
|
||||
return -1;
|
||||
}
|
||||
taos_free_result(pRes);
|
||||
|
||||
pRes = taos_query(pConn, "drop table ct3 ct1");
|
||||
if (taos_errno(pRes) != 0) {
|
||||
printf("failed to drop child table ct3, reason:%s\n", taos_errstr(pRes));
|
||||
return -1;
|
||||
}
|
||||
taos_free_result(pRes);
|
||||
|
||||
pRes = taos_query(pConn, "drop table st1");
|
||||
if (taos_errno(pRes) != 0) {
|
||||
printf("failed to drop super table st1, reason:%s\n", taos_errstr(pRes));
|
||||
return -1;
|
||||
}
|
||||
taos_free_result(pRes);
|
||||
|
||||
pRes = taos_query(pConn, "create table if not exists n1(ts timestamp, c1 int, c2 nchar(4))");
|
||||
if (taos_errno(pRes) != 0) {
|
||||
printf("failed to create normal table n1, reason:%s\n", taos_errstr(pRes));
|
||||
return -1;
|
||||
}
|
||||
taos_free_result(pRes);
|
||||
|
||||
pRes = taos_query(pConn, "alter table n1 add column c3 bigint");
|
||||
if (taos_errno(pRes) != 0) {
|
||||
printf("failed to alter normal table n1, reason:%s\n", taos_errstr(pRes));
|
||||
return -1;
|
||||
}
|
||||
taos_free_result(pRes);
|
||||
|
||||
pRes = taos_query(pConn, "alter table n1 modify column c2 nchar(8)");
|
||||
if (taos_errno(pRes) != 0) {
|
||||
printf("failed to alter normal table n1, reason:%s\n", taos_errstr(pRes));
|
||||
return -1;
|
||||
}
|
||||
taos_free_result(pRes);
|
||||
|
||||
pRes = taos_query(pConn, "alter table n1 rename column c3 cc3");
|
||||
if (taos_errno(pRes) != 0) {
|
||||
printf("failed to alter normal table n1, reason:%s\n", taos_errstr(pRes));
|
||||
return -1;
|
||||
}
|
||||
taos_free_result(pRes);
|
||||
|
||||
pRes = taos_query(pConn, "alter table n1 drop column c1");
|
||||
if (taos_errno(pRes) != 0) {
|
||||
printf("failed to alter normal table n1, reason:%s\n", taos_errstr(pRes));
|
||||
return -1;
|
||||
}
|
||||
taos_free_result(pRes);
|
||||
|
||||
pRes = taos_query(pConn, "drop table n1");
|
||||
if (taos_errno(pRes) != 0) {
|
||||
printf("failed to drop normal table n1, reason:%s\n", taos_errstr(pRes));
|
||||
return -1;
|
||||
}
|
||||
taos_free_result(pRes);
|
||||
|
||||
pRes = taos_query(pConn, "create table jt(ts timestamp, i int) tags(t json)");
|
||||
if (taos_errno(pRes) != 0) {
|
||||
printf("failed to create super table jt, reason:%s\n", taos_errstr(pRes));
|
||||
return -1;
|
||||
}
|
||||
taos_free_result(pRes);
|
||||
|
||||
pRes = taos_query(pConn, "create table jt1 using jt tags('{\"k1\":1, \"k2\":\"hello\"}')");
|
||||
if (taos_errno(pRes) != 0) {
|
||||
printf("failed to create super table jt, reason:%s\n", taos_errstr(pRes));
|
||||
return -1;
|
||||
}
|
||||
taos_free_result(pRes);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
@ -137,8 +240,8 @@ int32_t create_topic() {
|
|||
}
|
||||
taos_free_result(pRes);
|
||||
|
||||
/*pRes = taos_query(pConn, "create topic topic_ctb_column with meta as database abc1");*/
|
||||
pRes = taos_query(pConn, "create topic topic_ctb_column as select ts, c1, c2, c3 from st1");
|
||||
pRes = taos_query(pConn, "create topic topic_ctb_column with meta as database abc1");
|
||||
// pRes = taos_query(pConn, "create topic topic_ctb_column as select ts, c1, c2, c3 from st1");
|
||||
if (taos_errno(pRes) != 0) {
|
||||
printf("failed to create topic topic_ctb_column, reason:%s\n", taos_errstr(pRes));
|
||||
return -1;
|
||||
|
@ -199,7 +302,7 @@ tmq_t* build_consumer() {
|
|||
tmq_conf_set(conf, "msg.with.table.name", "true");
|
||||
tmq_conf_set(conf, "enable.auto.commit", "true");
|
||||
|
||||
tmq_conf_set(conf, "experimental.snapshot.enable", "true");
|
||||
tmq_conf_set(conf, "experimental.snapshot.enable", "false");
|
||||
|
||||
tmq_conf_set_auto_commit_cb(conf, tmq_commit_cb_print, NULL);
|
||||
tmq_t* tmq = tmq_consumer_new(conf, NULL, 0);
|
||||
|
|
|
@ -263,6 +263,8 @@ typedef enum tmq_res_t tmq_res_t;
|
|||
|
||||
DLL_EXPORT tmq_res_t tmq_get_res_type(TAOS_RES *res);
|
||||
DLL_EXPORT int32_t tmq_get_raw_meta(TAOS_RES *res, void **raw_meta, int32_t *raw_meta_len);
|
||||
DLL_EXPORT int32_t taos_write_raw_meta(TAOS *res, void *raw_meta, int32_t raw_meta_len);
|
||||
DLL_EXPORT char *tmq_get_json_meta(TAOS_RES *res); // Returning null means error. Returned result need to be freed.
|
||||
DLL_EXPORT const char *tmq_get_topic_name(TAOS_RES *res);
|
||||
DLL_EXPORT const char *tmq_get_db_name(TAOS_RES *res);
|
||||
DLL_EXPORT int32_t tmq_get_vgroup_id(TAOS_RES *res);
|
||||
|
|
|
@ -13,6 +13,9 @@
|
|||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#ifndef TDENGINE_COMMAND_H
|
||||
#define TDENGINE_COMMAND_H
|
||||
|
||||
#include "cmdnodes.h"
|
||||
#include "tmsg.h"
|
||||
#include "plannodes.h"
|
||||
|
@ -27,4 +30,4 @@ int32_t qExecExplainEnd(SExplainCtx *pCtx, SRetrieveTableRsp **pRsp);
|
|||
int32_t qExplainUpdateExecInfo(SExplainCtx *pCtx, SExplainRsp *pRspMsg, int32_t groupId, SRetrieveTableRsp **pRsp);
|
||||
void qExplainFreeCtx(SExplainCtx *pCtx);
|
||||
|
||||
|
||||
#endif
|
||||
|
|
|
@ -843,19 +843,25 @@ void taos_fetch_rows_a(TAOS_RES *res, __taos_async_fn_t fp, void *param) {
|
|||
pRequest->body.param = param;
|
||||
|
||||
SReqResultInfo *pResultInfo = &pRequest->body.resInfo;
|
||||
if (taos_num_fields(pRequest) == 0) {
|
||||
|
||||
// this query has no results or error exists, return directly
|
||||
if (taos_num_fields(pRequest) == 0 || pRequest->code != TSDB_CODE_SUCCESS) {
|
||||
pResultInfo->numOfRows = 0;
|
||||
pRequest->body.fetchFp(param, pRequest, pResultInfo->numOfRows);
|
||||
return;
|
||||
}
|
||||
|
||||
if (pResultInfo->pData == NULL || pResultInfo->current >= pResultInfo->numOfRows) {
|
||||
// All data has returned to App already, no need to try again
|
||||
if (pResultInfo->completed) {
|
||||
pResultInfo->numOfRows = 0;
|
||||
pRequest->body.fetchFp(param, pRequest, pResultInfo->numOfRows);
|
||||
return;
|
||||
}
|
||||
// all data has returned to App already, no need to try again
|
||||
if ((pResultInfo->pData == NULL || pResultInfo->current >= pResultInfo->numOfRows) && pResultInfo->completed) {
|
||||
pResultInfo->numOfRows = 0;
|
||||
pRequest->body.fetchFp(param, pRequest, pResultInfo->numOfRows);
|
||||
return;
|
||||
}
|
||||
|
||||
// it is a local executed query, no need to do async fetch
|
||||
if (pResultInfo->current < pResultInfo->numOfRows && pRequest->body.queryJob == 0) {
|
||||
pRequest->body.fetchFp(param, pRequest, pResultInfo->numOfRows);
|
||||
return;
|
||||
}
|
||||
|
||||
SSchedulerReq req = {
|
||||
|
@ -869,14 +875,14 @@ void taos_fetch_rows_a(TAOS_RES *res, __taos_async_fn_t fp, void *param) {
|
|||
void taos_fetch_raw_block_a(TAOS_RES *res, __taos_async_fn_t fp, void *param) {
|
||||
ASSERT(res != NULL && fp != NULL);
|
||||
ASSERT(TD_RES_QUERY(res));
|
||||
|
||||
SRequestObj *pRequest = res;
|
||||
|
||||
pRequest->body.resInfo.convertUcs4 = false;
|
||||
|
||||
SReqResultInfo *pResultInfo = &pRequest->body.resInfo;
|
||||
|
||||
// set the current block is all consumed
|
||||
pResultInfo->current = pResultInfo->numOfRows;
|
||||
pResultInfo->convertUcs4 = false;
|
||||
|
||||
taos_fetch_rows_a(res, fp, param);
|
||||
}
|
||||
|
||||
|
|
|
@ -23,6 +23,7 @@
|
|||
#include "tqueue.h"
|
||||
#include "tref.h"
|
||||
#include "ttimer.h"
|
||||
#include "cJSON.h"
|
||||
|
||||
int32_t tmqAskEp(tmq_t* tmq, bool async);
|
||||
|
||||
|
@ -1683,7 +1684,8 @@ void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
|
|||
if (pollRspWrapper->metaRsp.head.epoch == consumerEpoch) {
|
||||
SMqClientVg* pVg = pollRspWrapper->vgHandle;
|
||||
/*printf("vg %d offset %ld up to %ld\n", pVg->vgId, pVg->currentOffset, rspMsg->msg.rspOffset);*/
|
||||
pVg->currentOffsetNew = pollRspWrapper->metaRsp.rspOffsetNew;
|
||||
pVg->currentOffsetNew.version = pollRspWrapper->metaRsp.rspOffset;
|
||||
pVg->currentOffsetNew.type = TMQ_OFFSET__LOG;
|
||||
atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
|
||||
// build rsp
|
||||
SMqMetaRspObj* pRsp = tmqBuildMetaRspFromWrapper(pollRspWrapper);
|
||||
|
@ -1848,6 +1850,406 @@ int32_t tmq_get_raw_meta(TAOS_RES* res, void** raw_meta, int32_t* raw_meta_len)
|
|||
return -1;
|
||||
}
|
||||
|
||||
static char *buildCreateTableJson(SSchemaWrapper *schemaRow, SSchemaWrapper* schemaTag, char* name, int64_t id, int8_t t){
|
||||
char* string = NULL;
|
||||
cJSON* json = cJSON_CreateObject();
|
||||
if (json == NULL) {
|
||||
return string;
|
||||
}
|
||||
cJSON* type = cJSON_CreateString("create");
|
||||
cJSON_AddItemToObject(json, "type", type);
|
||||
|
||||
char uid[32] = {0};
|
||||
sprintf(uid, "%"PRIi64, id);
|
||||
cJSON* id_ = cJSON_CreateString(uid);
|
||||
cJSON_AddItemToObject(json, "id", id_);
|
||||
cJSON* tableName = cJSON_CreateString(name);
|
||||
cJSON_AddItemToObject(json, "tableName", tableName);
|
||||
cJSON* tableType = cJSON_CreateString(t == TSDB_NORMAL_TABLE ? "normal" : "super");
|
||||
cJSON_AddItemToObject(json, "tableType", tableType);
|
||||
// cJSON* version = cJSON_CreateNumber(1);
|
||||
// cJSON_AddItemToObject(json, "version", version);
|
||||
|
||||
cJSON* columns = cJSON_CreateArray();
|
||||
for(int i = 0; i < schemaRow->nCols; i++){
|
||||
cJSON* column = cJSON_CreateObject();
|
||||
SSchema *s = schemaRow->pSchema + i;
|
||||
cJSON* cname = cJSON_CreateString(s->name);
|
||||
cJSON_AddItemToObject(column, "name", cname);
|
||||
cJSON* ctype = cJSON_CreateNumber(s->type);
|
||||
cJSON_AddItemToObject(column, "type", ctype);
|
||||
if(s->type == TSDB_DATA_TYPE_BINARY){
|
||||
int32_t length = s->bytes - VARSTR_HEADER_SIZE;
|
||||
cJSON* cbytes = cJSON_CreateNumber(length);
|
||||
cJSON_AddItemToObject(column, "length", cbytes);
|
||||
}else if (s->type == TSDB_DATA_TYPE_NCHAR){
|
||||
int32_t length = (s->bytes - VARSTR_HEADER_SIZE)/TSDB_NCHAR_SIZE;
|
||||
cJSON* cbytes = cJSON_CreateNumber(length);
|
||||
cJSON_AddItemToObject(column, "length", cbytes);
|
||||
}
|
||||
cJSON_AddItemToArray(columns, column);
|
||||
}
|
||||
cJSON_AddItemToObject(json, "columns", columns);
|
||||
|
||||
cJSON* tags = cJSON_CreateArray();
|
||||
for(int i = 0; schemaTag && i < schemaTag->nCols; i++){
|
||||
cJSON* tag = cJSON_CreateObject();
|
||||
SSchema *s = schemaTag->pSchema + i;
|
||||
cJSON* tname = cJSON_CreateString(s->name);
|
||||
cJSON_AddItemToObject(tag, "name", tname);
|
||||
cJSON* ttype = cJSON_CreateNumber(s->type);
|
||||
cJSON_AddItemToObject(tag, "type", ttype);
|
||||
if(s->type == TSDB_DATA_TYPE_BINARY){
|
||||
int32_t length = s->bytes - VARSTR_HEADER_SIZE;
|
||||
cJSON* cbytes = cJSON_CreateNumber(length);
|
||||
cJSON_AddItemToObject(tag, "length", cbytes);
|
||||
}else if (s->type == TSDB_DATA_TYPE_NCHAR){
|
||||
int32_t length = (s->bytes - VARSTR_HEADER_SIZE)/TSDB_NCHAR_SIZE;
|
||||
cJSON* cbytes = cJSON_CreateNumber(length);
|
||||
cJSON_AddItemToObject(tag, "length", cbytes);
|
||||
}
|
||||
cJSON_AddItemToArray(tags, tag);
|
||||
}
|
||||
cJSON_AddItemToObject(json, "tags", tags);
|
||||
|
||||
string = cJSON_PrintUnformatted(json);
|
||||
cJSON_Delete(json);
|
||||
return string;
|
||||
}
|
||||
|
||||
static char *processCreateStb(SMqMetaRsp *metaRsp){
|
||||
SVCreateStbReq req = {0};
|
||||
SDecoder coder;
|
||||
char* string = NULL;
|
||||
|
||||
// decode and process req
|
||||
void* data = POINTER_SHIFT(metaRsp->metaRsp, sizeof(SMsgHead));
|
||||
int32_t len = metaRsp->metaRspLen - sizeof(SMsgHead);
|
||||
tDecoderInit(&coder, data, len);
|
||||
|
||||
if (tDecodeSVCreateStbReq(&coder, &req) < 0) {
|
||||
goto _err;
|
||||
}
|
||||
string = buildCreateTableJson(&req.schemaRow, &req.schemaTag, req.name, req.suid, TSDB_SUPER_TABLE);
|
||||
tDecoderClear(&coder);
|
||||
return string;
|
||||
|
||||
_err:
|
||||
tDecoderClear(&coder);
|
||||
return string;
|
||||
}
|
||||
|
||||
static char *buildCreateCTableJson(STag* pTag, int64_t sid, char* name, int64_t id){
|
||||
char* string = NULL;
|
||||
cJSON* json = cJSON_CreateObject();
|
||||
if (json == NULL) {
|
||||
return string;
|
||||
}
|
||||
cJSON* type = cJSON_CreateString("create");
|
||||
cJSON_AddItemToObject(json, "type", type);
|
||||
char cid[32] = {0};
|
||||
sprintf(cid, "%"PRIi64, id);
|
||||
cJSON* cid_ = cJSON_CreateString(cid);
|
||||
cJSON_AddItemToObject(json, "id", cid_);
|
||||
|
||||
cJSON* tableName = cJSON_CreateString(name);
|
||||
cJSON_AddItemToObject(json, "tableName", tableName);
|
||||
cJSON* tableType = cJSON_CreateString("child");
|
||||
cJSON_AddItemToObject(json, "tableType", tableType);
|
||||
|
||||
char sid_[32] = {0};
|
||||
sprintf(sid_, "%"PRIi64, sid);
|
||||
cJSON* using = cJSON_CreateString(sid_);
|
||||
cJSON_AddItemToObject(json, "using", using);
|
||||
// cJSON* version = cJSON_CreateNumber(1);
|
||||
// cJSON_AddItemToObject(json, "version", version);
|
||||
|
||||
cJSON* tags = cJSON_CreateArray();
|
||||
|
||||
if (tTagIsJson(pTag)) { // todo
|
||||
char* pJson = parseTagDatatoJson(pTag);
|
||||
|
||||
cJSON* tag = cJSON_CreateObject();
|
||||
cJSON* tname = cJSON_CreateString("unknown"); // todo
|
||||
cJSON_AddItemToObject(tag, "name", tname);
|
||||
cJSON* ttype = cJSON_CreateNumber(TSDB_DATA_TYPE_JSON);
|
||||
cJSON_AddItemToObject(tag, "type", ttype);
|
||||
cJSON* tvalue = cJSON_CreateString(pJson);
|
||||
cJSON_AddItemToObject(tag, "value", tvalue);
|
||||
cJSON_AddItemToArray(tags, tag);
|
||||
cJSON_AddItemToObject(json, "tags", tags);
|
||||
|
||||
string = cJSON_PrintUnformatted(json);
|
||||
goto end;
|
||||
}
|
||||
|
||||
SArray* pTagVals = NULL;
|
||||
int32_t code = tTagToValArray(pTag, &pTagVals);
|
||||
if (code) {
|
||||
goto end;
|
||||
}
|
||||
|
||||
for(int i = 0; i < taosArrayGetSize(pTagVals); i++){
|
||||
STagVal* pTagVal = (STagVal*)taosArrayGet(pTagVals, i);
|
||||
|
||||
cJSON* tag = cJSON_CreateObject();
|
||||
// cJSON* tname = cJSON_CreateNumber(pTagVal->cid);
|
||||
cJSON* tname = cJSON_CreateString("unkonwn"); // todo
|
||||
cJSON_AddItemToObject(tag, "name", tname);
|
||||
cJSON* ttype = cJSON_CreateNumber(pTagVal->type);
|
||||
cJSON_AddItemToObject(tag, "type", ttype);
|
||||
|
||||
char* buf = NULL;
|
||||
if (IS_VAR_DATA_TYPE(pTagVal->type)) {
|
||||
buf = taosMemoryCalloc(pTagVal->nData + 1, 1);
|
||||
dataConverToStr(buf, pTagVal->type, pTagVal->pData, pTagVal->nData, NULL);
|
||||
} else {
|
||||
buf = taosMemoryCalloc(32, 1);
|
||||
dataConverToStr(buf, pTagVal->type, &pTagVal->i64, tDataTypes[pTagVal->type].bytes, NULL);
|
||||
}
|
||||
|
||||
cJSON* tvalue = cJSON_CreateString(buf);
|
||||
taosMemoryFree(buf);
|
||||
cJSON_AddItemToObject(tag, "value", tvalue);
|
||||
cJSON_AddItemToArray(tags, tag);
|
||||
}
|
||||
cJSON_AddItemToObject(json, "tags", tags);
|
||||
string = cJSON_PrintUnformatted(json);
|
||||
|
||||
end:
|
||||
|
||||
cJSON_Delete(json);
|
||||
return string;
|
||||
}
|
||||
|
||||
static char *processCreateTable(SMqMetaRsp *metaRsp){
|
||||
SDecoder decoder = {0};
|
||||
SVCreateTbBatchReq req = {0};
|
||||
SVCreateTbReq *pCreateReq;
|
||||
char *string = NULL;
|
||||
// decode
|
||||
void* data = POINTER_SHIFT(metaRsp->metaRsp, sizeof(SMsgHead));
|
||||
int32_t len = metaRsp->metaRspLen - sizeof(SMsgHead);
|
||||
tDecoderInit(&decoder, data, len);
|
||||
if (tDecodeSVCreateTbBatchReq(&decoder, &req) < 0) {
|
||||
goto _exit;
|
||||
}
|
||||
|
||||
// loop to create table
|
||||
for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
|
||||
pCreateReq = req.pReqs + iReq;
|
||||
if(pCreateReq->type == TSDB_CHILD_TABLE){
|
||||
string = buildCreateCTableJson((STag*)pCreateReq->ctb.pTag, pCreateReq->ctb.suid, pCreateReq->name, pCreateReq->uid);
|
||||
}else if(pCreateReq->type == TSDB_NORMAL_TABLE){
|
||||
string = buildCreateTableJson(&pCreateReq->ntb.schemaRow, NULL, pCreateReq->name, pCreateReq->uid, TSDB_NORMAL_TABLE);
|
||||
}
|
||||
}
|
||||
|
||||
tDecoderClear(&decoder);
|
||||
|
||||
_exit:
|
||||
tDecoderClear(&decoder);
|
||||
return string;
|
||||
}
|
||||
|
||||
static char *processAlterTable(SMqMetaRsp *metaRsp){
|
||||
SDecoder decoder = {0};
|
||||
SVAlterTbReq vAlterTbReq = {0};
|
||||
char *string = NULL;
|
||||
|
||||
// decode
|
||||
void* data = POINTER_SHIFT(metaRsp->metaRsp, sizeof(SMsgHead));
|
||||
int32_t len = metaRsp->metaRspLen - sizeof(SMsgHead);
|
||||
tDecoderInit(&decoder, data, len);
|
||||
if (tDecodeSVAlterTbReq(&decoder, &vAlterTbReq) < 0) {
|
||||
goto _exit;
|
||||
}
|
||||
|
||||
cJSON* json = cJSON_CreateObject();
|
||||
if (json == NULL) {
|
||||
goto _exit;
|
||||
}
|
||||
cJSON* type = cJSON_CreateString("alter");
|
||||
cJSON_AddItemToObject(json, "type", type);
|
||||
// cJSON* uid = cJSON_CreateNumber(id);
|
||||
// cJSON_AddItemToObject(json, "uid", uid);
|
||||
cJSON* tableName = cJSON_CreateString(vAlterTbReq.tbName);
|
||||
cJSON_AddItemToObject(json, "tableName", tableName);
|
||||
cJSON* tableType = cJSON_CreateString(vAlterTbReq.action == TSDB_ALTER_TABLE_UPDATE_TAG_VAL ? "child" : "normal");
|
||||
cJSON_AddItemToObject(json, "tableType", tableType);
|
||||
|
||||
switch (vAlterTbReq.action) {
|
||||
case TSDB_ALTER_TABLE_ADD_COLUMN: {
|
||||
cJSON* alterType = cJSON_CreateNumber(TSDB_ALTER_TABLE_ADD_COLUMN);
|
||||
cJSON_AddItemToObject(json, "alterType", alterType);
|
||||
cJSON* colName = cJSON_CreateString(vAlterTbReq.colName);
|
||||
cJSON_AddItemToObject(json, "colName", colName);
|
||||
cJSON* colType = cJSON_CreateNumber(vAlterTbReq.type);
|
||||
cJSON_AddItemToObject(json, "colType", colType);
|
||||
|
||||
if(vAlterTbReq.type == TSDB_DATA_TYPE_BINARY){
|
||||
int32_t length = vAlterTbReq.bytes - VARSTR_HEADER_SIZE;
|
||||
cJSON* cbytes = cJSON_CreateNumber(length);
|
||||
cJSON_AddItemToObject(json, "colLength", cbytes);
|
||||
}else if (vAlterTbReq.type == TSDB_DATA_TYPE_NCHAR){
|
||||
int32_t length = (vAlterTbReq.bytes - VARSTR_HEADER_SIZE)/TSDB_NCHAR_SIZE;
|
||||
cJSON* cbytes = cJSON_CreateNumber(length);
|
||||
cJSON_AddItemToObject(json, "colLength", cbytes);
|
||||
}
|
||||
break;
|
||||
}
|
||||
case TSDB_ALTER_TABLE_DROP_COLUMN:{
|
||||
cJSON* alterType = cJSON_CreateNumber(TSDB_ALTER_TABLE_DROP_COLUMN);
|
||||
cJSON_AddItemToObject(json, "alterType", alterType);
|
||||
cJSON* colName = cJSON_CreateString(vAlterTbReq.colName);
|
||||
cJSON_AddItemToObject(json, "colName", colName);
|
||||
break;
|
||||
}
|
||||
case TSDB_ALTER_TABLE_UPDATE_COLUMN_BYTES:{
|
||||
cJSON* alterType = cJSON_CreateNumber(TSDB_ALTER_TABLE_UPDATE_COLUMN_BYTES);
|
||||
cJSON_AddItemToObject(json, "alterType", alterType);
|
||||
cJSON* colName = cJSON_CreateString(vAlterTbReq.colName);
|
||||
cJSON_AddItemToObject(json, "colName", colName);
|
||||
cJSON* colType = cJSON_CreateNumber(vAlterTbReq.type);
|
||||
cJSON_AddItemToObject(json, "colType", colType);
|
||||
if(vAlterTbReq.type == TSDB_DATA_TYPE_BINARY){
|
||||
int32_t length = vAlterTbReq.bytes - VARSTR_HEADER_SIZE;
|
||||
cJSON* cbytes = cJSON_CreateNumber(length);
|
||||
cJSON_AddItemToObject(json, "colLength", cbytes);
|
||||
}else if (vAlterTbReq.type == TSDB_DATA_TYPE_NCHAR){
|
||||
int32_t length = (vAlterTbReq.bytes - VARSTR_HEADER_SIZE)/TSDB_NCHAR_SIZE;
|
||||
cJSON* cbytes = cJSON_CreateNumber(length);
|
||||
cJSON_AddItemToObject(json, "colLength", cbytes);
|
||||
}
|
||||
break;
|
||||
}
|
||||
case TSDB_ALTER_TABLE_UPDATE_COLUMN_NAME:{
|
||||
cJSON* alterType = cJSON_CreateNumber(TSDB_ALTER_TABLE_UPDATE_COLUMN_NAME);
|
||||
cJSON_AddItemToObject(json, "alterType", alterType);
|
||||
cJSON* colName = cJSON_CreateString(vAlterTbReq.colName);
|
||||
cJSON_AddItemToObject(json, "colName", colName);
|
||||
cJSON* colNewName = cJSON_CreateString(vAlterTbReq.colNewName);
|
||||
cJSON_AddItemToObject(json, "colNewName", colNewName);
|
||||
break;
|
||||
}
|
||||
case TSDB_ALTER_TABLE_UPDATE_TAG_VAL:{
|
||||
cJSON* alterType = cJSON_CreateNumber(TSDB_ALTER_TABLE_UPDATE_TAG_VAL);
|
||||
cJSON_AddItemToObject(json, "alterType", alterType);
|
||||
cJSON* tagName = cJSON_CreateString(vAlterTbReq.tagName);
|
||||
cJSON_AddItemToObject(json, "colName", tagName);
|
||||
cJSON* colValue = cJSON_CreateString("invalid, todo"); // todo
|
||||
cJSON_AddItemToObject(json, "colValue", colValue);
|
||||
cJSON* isNull = cJSON_CreateBool(vAlterTbReq.isNull);
|
||||
cJSON_AddItemToObject(json, "colValueNull", isNull);
|
||||
break;
|
||||
}
|
||||
default:
|
||||
break;
|
||||
}
|
||||
string = cJSON_PrintUnformatted(json);
|
||||
|
||||
_exit:
|
||||
tDecoderClear(&decoder);
|
||||
return string;
|
||||
}
|
||||
|
||||
static char *processDropSTable(SMqMetaRsp *metaRsp){
|
||||
SDecoder decoder = {0};
|
||||
SVDropStbReq req = {0};
|
||||
char *string = NULL;
|
||||
|
||||
// decode
|
||||
void* data = POINTER_SHIFT(metaRsp->metaRsp, sizeof(SMsgHead));
|
||||
int32_t len = metaRsp->metaRspLen - sizeof(SMsgHead);
|
||||
tDecoderInit(&decoder, data, len);
|
||||
if (tDecodeSVDropStbReq(&decoder, &req) < 0) {
|
||||
goto _exit;
|
||||
}
|
||||
|
||||
cJSON* json = cJSON_CreateObject();
|
||||
if (json == NULL) {
|
||||
goto _exit;
|
||||
}
|
||||
cJSON* type = cJSON_CreateString("drop");
|
||||
cJSON_AddItemToObject(json, "type", type);
|
||||
char uid[32] = {0};
|
||||
sprintf(uid, "%"PRIi64, req.suid);
|
||||
cJSON* id = cJSON_CreateString(uid);
|
||||
cJSON_AddItemToObject(json, "id", id);
|
||||
cJSON* tableName = cJSON_CreateString(req.name);
|
||||
cJSON_AddItemToObject(json, "tableName", tableName);
|
||||
cJSON* tableType = cJSON_CreateString("super");
|
||||
cJSON_AddItemToObject(json, "tableType", tableType);
|
||||
|
||||
string = cJSON_PrintUnformatted(json);
|
||||
|
||||
_exit:
|
||||
tDecoderClear(&decoder);
|
||||
return string;
|
||||
}
|
||||
|
||||
static char *processDropTable(SMqMetaRsp *metaRsp){
|
||||
SDecoder decoder = {0};
|
||||
SVDropTbBatchReq req = {0};
|
||||
char *string = NULL;
|
||||
|
||||
// decode
|
||||
void* data = POINTER_SHIFT(metaRsp->metaRsp, sizeof(SMsgHead));
|
||||
int32_t len = metaRsp->metaRspLen - sizeof(SMsgHead);
|
||||
tDecoderInit(&decoder, data, len);
|
||||
if (tDecodeSVDropTbBatchReq(&decoder, &req) < 0) {
|
||||
goto _exit;
|
||||
}
|
||||
|
||||
cJSON* json = cJSON_CreateObject();
|
||||
if (json == NULL) {
|
||||
goto _exit;
|
||||
}
|
||||
cJSON* type = cJSON_CreateString("drop");
|
||||
cJSON_AddItemToObject(json, "type", type);
|
||||
// cJSON* uid = cJSON_CreateNumber(id);
|
||||
// cJSON_AddItemToObject(json, "uid", uid);
|
||||
// cJSON* tableType = cJSON_CreateString("normal");
|
||||
// cJSON_AddItemToObject(json, "tableType", tableType);
|
||||
|
||||
cJSON* tableNameList = cJSON_CreateArray();
|
||||
for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
|
||||
SVDropTbReq* pDropTbReq = req.pReqs + iReq;
|
||||
|
||||
cJSON* tableName = cJSON_CreateString(pDropTbReq->name); // todo
|
||||
cJSON_AddItemToArray(tableNameList, tableName);
|
||||
}
|
||||
cJSON_AddItemToObject(json, "tableNameList", tableNameList);
|
||||
|
||||
string = cJSON_PrintUnformatted(json);
|
||||
|
||||
_exit:
|
||||
tDecoderClear(&decoder);
|
||||
return string;
|
||||
}
|
||||
|
||||
char *tmq_get_json_meta(TAOS_RES *res){
|
||||
if (!TD_RES_TMQ_META(res)) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
SMqMetaRspObj* pMetaRspObj = (SMqMetaRspObj*)res;
|
||||
if(pMetaRspObj->metaRsp.resMsgType == TDMT_VND_CREATE_STB){
|
||||
return processCreateStb(&pMetaRspObj->metaRsp);
|
||||
}else if(pMetaRspObj->metaRsp.resMsgType == TDMT_VND_ALTER_STB){
|
||||
return processCreateStb(&pMetaRspObj->metaRsp);
|
||||
}else if(pMetaRspObj->metaRsp.resMsgType == TDMT_VND_DROP_STB){
|
||||
return processDropSTable(&pMetaRspObj->metaRsp);
|
||||
}else if(pMetaRspObj->metaRsp.resMsgType == TDMT_VND_CREATE_TABLE){
|
||||
return processCreateTable(&pMetaRspObj->metaRsp);
|
||||
}else if(pMetaRspObj->metaRsp.resMsgType == TDMT_VND_ALTER_TABLE){
|
||||
return processAlterTable(&pMetaRspObj->metaRsp);
|
||||
}else if(pMetaRspObj->metaRsp.resMsgType == TDMT_VND_DROP_TABLE){
|
||||
return processDropTable(&pMetaRspObj->metaRsp);
|
||||
}
|
||||
return NULL;
|
||||
}
|
||||
|
||||
void tmq_commit_async(tmq_t* tmq, const TAOS_RES* msg, tmq_commit_cb* cb, void* param) {
|
||||
tmqCommitInner2(tmq, msg, 0, 1, cb, param);
|
||||
}
|
||||
|
|
|
@ -523,7 +523,14 @@ static int32_t tsdbCommitTableMemData(SCommitter *pCommitter, STbDataIter *pIter
|
|||
|
||||
tsdbTbDataIterNext(pIter);
|
||||
pRow = tsdbTbDataIterGet(pIter);
|
||||
if (pRow && tsdbKeyCmprFn(&TSDBROW_KEY(pRow), &toKey) >= 0) pRow = NULL;
|
||||
// if (pRow && tsdbKeyCmprFn(&TSDBROW_KEY(pRow), &toKey) >= 0) pRow = NULL;
|
||||
// crash on CI, use the block following
|
||||
if (pRow) {
|
||||
TSDBKEY tmpKey = TSDBROW_KEY(pRow);
|
||||
if (tsdbKeyCmprFn(&tmpKey, &toKey) >= 0) {
|
||||
pRow = NULL;
|
||||
}
|
||||
}
|
||||
|
||||
if (pBlockData->nRow >= pCommitter->maxRow * 4 / 5) goto _write_block;
|
||||
continue;
|
||||
|
|
|
@ -170,7 +170,7 @@ int32_t vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRp
|
|||
if (vnodeProcessDropTbReq(pVnode, version, pReq, len, pRsp) < 0) goto _err;
|
||||
break;
|
||||
case TDMT_VND_DROP_TTL_TABLE:
|
||||
// if (vnodeProcessDropTtlTbReq(pVnode, version, pReq, len, pRsp) < 0) goto _err;
|
||||
if (vnodeProcessDropTtlTbReq(pVnode, version, pReq, len, pRsp) < 0) goto _err;
|
||||
break;
|
||||
case TDMT_VND_CREATE_SMA: {
|
||||
if (vnodeProcessCreateTSmaReq(pVnode, version, pReq, len, pRsp) < 0) goto _err;
|
||||
|
|
|
@ -265,6 +265,11 @@ int32_t vnodeProcessSyncMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
|
|||
ASSERT(pSyncMsg != NULL);
|
||||
code = syncNodeOnClientRequestCb(pSyncNode, pSyncMsg, NULL);
|
||||
syncClientRequestDestroy(pSyncMsg);
|
||||
} else if (pMsg->msgType == TDMT_SYNC_CLIENT_REQUEST_BATCH) {
|
||||
SyncClientRequestBatch *pSyncMsg = syncClientRequestBatchFromRpcMsg(pMsg);
|
||||
ASSERT(pSyncMsg != NULL);
|
||||
code = syncNodeOnClientRequestBatchCb(pSyncNode, pSyncMsg);
|
||||
syncClientRequestBatchDestroyDeep(pSyncMsg);
|
||||
} else if (pMsg->msgType == TDMT_SYNC_REQUEST_VOTE) {
|
||||
SyncRequestVote *pSyncMsg = syncRequestVoteFromRpcMsg2(pMsg);
|
||||
ASSERT(pSyncMsg != NULL);
|
||||
|
|
|
@ -565,7 +565,6 @@ static int32_t createSelectResultDataBlock(SNodeList* pProjects, SSDataBlock** p
|
|||
}
|
||||
|
||||
int32_t buildSelectResultDataBlock(SNodeList* pProjects, SSDataBlock* pBlock) {
|
||||
int32_t numOfCols = LIST_LENGTH(pProjects);
|
||||
blockDataEnsureCapacity(pBlock, 1);
|
||||
|
||||
int32_t index = 0;
|
||||
|
@ -579,7 +578,6 @@ int32_t buildSelectResultDataBlock(SNodeList* pProjects, SSDataBlock* pBlock) {
|
|||
}
|
||||
|
||||
pBlock->info.rows = 1;
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
|
|
@ -451,6 +451,8 @@ typedef struct SIntervalAggOperatorInfo {
|
|||
SArray* pDelWins; // SWinRes
|
||||
int32_t delIndex;
|
||||
SSDataBlock* pDelRes;
|
||||
|
||||
SNode *pCondition;
|
||||
} SIntervalAggOperatorInfo;
|
||||
|
||||
typedef struct SStreamFinalIntervalOperatorInfo {
|
||||
|
|
|
@ -1208,10 +1208,17 @@ static SSDataBlock* doBuildIntervalResult(SOperatorInfo* pOperator) {
|
|||
}
|
||||
|
||||
blockDataEnsureCapacity(pBlock, pOperator->resultInfo.capacity);
|
||||
doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf);
|
||||
|
||||
if (pBlock->info.rows == 0 || !hasDataInGroupInfo(&pInfo->groupResInfo)) {
|
||||
doSetOperatorCompleted(pOperator);
|
||||
while (1) {
|
||||
doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf);
|
||||
doFilter(pInfo->pCondition, pBlock);
|
||||
bool hasRemain = hasDataInGroupInfo(&pInfo->groupResInfo);
|
||||
if (!hasRemain) {
|
||||
doSetOperatorCompleted(pOperator);
|
||||
break;
|
||||
}
|
||||
if (pBlock->info.rows > 0) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
size_t rows = pBlock->info.rows;
|
||||
|
@ -1662,6 +1669,7 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo*
|
|||
pInfo->execModel = pTaskInfo->execModel;
|
||||
pInfo->twAggSup = *pTwAggSupp;
|
||||
pInfo->ignoreExpiredData = pPhyNode->window.igExpired;
|
||||
pInfo->pCondition = pPhyNode->window.node.pConditions;
|
||||
|
||||
if (pPhyNode->window.pExprs != NULL) {
|
||||
int32_t numOfScalar = 0;
|
||||
|
|
|
@ -890,6 +890,20 @@ static int32_t pushDownCondOptDealProject(SOptimizeContext* pCxt, SProjectLogicN
|
|||
return code;
|
||||
}
|
||||
|
||||
static int32_t pushDownCondOptDealLogicNode(SOptimizeContext* pCxt, SLogicNode* pLogicNode) {
|
||||
if (NULL == pLogicNode->pConditions ||
|
||||
OPTIMIZE_FLAG_TEST_MASK(pLogicNode->optimizedFlag, OPTIMIZE_FLAG_PUSH_DOWN_CONDE)) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
SLogicNode* pChild = (SLogicNode*)nodesListGetNode(pLogicNode->pChildren, 0);
|
||||
int32_t code = pushDownCondOptPushCondToChild(pCxt, pChild, &pLogicNode->pConditions);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
OPTIMIZE_FLAG_SET_MASK(pLogicNode->optimizedFlag, OPTIMIZE_FLAG_PUSH_DOWN_CONDE);
|
||||
pCxt->optimized = true;
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t pushDownCondOptimizeImpl(SOptimizeContext* pCxt, SLogicNode* pLogicNode) {
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
switch (nodeType(pLogicNode)) {
|
||||
|
@ -905,6 +919,10 @@ static int32_t pushDownCondOptimizeImpl(SOptimizeContext* pCxt, SLogicNode* pLog
|
|||
case QUERY_NODE_LOGIC_PLAN_PROJECT:
|
||||
code = pushDownCondOptDealProject(pCxt, (SProjectLogicNode*)pLogicNode);
|
||||
break;
|
||||
case QUERY_NODE_LOGIC_PLAN_SORT:
|
||||
case QUERY_NODE_LOGIC_PLAN_PARTITION:
|
||||
code = pushDownCondOptDealLogicNode(pCxt, pLogicNode);
|
||||
break;
|
||||
default:
|
||||
break;
|
||||
}
|
||||
|
|
|
@ -265,7 +265,6 @@ int32_t dataConverToStr(char* str, int type, void* buf, int32_t bufSize, int32_t
|
|||
break;
|
||||
|
||||
case TSDB_DATA_TYPE_BINARY:
|
||||
case TSDB_DATA_TYPE_NCHAR:
|
||||
if (bufSize < 0) {
|
||||
// tscError("invalid buf size");
|
||||
return TSDB_CODE_TSC_INVALID_VALUE;
|
||||
|
@ -276,7 +275,20 @@ int32_t dataConverToStr(char* str, int type, void* buf, int32_t bufSize, int32_t
|
|||
*(str + bufSize + 1) = '"';
|
||||
n = bufSize + 2;
|
||||
break;
|
||||
case TSDB_DATA_TYPE_NCHAR:
|
||||
if (bufSize < 0) {
|
||||
// tscError("invalid buf size");
|
||||
return TSDB_CODE_TSC_INVALID_VALUE;
|
||||
}
|
||||
|
||||
*str = '"';
|
||||
int32_t length = taosUcs4ToMbs((TdUcs4 *)buf, bufSize, str + 1);
|
||||
if (length <= 0) {
|
||||
return TSDB_CODE_TSC_INVALID_VALUE;
|
||||
}
|
||||
*(str + length + 1) = '"';
|
||||
n = length + 2;
|
||||
break;
|
||||
case TSDB_DATA_TYPE_UTINYINT:
|
||||
n = sprintf(str, "%d", *(uint8_t*)buf);
|
||||
break;
|
||||
|
@ -298,7 +310,7 @@ int32_t dataConverToStr(char* str, int type, void* buf, int32_t bufSize, int32_t
|
|||
return TSDB_CODE_TSC_INVALID_VALUE;
|
||||
}
|
||||
|
||||
*len = n;
|
||||
if(len) *len = n;
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
|
|
@ -13,13 +13,10 @@
|
|||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#include "catalog.h"
|
||||
#include "command.h"
|
||||
#include "query.h"
|
||||
#include "schInt.h"
|
||||
#include "tmsg.h"
|
||||
#include "tref.h"
|
||||
#include "trpc.h"
|
||||
|
||||
SSchedulerMgmt schMgmt = {
|
||||
.jobRef = -1,
|
||||
|
|
|
@ -314,7 +314,6 @@ static int tdbDefaultKeyCmprFn(const void *pKey1, int keyLen1, const void *pKey2
|
|||
|
||||
static int tdbBtreeOpenImpl(SBTree *pBt) {
|
||||
// Try to get the root page of the an existing btree
|
||||
|
||||
SPgno pgno;
|
||||
SPage *pPage;
|
||||
int ret;
|
||||
|
@ -1993,6 +1992,7 @@ int tdbBtcMoveTo(SBTC *pBtc, const void *pKey, int kLen, int *pCRst) {
|
|||
const void *pTKey;
|
||||
int tkLen;
|
||||
|
||||
tdbTrace("ttl moveto, pager:%p, ipage:%d", pPager, pBtc->iPage);
|
||||
if (pBtc->iPage < 0) {
|
||||
// move from a clear cursor
|
||||
ret = tdbPagerFetchPage(pPager, &pBt->root, &(pBtc->pPage), tdbBtreeInitPage,
|
||||
|
|
|
@ -121,9 +121,10 @@ SPager *tdbEnvGetPager(TDB *pDb, const char *fname) {
|
|||
|
||||
hash = tdbCstringHash(fname);
|
||||
ppPager = &pDb->pgrHash[hash % pDb->nPgrHash];
|
||||
tdbTrace("tdbttl getPager1: pager:%p, index:%d, name:%s", *ppPager, hash % pDb->nPgrHash, fname);
|
||||
for (; *ppPager && (strcmp(fname, (*ppPager)->dbFileName) != 0); ppPager = &((*ppPager)->pHashNext)) {
|
||||
}
|
||||
|
||||
tdbTrace("tdbttl getPager2: pager:%p, index:%d, name:%s", *ppPager, hash % pDb->nPgrHash, fname);
|
||||
return *ppPager;
|
||||
}
|
||||
|
||||
|
@ -143,9 +144,12 @@ void tdbEnvAddPager(TDB *pDb, SPager *pPager) {
|
|||
// add to hash
|
||||
hash = tdbCstringHash(pPager->dbFileName);
|
||||
ppPager = &pDb->pgrHash[hash % pDb->nPgrHash];
|
||||
tdbTrace("tdbttl addPager1: pager:%p, index:%d, name:%s", *ppPager, hash % pDb->nPgrHash, pPager->dbFileName);
|
||||
pPager->pHashNext = *ppPager;
|
||||
*ppPager = pPager;
|
||||
|
||||
tdbTrace("tdbttl addPager2: pager:%p, index:%d, name:%s", *ppPager, hash % pDb->nPgrHash, pPager->dbFileName);
|
||||
|
||||
// increase the counter
|
||||
pDb->nPager++;
|
||||
}
|
||||
|
|
|
@ -230,6 +230,7 @@ int tdbPagerCommit(SPager *pPager, TXN *pTxn) {
|
|||
}
|
||||
}
|
||||
|
||||
tdbTrace("tdbttl commit:%p, %d", pPager, pPager->dbOrigSize);
|
||||
pPager->dbOrigSize = pPager->dbFileSize;
|
||||
|
||||
// release the page
|
||||
|
@ -285,6 +286,7 @@ int tdbPagerFetchPage(SPager *pPager, SPgno *ppgno, SPage **ppPage, int (*initPa
|
|||
return -1;
|
||||
}
|
||||
|
||||
tdbTrace("tdbttl fetch pager:%p", pPage->pPager);
|
||||
// init page if need
|
||||
if (!TDB_PAGE_INITIALIZED(pPage)) {
|
||||
ret = tdbPagerInitPage(pPager, pPage, initPage, arg, loadPage);
|
||||
|
@ -363,10 +365,12 @@ static int tdbPagerInitPage(SPager *pPager, SPage *pPage, int (*initPage)(SPage
|
|||
|
||||
pgno = TDB_PAGE_PGNO(pPage);
|
||||
|
||||
tdbTrace("tdbttl init pager:%p, pgno:%d, loadPage:%d, size:%d", pPager, pgno, loadPage, pPager->dbOrigSize);
|
||||
if (loadPage && pgno <= pPager->dbOrigSize) {
|
||||
init = 1;
|
||||
|
||||
nRead = tdbOsPRead(pPager->fd, pPage->pData, pPage->pageSize, ((i64)pPage->pageSize) * (pgno - 1));
|
||||
tdbTrace("tdbttl pager:%p, pgno:%d, nRead:%ld", pPager, pgno, nRead);
|
||||
if (nRead < pPage->pageSize) {
|
||||
ASSERT(0);
|
||||
return -1;
|
||||
|
|
|
@ -33,9 +33,9 @@ class TDTestCase:
|
|||
def init(self, conn, logSql):
|
||||
self.testcasePath = os.path.split(__file__)[0]
|
||||
self.testcaseFilename = os.path.split(__file__)[-1]
|
||||
os.system("rm -rf %s/%s.sql" % (self.testcasePath,self.testcaseFilename))
|
||||
# os.system("rm -rf %s/%s.sql" % (self.testcasePath,self.testcaseFilename))
|
||||
tdLog.debug("start to execute %s" % __file__)
|
||||
tdSql.init(conn.cursor(), logSql)
|
||||
tdSql.init(conn.cursor(), True)
|
||||
|
||||
def run(self):
|
||||
# tdSql.prepare()
|
||||
|
|
|
@ -27,8 +27,7 @@ python3 ./test.py -f 1-insert/table_comment.py
|
|||
python3 ./test.py -f 1-insert/time_range_wise.py
|
||||
python3 ./test.py -f 1-insert/block_wise.py
|
||||
python3 ./test.py -f 1-insert/create_retentions.py
|
||||
|
||||
#python3 ./test.py -f 1-insert/table_param_ttl.py
|
||||
python3 ./test.py -f 1-insert/table_param_ttl.py
|
||||
|
||||
python3 ./test.py -f 2-query/between.py
|
||||
python3 ./test.py -f 2-query/distinct.py
|
||||
|
@ -118,7 +117,7 @@ python3 ./test.py -f 2-query/distribute_agg_avg.py
|
|||
python3 ./test.py -f 2-query/distribute_agg_stddev.py
|
||||
python3 ./test.py -f 2-query/twa.py
|
||||
python3 ./test.py -f 2-query/irate.py
|
||||
python3 ./test.py -f 2-query/and_or_for_byte.py
|
||||
#python3 ./test.py -f 2-query/and_or_for_byte.py
|
||||
|
||||
python3 ./test.py -f 2-query/function_null.py
|
||||
python3 ./test.py -f 2-query/queryQnode.py
|
||||
|
|
Loading…
Reference in New Issue