commit
7f18cc5b8b
|
@ -20,7 +20,7 @@
|
|||
#include <time.h>
|
||||
#include "taos.h"
|
||||
|
||||
static int running = 1;
|
||||
static int running = 1;
|
||||
|
||||
static int32_t msg_process(TAOS_RES* msg) {
|
||||
char buf[1024];
|
||||
|
@ -40,8 +40,8 @@ static int32_t msg_process(TAOS_RES* msg) {
|
|||
|
||||
TAOS_FIELD* fields = taos_fetch_fields(msg);
|
||||
int32_t numOfFields = taos_field_count(msg);
|
||||
//int32_t* length = taos_fetch_lengths(msg);
|
||||
int32_t precision = taos_result_precision(msg);
|
||||
// int32_t* length = taos_fetch_lengths(msg);
|
||||
int32_t precision = taos_result_precision(msg);
|
||||
rows++;
|
||||
taos_print_row(buf, row, fields, numOfFields);
|
||||
printf("precision: %d, row content: %s\n", precision, buf);
|
||||
|
@ -62,14 +62,12 @@ static int32_t init_env() {
|
|||
pRes = taos_query(pConn, "drop topic topicname");
|
||||
if (taos_errno(pRes) != 0) {
|
||||
printf("error in drop tmqdb, reason:%s\n", taos_errstr(pRes));
|
||||
return -1;
|
||||
}
|
||||
taos_free_result(pRes);
|
||||
|
||||
pRes = taos_query(pConn, "drop database if exists tmqdb");
|
||||
if (taos_errno(pRes) != 0) {
|
||||
printf("error in drop tmqdb, reason:%s\n", taos_errstr(pRes));
|
||||
return -1;
|
||||
}
|
||||
taos_free_result(pRes);
|
||||
|
||||
|
@ -77,7 +75,7 @@ static int32_t init_env() {
|
|||
pRes = taos_query(pConn, "create database tmqdb precision 'ns'");
|
||||
if (taos_errno(pRes) != 0) {
|
||||
printf("error in create tmqdb, reason:%s\n", taos_errstr(pRes));
|
||||
return -1;
|
||||
goto END;
|
||||
}
|
||||
taos_free_result(pRes);
|
||||
|
||||
|
@ -87,7 +85,7 @@ static int32_t init_env() {
|
|||
pConn, "create table tmqdb.stb (ts timestamp, c1 int, c2 float, c3 varchar(16)) tags(t1 int, t3 varchar(16))");
|
||||
if (taos_errno(pRes) != 0) {
|
||||
printf("failed to create super table stb, reason:%s\n", taos_errstr(pRes));
|
||||
return -1;
|
||||
goto END;
|
||||
}
|
||||
taos_free_result(pRes);
|
||||
|
||||
|
@ -96,28 +94,28 @@ static int32_t init_env() {
|
|||
pRes = taos_query(pConn, "create table tmqdb.ctb0 using tmqdb.stb tags(0, 'subtable0')");
|
||||
if (taos_errno(pRes) != 0) {
|
||||
printf("failed to create super table ctb0, reason:%s\n", taos_errstr(pRes));
|
||||
return -1;
|
||||
goto END;
|
||||
}
|
||||
taos_free_result(pRes);
|
||||
|
||||
pRes = taos_query(pConn, "create table tmqdb.ctb1 using tmqdb.stb tags(1, 'subtable1')");
|
||||
if (taos_errno(pRes) != 0) {
|
||||
printf("failed to create super table ctb1, reason:%s\n", taos_errstr(pRes));
|
||||
return -1;
|
||||
goto END;
|
||||
}
|
||||
taos_free_result(pRes);
|
||||
|
||||
pRes = taos_query(pConn, "create table tmqdb.ctb2 using tmqdb.stb tags(2, 'subtable2')");
|
||||
if (taos_errno(pRes) != 0) {
|
||||
printf("failed to create super table ctb2, reason:%s\n", taos_errstr(pRes));
|
||||
return -1;
|
||||
goto END;
|
||||
}
|
||||
taos_free_result(pRes);
|
||||
|
||||
pRes = taos_query(pConn, "create table tmqdb.ctb3 using tmqdb.stb tags(3, 'subtable3')");
|
||||
if (taos_errno(pRes) != 0) {
|
||||
printf("failed to create super table ctb3, reason:%s\n", taos_errstr(pRes));
|
||||
return -1;
|
||||
goto END;
|
||||
}
|
||||
taos_free_result(pRes);
|
||||
|
||||
|
@ -126,33 +124,37 @@ static int32_t init_env() {
|
|||
pRes = taos_query(pConn, "insert into tmqdb.ctb0 values(now, 0, 0, 'a0')(now+1s, 0, 0, 'a00')");
|
||||
if (taos_errno(pRes) != 0) {
|
||||
printf("failed to insert into ctb0, reason:%s\n", taos_errstr(pRes));
|
||||
return -1;
|
||||
goto END;
|
||||
}
|
||||
taos_free_result(pRes);
|
||||
|
||||
pRes = taos_query(pConn, "insert into tmqdb.ctb1 values(now, 1, 1, 'a1')(now+1s, 11, 11, 'a11')");
|
||||
if (taos_errno(pRes) != 0) {
|
||||
printf("failed to insert into ctb0, reason:%s\n", taos_errstr(pRes));
|
||||
return -1;
|
||||
goto END;
|
||||
}
|
||||
taos_free_result(pRes);
|
||||
|
||||
pRes = taos_query(pConn, "insert into tmqdb.ctb2 values(now, 2, 2, 'a1')(now+1s, 22, 22, 'a22')");
|
||||
if (taos_errno(pRes) != 0) {
|
||||
printf("failed to insert into ctb0, reason:%s\n", taos_errstr(pRes));
|
||||
return -1;
|
||||
goto END;
|
||||
}
|
||||
taos_free_result(pRes);
|
||||
|
||||
pRes = taos_query(pConn, "insert into tmqdb.ctb3 values(now, 3, 3, 'a1')(now+1s, 33, 33, 'a33')");
|
||||
if (taos_errno(pRes) != 0) {
|
||||
printf("failed to insert into ctb0, reason:%s\n", taos_errstr(pRes));
|
||||
return -1;
|
||||
goto END;
|
||||
}
|
||||
taos_free_result(pRes);
|
||||
|
||||
taos_close(pConn);
|
||||
return 0;
|
||||
|
||||
END:
|
||||
taos_free_result(pRes);
|
||||
taos_close(pConn);
|
||||
return -1;
|
||||
}
|
||||
|
||||
int32_t create_topic() {
|
||||
|
|
|
@ -59,6 +59,7 @@ typedef enum {
|
|||
TSDB_OPTION_TIMEZONE,
|
||||
TSDB_OPTION_CONFIGDIR,
|
||||
TSDB_OPTION_SHELL_ACTIVITY_TIMER,
|
||||
TSDB_OPTION_USE_ADAPTER,
|
||||
TSDB_MAX_OPTIONS
|
||||
} TSDB_OPTION;
|
||||
|
||||
|
@ -218,7 +219,7 @@ DLL_EXPORT const void *taos_get_raw_block(TAOS_RES *res);
|
|||
DLL_EXPORT int taos_get_db_route_info(TAOS *taos, const char *db, TAOS_DB_ROUTE_INFO *dbInfo);
|
||||
DLL_EXPORT int taos_get_table_vgId(TAOS *taos, const char *db, const char *table, int *vgId);
|
||||
|
||||
DLL_EXPORT int taos_load_table_info(TAOS *taos, const char *tableNameList);
|
||||
DLL_EXPORT int taos_load_table_info(TAOS *taos, const char *tableNameList);
|
||||
|
||||
/* --------------------------schemaless INTERFACE------------------------------- */
|
||||
|
||||
|
@ -229,13 +230,14 @@ DLL_EXPORT TAOS_RES *taos_schemaless_insert_raw(TAOS *taos, char *lines, int len
|
|||
int precision);
|
||||
DLL_EXPORT TAOS_RES *taos_schemaless_insert_raw_with_reqid(TAOS *taos, char *lines, int len, int32_t *totalRows,
|
||||
int protocol, int precision, int64_t reqid);
|
||||
DLL_EXPORT TAOS_RES *taos_schemaless_insert_ttl(TAOS *taos, char *lines[], int numLines, int protocol, int precision, int32_t ttl);
|
||||
DLL_EXPORT TAOS_RES *taos_schemaless_insert_ttl(TAOS *taos, char *lines[], int numLines, int protocol, int precision,
|
||||
int32_t ttl);
|
||||
DLL_EXPORT TAOS_RES *taos_schemaless_insert_ttl_with_reqid(TAOS *taos, char *lines[], int numLines, int protocol,
|
||||
int precision, int32_t ttl, int64_t reqid);
|
||||
int precision, int32_t ttl, int64_t reqid);
|
||||
DLL_EXPORT TAOS_RES *taos_schemaless_insert_raw_ttl(TAOS *taos, char *lines, int len, int32_t *totalRows, int protocol,
|
||||
int precision, int32_t ttl);
|
||||
int precision, int32_t ttl);
|
||||
DLL_EXPORT TAOS_RES *taos_schemaless_insert_raw_ttl_with_reqid(TAOS *taos, char *lines, int len, int32_t *totalRows,
|
||||
int protocol, int precision, int32_t ttl, int64_t reqid);
|
||||
int protocol, int precision, int32_t ttl, int64_t reqid);
|
||||
|
||||
/* --------------------------TMQ INTERFACE------------------------------- */
|
||||
|
||||
|
@ -308,7 +310,8 @@ DLL_EXPORT tmq_res_t tmq_get_res_type(TAOS_RES *res);
|
|||
DLL_EXPORT int32_t tmq_get_raw(TAOS_RES *res, tmq_raw_data *raw);
|
||||
DLL_EXPORT int32_t tmq_write_raw(TAOS *taos, tmq_raw_data raw);
|
||||
DLL_EXPORT int taos_write_raw_block(TAOS *taos, int numOfRows, char *pData, const char *tbname);
|
||||
DLL_EXPORT int taos_write_raw_block_with_fields(TAOS* taos, int rows, char* pData, const char* tbname, TAOS_FIELD *fields, int numFields);
|
||||
DLL_EXPORT int taos_write_raw_block_with_fields(TAOS *taos, int rows, char *pData, const char *tbname,
|
||||
TAOS_FIELD *fields, int numFields);
|
||||
DLL_EXPORT void tmq_free_raw(tmq_raw_data raw);
|
||||
// Returning null means error. Returned result need to be freed by tmq_free_json_meta
|
||||
DLL_EXPORT char *tmq_get_json_meta(TAOS_RES *res);
|
||||
|
|
|
@ -101,6 +101,7 @@ extern int32_t tsRedirectPeriod;
|
|||
extern int32_t tsRedirectFactor;
|
||||
extern int32_t tsRedirectMaxPeriod;
|
||||
extern int32_t tsMaxRetryWaitTime;
|
||||
extern bool tsUseAdapter;
|
||||
|
||||
// client
|
||||
extern int32_t tsMinSlidingTime;
|
||||
|
|
|
@ -469,6 +469,9 @@ int taos_options_imp(TSDB_OPTION option, const char *str) {
|
|||
case TSDB_OPTION_TIMEZONE:
|
||||
pItem = cfgGetItem(pCfg, "timezone");
|
||||
break;
|
||||
case TSDB_OPTION_USE_ADAPTER:
|
||||
pItem = cfgGetItem(pCfg, "useAdapter");
|
||||
break;
|
||||
default:
|
||||
break;
|
||||
}
|
||||
|
|
|
@ -739,6 +739,7 @@ int32_t handleSubmitExecRes(SRequestObj* pRequest, void* res, SCatalog* pCatalog
|
|||
SArray* pArray = NULL;
|
||||
SSubmitRsp* pRsp = (SSubmitRsp*)res;
|
||||
if (pRsp->nBlocks <= 0) {
|
||||
taosMemoryFreeClear(pRsp->pBlocks);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
|
|
@ -438,6 +438,7 @@ int32_t tmqCommitCb(void* param, SDataBuf* pBuf, int32_t code) {
|
|||
|
||||
taosMemoryFree(pParam->pOffset);
|
||||
taosMemoryFree(pBuf->pData);
|
||||
taosMemoryFree(pBuf->pEpSet);
|
||||
|
||||
/*tscDebug("receive offset commit cb of %s on vgId:%d, offset is %" PRId64, pParam->pOffset->subKey, pParam->->vgId,
|
||||
* pOffset->version);*/
|
||||
|
@ -724,7 +725,10 @@ void tmqAssignDelayedReportTask(void* param, void* tmrId) {
|
|||
}
|
||||
|
||||
int32_t tmqHbCb(void* param, SDataBuf* pMsg, int32_t code) {
|
||||
if (pMsg && pMsg->pData) taosMemoryFree(pMsg->pData);
|
||||
if (pMsg) {
|
||||
taosMemoryFree(pMsg->pData);
|
||||
taosMemoryFree(pMsg->pEpSet);
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
@ -869,6 +873,8 @@ void tmqClearUnhandleMsg(tmq_t* tmq) {
|
|||
int32_t tmqSubscribeCb(void* param, SDataBuf* pMsg, int32_t code) {
|
||||
SMqSubscribeCbParam* pParam = (SMqSubscribeCbParam*)param;
|
||||
pParam->rspErr = code;
|
||||
|
||||
taosMemoryFree(pMsg->pEpSet);
|
||||
tsem_post(&pParam->rspSem);
|
||||
return 0;
|
||||
}
|
||||
|
@ -1166,6 +1172,8 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) {
|
|||
if (code != 0) {
|
||||
tscWarn("msg discard from vgId:%d, epoch %d, since %s", vgId, epoch, terrstr());
|
||||
if (pMsg->pData) taosMemoryFree(pMsg->pData);
|
||||
if (pMsg->pEpSet) taosMemoryFree(pMsg->pEpSet);
|
||||
|
||||
if (code == TSDB_CODE_TMQ_CONSUMER_MISMATCH) {
|
||||
atomic_store_8(&tmq->status, TMQ_CONSUMER_STATUS__RECOVER);
|
||||
goto CREATE_MSG_FAIL;
|
||||
|
@ -1365,6 +1373,7 @@ int32_t tmqAskEpCb(void* param, SDataBuf* pMsg, int32_t code) {
|
|||
taosMemoryFree(pParam);
|
||||
}
|
||||
taosMemoryFree(pMsg->pData);
|
||||
taosMemoryFree(pMsg->pEpSet);
|
||||
terrno = TSDB_CODE_TMQ_CONSUMER_CLOSED;
|
||||
return -1;
|
||||
}
|
||||
|
@ -1416,6 +1425,8 @@ END:
|
|||
} else {
|
||||
taosMemoryFree(pParam);
|
||||
}
|
||||
|
||||
taosMemoryFree(pMsg->pEpSet);
|
||||
taosMemoryFree(pMsg->pData);
|
||||
return code;
|
||||
}
|
||||
|
|
|
@ -96,6 +96,7 @@ int32_t tsRedirectPeriod = 10;
|
|||
int32_t tsRedirectFactor = 2;
|
||||
int32_t tsRedirectMaxPeriod = 1000;
|
||||
int32_t tsMaxRetryWaitTime = 10000;
|
||||
bool tsUseAdapter = false;
|
||||
|
||||
/*
|
||||
* denote if the server needs to compress response message at the application layer to client, including query rsp,
|
||||
|
@ -201,9 +202,7 @@ int32_t taosSetTfsCfg(SConfig *pCfg) {
|
|||
int32_t taosSetTfsCfg(SConfig *pCfg);
|
||||
#endif
|
||||
|
||||
struct SConfig *taosGetCfg() {
|
||||
return tsCfg;
|
||||
}
|
||||
struct SConfig *taosGetCfg() { return tsCfg; }
|
||||
|
||||
static int32_t taosLoadCfg(SConfig *pCfg, const char **envCmd, const char *inputCfgDir, const char *envFile,
|
||||
char *apolloUrl) {
|
||||
|
@ -314,6 +313,7 @@ static int32_t taosAddClientCfg(SConfig *pCfg) {
|
|||
if (cfgAddInt32(pCfg, "smlBatchSize", tsSmlBatchSize, 1, INT32_MAX, true) != 0) return -1;
|
||||
if (cfgAddInt32(pCfg, "maxMemUsedByInsert", tsMaxMemUsedByInsert, 1, INT32_MAX, true) != 0) return -1;
|
||||
if (cfgAddInt32(pCfg, "maxRetryWaitTime", tsMaxRetryWaitTime, 0, 86400000, 0) != 0) return -1;
|
||||
if (cfgAddBool(pCfg, "useAdapter", tsUseAdapter, true) != 0) return -1;
|
||||
|
||||
tsNumOfTaskQueueThreads = tsNumOfCores / 2;
|
||||
tsNumOfTaskQueueThreads = TMAX(tsNumOfTaskQueueThreads, 4);
|
||||
|
@ -668,6 +668,7 @@ static int32_t taosSetClientCfg(SConfig *pCfg) {
|
|||
tsQueryNodeChunkSize = cfgGetItem(pCfg, "queryNodeChunkSize")->i32;
|
||||
tsQueryUseNodeAllocator = cfgGetItem(pCfg, "queryUseNodeAllocator")->bval;
|
||||
tsKeepColumnName = cfgGetItem(pCfg, "keepColumnName")->bval;
|
||||
tsUseAdapter = cfgGetItem(pCfg, "useAdapter")->bval;
|
||||
|
||||
tsMaxRetryWaitTime = cfgGetItem(pCfg, "maxRetryWaitTime")->i32;
|
||||
return 0;
|
||||
|
|
|
@ -174,6 +174,7 @@ typedef struct {
|
|||
void* param;
|
||||
char opername[TSDB_TRANS_OPER_LEN];
|
||||
SArray* pRpcArray;
|
||||
SRWLatch lockRpcArray;
|
||||
} STrans;
|
||||
|
||||
typedef struct {
|
||||
|
|
|
@ -628,6 +628,7 @@ STrans *mndTransCreate(SMnode *pMnode, ETrnPolicy policy, ETrnConflct conflict,
|
|||
pTrans->undoActions = taosArrayInit(TRANS_ARRAY_SIZE, sizeof(STransAction));
|
||||
pTrans->commitActions = taosArrayInit(TRANS_ARRAY_SIZE, sizeof(STransAction));
|
||||
pTrans->pRpcArray = taosArrayInit(1, sizeof(SRpcHandleInfo));
|
||||
taosInitRWLatch(&pTrans->lockRpcArray);
|
||||
|
||||
if (pTrans->redoActions == NULL || pTrans->undoActions == NULL || pTrans->commitActions == NULL ||
|
||||
pTrans->pRpcArray == NULL) {
|
||||
|
@ -737,12 +738,14 @@ int32_t mndSetRpcInfoForDbTrans(SMnode *pMnode, SRpcMsg *pMsg, EOperType oper, c
|
|||
if (pTrans->oper == oper) {
|
||||
if (strcasecmp(dbname, pTrans->dbname) == 0) {
|
||||
mInfo("trans:%d, db:%s oper:%d matched with input", pTrans->id, dbname, oper);
|
||||
taosWLockLatch(&pTrans->lockRpcArray);
|
||||
if (pTrans->pRpcArray == NULL) {
|
||||
pTrans->pRpcArray = taosArrayInit(1, sizeof(SRpcHandleInfo));
|
||||
pTrans->pRpcArray = taosArrayInit(4, sizeof(SRpcHandleInfo));
|
||||
}
|
||||
if (pTrans->pRpcArray != NULL && taosArrayPush(pTrans->pRpcArray, &pMsg->info) != NULL) {
|
||||
code = 0;
|
||||
}
|
||||
taosWUnLockLatch(&pTrans->lockRpcArray);
|
||||
|
||||
sdbRelease(pMnode->pSdb, pTrans);
|
||||
break;
|
||||
|
@ -944,8 +947,12 @@ static void mndTransSendRpcRsp(SMnode *pMnode, STrans *pTrans) {
|
|||
pTrans->failedTimes, code);
|
||||
}
|
||||
|
||||
taosWLockLatch(&pTrans->lockRpcArray);
|
||||
int32_t size = taosArrayGetSize(pTrans->pRpcArray);
|
||||
if (size <= 0) return;
|
||||
if (size <= 0) {
|
||||
taosWUnLockLatch(&pTrans->lockRpcArray);
|
||||
return;
|
||||
}
|
||||
|
||||
for (int32_t i = 0; i < size; ++i) {
|
||||
SRpcHandleInfo *pInfo = taosArrayGet(pTrans->pRpcArray, i);
|
||||
|
@ -997,6 +1004,7 @@ static void mndTransSendRpcRsp(SMnode *pMnode, STrans *pTrans) {
|
|||
}
|
||||
}
|
||||
taosArrayClear(pTrans->pRpcArray);
|
||||
taosWUnLockLatch(&pTrans->lockRpcArray);
|
||||
}
|
||||
|
||||
int32_t mndTransProcessRsp(SRpcMsg *pRsp) {
|
||||
|
|
|
@ -838,9 +838,9 @@ static int32_t mndRetrievePrivileges(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock
|
|||
|
||||
if (pUser->superUser) {
|
||||
cols = 0;
|
||||
SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||
char userName[TSDB_USER_LEN + VARSTR_HEADER_SIZE] = {0};
|
||||
char userName[TSDB_USER_LEN + VARSTR_HEADER_SIZE] = {0};
|
||||
STR_WITH_MAXSIZE_TO_VARSTR(userName, pUser->user, pShow->pMeta->pSchemas[cols].bytes);
|
||||
SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||
colDataAppend(pColInfo, numOfRows, (const char *)userName, false);
|
||||
|
||||
char privilege[20] = {0};
|
||||
|
@ -859,9 +859,9 @@ static int32_t mndRetrievePrivileges(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock
|
|||
char *db = taosHashIterate(pUser->readDbs, NULL);
|
||||
while (db != NULL) {
|
||||
cols = 0;
|
||||
SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||
char userName[TSDB_USER_LEN + VARSTR_HEADER_SIZE] = {0};
|
||||
char userName[TSDB_USER_LEN + VARSTR_HEADER_SIZE] = {0};
|
||||
STR_WITH_MAXSIZE_TO_VARSTR(userName, pUser->user, pShow->pMeta->pSchemas[cols].bytes);
|
||||
SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||
colDataAppend(pColInfo, numOfRows, (const char *)userName, false);
|
||||
|
||||
char privilege[20] = {0};
|
||||
|
|
|
@ -161,7 +161,10 @@ int32_t metaSnapWriterClose(SMetaSnapWriter** ppWriter, int8_t rollback) {
|
|||
SMetaSnapWriter* pWriter = *ppWriter;
|
||||
|
||||
if (rollback) {
|
||||
metaInfo("vgId:%d, meta snapshot writer close and rollback start ", TD_VID(pWriter->pMeta->pVnode));
|
||||
code = metaAbort(pWriter->pMeta);
|
||||
metaInfo("vgId:%d, meta snapshot writer close and rollback finished, code:0x%x", TD_VID(pWriter->pMeta->pVnode),
|
||||
code);
|
||||
if (code) goto _err;
|
||||
} else {
|
||||
code = metaCommit(pWriter->pMeta, pWriter->pMeta->txn);
|
||||
|
|
|
@ -710,6 +710,9 @@ int metaUpdateCtimeIdx(SMeta *pMeta, const SMetaEntry *pME) {
|
|||
if (metaBuildCtimeIdxKey(&ctimeKey, pME) < 0) {
|
||||
return 0;
|
||||
}
|
||||
metaDebug("vgId:%d, start to save ctime:%" PRId64 " uid:%" PRId64 " ct:%" PRId64, TD_VID(pMeta->pVnode), pME->version,
|
||||
pME->uid, ctimeKey.ctime);
|
||||
|
||||
return tdbTbInsert(pMeta->pCtimeIdx, &ctimeKey, sizeof(ctimeKey), NULL, 0, pMeta->txn);
|
||||
}
|
||||
|
||||
|
|
|
@ -1109,7 +1109,7 @@ static int32_t tsdbSnapWriteData(STsdbSnapWriter* pWriter, uint8_t* pData, uint3
|
|||
|
||||
if (pWriter->dWriter.pWriter == NULL || pWriter->fid != fid) {
|
||||
if (pWriter->dWriter.pWriter) {
|
||||
ASSERT(fid > pWriter->fid);
|
||||
// ASSERT(fid > pWriter->fid);
|
||||
|
||||
code = tsdbSnapWriteCloseFile(pWriter);
|
||||
if (code) goto _err;
|
||||
|
|
|
@ -231,10 +231,6 @@ static const char* cacheModelStr(int8_t cacheModel) {
|
|||
return TSDB_CACHE_MODEL_NONE_STR;
|
||||
}
|
||||
|
||||
static const char* strictStr(int8_t strict) {
|
||||
return TSDB_DB_STRICT_ON == strict ? TSDB_DB_STRICT_ON_STR : TSDB_DB_STRICT_OFF_STR;
|
||||
}
|
||||
|
||||
static void setCreateDBResultIntoDataBlock(SSDataBlock* pBlock, char* dbFName, SDbCfgInfo* pCfg) {
|
||||
blockDataEnsureCapacity(pBlock, 1);
|
||||
pBlock->info.rows = 1;
|
||||
|
@ -269,11 +265,11 @@ static void setCreateDBResultIntoDataBlock(SSDataBlock* pBlock, char* dbFName, S
|
|||
buf2 + VARSTR_HEADER_SIZE,
|
||||
"CREATE DATABASE `%s` BUFFER %d CACHESIZE %d CACHEMODEL '%s' COMP %d DURATION %dm "
|
||||
"WAL_FSYNC_PERIOD %d MAXROWS %d MINROWS %d KEEP %dm,%dm,%dm PAGES %d PAGESIZE %d PRECISION '%s' REPLICA %d "
|
||||
"STRICT '%s' WAL_LEVEL %d VGROUPS %d SINGLE_STABLE %d",
|
||||
"WAL_LEVEL %d VGROUPS %d SINGLE_STABLE %d",
|
||||
dbFName, pCfg->buffer, pCfg->cacheSize, cacheModelStr(pCfg->cacheLast), pCfg->compression, pCfg->daysPerFile,
|
||||
pCfg->walFsyncPeriod, pCfg->maxRows, pCfg->minRows, pCfg->daysToKeep0, pCfg->daysToKeep1, pCfg->daysToKeep2,
|
||||
pCfg->pages, pCfg->pageSize, prec, pCfg->replications, strictStr(pCfg->strict), pCfg->walLevel,
|
||||
pCfg->numOfVgroups, 1 == pCfg->numOfStables);
|
||||
pCfg->pages, pCfg->pageSize, prec, pCfg->replications, pCfg->walLevel, pCfg->numOfVgroups,
|
||||
1 == pCfg->numOfStables);
|
||||
|
||||
if (retentions) {
|
||||
len += sprintf(buf2 + VARSTR_HEADER_SIZE + len, " RETENTIONS %s", retentions);
|
||||
|
|
|
@ -245,6 +245,10 @@ static int32_t collectMetaKeyFromCreateTable(SCollectMetaKeyCxt* pCxt, SCreateTa
|
|||
if (TSDB_CODE_SUCCESS == code && NULL == pStmt->pTags) {
|
||||
code = reserveTableVgroupInCache(pCxt->pParseCxt->acctId, pStmt->dbName, pStmt->tableName, pCxt->pMetaCache);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = reserveUserAuthInCache(pCxt->pParseCxt->acctId, pCxt->pParseCxt->pUser, pStmt->dbName, AUTH_TYPE_WRITE,
|
||||
pCxt->pMetaCache);
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
|
@ -261,6 +265,10 @@ static int32_t collectMetaKeyFromCreateMultiTable(SCollectMetaKeyCxt* pCxt, SCre
|
|||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = reserveTableVgroupInCache(pCxt->pParseCxt->acctId, pClause->dbName, pClause->tableName, pCxt->pMetaCache);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = reserveUserAuthInCache(pCxt->pParseCxt->acctId, pCxt->pParseCxt->pUser, pClause->dbName, AUTH_TYPE_WRITE,
|
||||
pCxt->pMetaCache);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS != code) {
|
||||
break;
|
||||
}
|
||||
|
@ -351,38 +359,59 @@ static int32_t collectMetaKeyFromCreateStream(SCollectMetaKeyCxt* pCxt, SCreateS
|
|||
}
|
||||
|
||||
static int32_t collectMetaKeyFromShowDnodes(SCollectMetaKeyCxt* pCxt, SShowStmt* pStmt) {
|
||||
return reserveTableMetaInCache(pCxt->pParseCxt->acctId, TSDB_INFORMATION_SCHEMA_DB, TSDB_INS_TABLE_DNODES,
|
||||
pCxt->pMetaCache);
|
||||
if (pCxt->pParseCxt->enableSysInfo) {
|
||||
return reserveTableMetaInCache(pCxt->pParseCxt->acctId, TSDB_INFORMATION_SCHEMA_DB, TSDB_INS_TABLE_DNODES,
|
||||
pCxt->pMetaCache);
|
||||
}
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t collectMetaKeyFromShowMnodes(SCollectMetaKeyCxt* pCxt, SShowStmt* pStmt) {
|
||||
return reserveTableMetaInCache(pCxt->pParseCxt->acctId, TSDB_INFORMATION_SCHEMA_DB, TSDB_INS_TABLE_MNODES,
|
||||
pCxt->pMetaCache);
|
||||
if (pCxt->pParseCxt->enableSysInfo) {
|
||||
return reserveTableMetaInCache(pCxt->pParseCxt->acctId, TSDB_INFORMATION_SCHEMA_DB, TSDB_INS_TABLE_MNODES,
|
||||
pCxt->pMetaCache);
|
||||
}
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t collectMetaKeyFromShowModules(SCollectMetaKeyCxt* pCxt, SShowStmt* pStmt) {
|
||||
return reserveTableMetaInCache(pCxt->pParseCxt->acctId, TSDB_INFORMATION_SCHEMA_DB, TSDB_INS_TABLE_MODULES,
|
||||
pCxt->pMetaCache);
|
||||
if (pCxt->pParseCxt->enableSysInfo) {
|
||||
return reserveTableMetaInCache(pCxt->pParseCxt->acctId, TSDB_INFORMATION_SCHEMA_DB, TSDB_INS_TABLE_MODULES,
|
||||
pCxt->pMetaCache);
|
||||
}
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t collectMetaKeyFromShowQnodes(SCollectMetaKeyCxt* pCxt, SShowStmt* pStmt) {
|
||||
return reserveTableMetaInCache(pCxt->pParseCxt->acctId, TSDB_INFORMATION_SCHEMA_DB, TSDB_INS_TABLE_QNODES,
|
||||
pCxt->pMetaCache);
|
||||
if (pCxt->pParseCxt->enableSysInfo) {
|
||||
return reserveTableMetaInCache(pCxt->pParseCxt->acctId, TSDB_INFORMATION_SCHEMA_DB, TSDB_INS_TABLE_QNODES,
|
||||
pCxt->pMetaCache);
|
||||
}
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t collectMetaKeyFromShowSnodes(SCollectMetaKeyCxt* pCxt, SShowStmt* pStmt) {
|
||||
return reserveTableMetaInCache(pCxt->pParseCxt->acctId, TSDB_INFORMATION_SCHEMA_DB, TSDB_INS_TABLE_SNODES,
|
||||
pCxt->pMetaCache);
|
||||
if (pCxt->pParseCxt->enableSysInfo) {
|
||||
return reserveTableMetaInCache(pCxt->pParseCxt->acctId, TSDB_INFORMATION_SCHEMA_DB, TSDB_INS_TABLE_SNODES,
|
||||
pCxt->pMetaCache);
|
||||
}
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t collectMetaKeyFromShowBnodes(SCollectMetaKeyCxt* pCxt, SShowStmt* pStmt) {
|
||||
return reserveTableMetaInCache(pCxt->pParseCxt->acctId, TSDB_INFORMATION_SCHEMA_DB, TSDB_INS_TABLE_BNODES,
|
||||
pCxt->pMetaCache);
|
||||
if (pCxt->pParseCxt->enableSysInfo) {
|
||||
return reserveTableMetaInCache(pCxt->pParseCxt->acctId, TSDB_INFORMATION_SCHEMA_DB, TSDB_INS_TABLE_BNODES,
|
||||
pCxt->pMetaCache);
|
||||
}
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t collectMetaKeyFromShowCluster(SCollectMetaKeyCxt* pCxt, SShowStmt* pStmt) {
|
||||
return reserveTableMetaInCache(pCxt->pParseCxt->acctId, TSDB_INFORMATION_SCHEMA_DB, TSDB_INS_TABLE_CLUSTER,
|
||||
pCxt->pMetaCache);
|
||||
if (pCxt->pParseCxt->enableSysInfo) {
|
||||
return reserveTableMetaInCache(pCxt->pParseCxt->acctId, TSDB_INFORMATION_SCHEMA_DB, TSDB_INS_TABLE_CLUSTER,
|
||||
pCxt->pMetaCache);
|
||||
}
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t collectMetaKeyFromShowDatabases(SCollectMetaKeyCxt* pCxt, SShowStmt* pStmt) {
|
||||
|
|
|
@ -104,6 +104,22 @@ static int32_t authShowCreateTable(SAuthCxt* pCxt, SShowCreateTableStmt* pStmt)
|
|||
return checkAuth(pCxt, pStmt->dbName, AUTH_TYPE_READ);
|
||||
}
|
||||
|
||||
static int32_t authCreateTable(SAuthCxt* pCxt, SCreateTableStmt* pStmt) {
|
||||
return checkAuth(pCxt, pStmt->dbName, AUTH_TYPE_WRITE);
|
||||
}
|
||||
|
||||
static int32_t authCreateMultiTable(SAuthCxt* pCxt, SCreateMultiTableStmt* pStmt) {
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
SNode* pNode = NULL;
|
||||
FOREACH(pNode, pStmt->pSubTables) {
|
||||
code = checkAuth(pCxt, ((SCreateSubTableClause*)pNode)->dbName, AUTH_TYPE_WRITE);
|
||||
if (TSDB_CODE_SUCCESS != code) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t authQuery(SAuthCxt* pCxt, SNode* pStmt) {
|
||||
switch (nodeType(pStmt)) {
|
||||
case QUERY_NODE_SET_OPERATOR:
|
||||
|
@ -116,6 +132,10 @@ static int32_t authQuery(SAuthCxt* pCxt, SNode* pStmt) {
|
|||
return authDelete(pCxt, (SDeleteStmt*)pStmt);
|
||||
case QUERY_NODE_INSERT_STMT:
|
||||
return authInsert(pCxt, (SInsertStmt*)pStmt);
|
||||
case QUERY_NODE_CREATE_TABLE_STMT:
|
||||
return authCreateTable(pCxt, (SCreateTableStmt*)pStmt);
|
||||
case QUERY_NODE_CREATE_MULTI_TABLE_STMT:
|
||||
return authCreateMultiTable(pCxt, (SCreateMultiTableStmt*)pStmt);
|
||||
case QUERY_NODE_SHOW_DNODES_STMT:
|
||||
case QUERY_NODE_SHOW_MNODES_STMT:
|
||||
case QUERY_NODE_SHOW_MODULES_STMT:
|
||||
|
|
|
@ -875,18 +875,9 @@ static int32_t getTargetTableSchema(SInsertParseContext* pCxt, SVnodeModifOpStmt
|
|||
}
|
||||
|
||||
int32_t code = checkAuth(pCxt->pComCxt, &pStmt->targetTableName, &pCxt->missCache);
|
||||
#if 0
|
||||
if (TSDB_CODE_SUCCESS == code && !pCxt->missCache) {
|
||||
code = getTableMeta(pCxt, &pStmt->targetTableName, false, &pStmt->pTableMeta, &pCxt->missCache);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code && !pCxt->missCache) {
|
||||
code = getTableVgroup(pCxt->pComCxt, pStmt, false, &pCxt->missCache);
|
||||
}
|
||||
#else
|
||||
if (TSDB_CODE_SUCCESS == code && !pCxt->missCache) {
|
||||
code = getTableMetaAndVgroup(pCxt, pStmt, &pCxt->missCache);
|
||||
}
|
||||
#endif
|
||||
if (TSDB_CODE_SUCCESS == code && !pCxt->pComCxt->async) {
|
||||
code = collectUseDatabase(&pStmt->targetTableName, pStmt->pDbFNameHashObj);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
|
@ -1503,6 +1494,10 @@ static int32_t parseDataFromFile(SInsertParseContext* pCxt, SVnodeModifOpStmt* p
|
|||
|
||||
static int32_t parseFileClause(SInsertParseContext* pCxt, SVnodeModifOpStmt* pStmt, STableDataBlocks* pDataBuf,
|
||||
SToken* pToken) {
|
||||
if (tsUseAdapter) {
|
||||
return buildInvalidOperationMsg(&pCxt->msg, "proxy mode does not support csv loading");
|
||||
}
|
||||
|
||||
NEXT_TOKEN(pStmt->pSql, *pToken);
|
||||
if (0 == pToken->n || (TK_NK_STRING != pToken->type && TK_NK_ID != pToken->type)) {
|
||||
return buildSyntaxErrMsg(&pCxt->msg, "file path is required following keyword FILE", pToken->z);
|
||||
|
@ -1726,6 +1721,8 @@ static int32_t getTableMetaFromMetaData(const SArray* pTables, STableMeta** pMet
|
|||
if (1 != taosArrayGetSize(pTables)) {
|
||||
return TSDB_CODE_FAILED;
|
||||
}
|
||||
|
||||
taosMemoryFreeClear(*pMeta);
|
||||
SMetaRes* pRes = taosArrayGet(pTables, 0);
|
||||
if (TSDB_CODE_SUCCESS == pRes->code) {
|
||||
*pMeta = tableMetaDup((const STableMeta*)pRes->pRes);
|
||||
|
|
|
@ -953,6 +953,10 @@ static int32_t createFillLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect
|
|||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
if (TSDB_CODE_SUCCESS == code && 0 == LIST_LENGTH(pFill->node.pTargets)) {
|
||||
code = createColumnByRewriteExpr(pFill->pWStartTs, &pFill->node.pTargets);
|
||||
}
|
||||
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
*pLogicNode = (SLogicNode*)pFill;
|
||||
} else {
|
||||
|
|
|
@ -456,6 +456,7 @@ int32_t schHandleLinkBrokenCallback(void *param, SDataBuf *pMsg, int32_t code) {
|
|||
|
||||
if (head->isHbParam) {
|
||||
taosMemoryFree(pMsg->pData);
|
||||
taosMemoryFree(pMsg->pEpSet);
|
||||
|
||||
SSchHbCallbackParam *hbParam = (SSchHbCallbackParam *)param;
|
||||
SSchTrans trans = {.pTrans = hbParam->pTrans, .pHandle = NULL};
|
||||
|
|
|
@ -275,6 +275,8 @@ int32_t syncLogBufferAccept(SSyncLogBuffer* pBuf, SSyncNode* pNode, SSyncRaftEnt
|
|||
SyncIndex index = pEntry->index;
|
||||
SyncIndex prevIndex = pEntry->index - 1;
|
||||
SyncTerm lastMatchTerm = syncLogBufferGetLastMatchTerm(pBuf);
|
||||
SSyncRaftEntry* pExist = NULL;
|
||||
bool inBuf = true;
|
||||
|
||||
if (index <= pBuf->commitIndex) {
|
||||
sTrace("vgId:%d, already committed. index: %" PRId64 ", term: %" PRId64 ". log buffer: [%" PRId64 " %" PRId64
|
||||
|
@ -306,10 +308,9 @@ int32_t syncLogBufferAccept(SSyncLogBuffer* pBuf, SSyncNode* pNode, SSyncRaftEnt
|
|||
}
|
||||
|
||||
// check current in buffer
|
||||
SSyncRaftEntry* pExist = pBuf->entries[index % pBuf->size].pItem;
|
||||
pExist = syncLogBufferGetOneEntry(pBuf, pNode, index, &inBuf);
|
||||
if (pExist != NULL) {
|
||||
ASSERT(pEntry->index == pExist->index);
|
||||
|
||||
if (pEntry->term != pExist->term) {
|
||||
(void)syncLogBufferRollback(pBuf, pNode, index);
|
||||
} else {
|
||||
|
@ -317,14 +318,15 @@ int32_t syncLogBufferAccept(SSyncLogBuffer* pBuf, SSyncNode* pNode, SSyncRaftEnt
|
|||
" %" PRId64 " %" PRId64 ", %" PRId64 ")",
|
||||
pNode->vgId, pEntry->index, pEntry->term, pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex,
|
||||
pBuf->endIndex);
|
||||
SyncTerm existPrevTerm = pBuf->entries[index % pBuf->size].prevLogTerm;
|
||||
ASSERT(pEntry->term == pExist->term && prevTerm == existPrevTerm);
|
||||
SyncTerm existPrevTerm = syncLogReplMgrGetPrevLogTerm(NULL, pNode, index);
|
||||
ASSERT(pEntry->term == pExist->term && (pEntry->index > pBuf->matchIndex || prevTerm == existPrevTerm));
|
||||
ret = 0;
|
||||
goto _out;
|
||||
}
|
||||
}
|
||||
|
||||
// update
|
||||
ASSERT(pBuf->entries[index % pBuf->size].pItem == NULL);
|
||||
SSyncLogBufEntry tmp = {.pItem = pEntry, .prevLogIndex = prevIndex, .prevLogTerm = prevTerm};
|
||||
pEntry = NULL;
|
||||
pBuf->entries[index % pBuf->size] = tmp;
|
||||
|
@ -337,6 +339,10 @@ int32_t syncLogBufferAccept(SSyncLogBuffer* pBuf, SSyncNode* pNode, SSyncRaftEnt
|
|||
|
||||
_out:
|
||||
syncEntryDestroy(pEntry);
|
||||
if (!inBuf) {
|
||||
syncEntryDestroy(pExist);
|
||||
pExist = NULL;
|
||||
}
|
||||
syncLogBufferValidate(pBuf);
|
||||
taosThreadMutexUnlock(&pBuf->mutex);
|
||||
return ret;
|
||||
|
@ -1008,6 +1014,16 @@ int32_t syncLogBufferRollback(SSyncLogBuffer* pBuf, SSyncNode* pNode, SyncIndex
|
|||
lastVer = pNode->pLogStore->syncLogLastIndex(pNode->pLogStore);
|
||||
ASSERT(toIndex == lastVer + 1);
|
||||
|
||||
// refill buffer on need
|
||||
if (toIndex <= pBuf->startIndex) {
|
||||
int32_t ret = syncLogBufferInitWithoutLock(pBuf, pNode);
|
||||
if (ret < 0) {
|
||||
sError("vgId:%d, failed to refill sync log buffer since %s", pNode->vgId, terrstr());
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
|
||||
ASSERT(pBuf->endIndex == toIndex);
|
||||
syncLogBufferValidate(pBuf);
|
||||
return 0;
|
||||
}
|
||||
|
|
|
@ -111,6 +111,9 @@ void tdbPageZero(SPage *pPage, u8 szAmHdr, int (*xCellSize)(const SPage *, SCell
|
|||
void tdbPageInit(SPage *pPage, u8 szAmHdr, int (*xCellSize)(const SPage *, SCell *, int, TXN *, SBTree *pBt)) {
|
||||
tdbTrace("page/init: %p %" PRIu8 " %p", pPage, szAmHdr, xCellSize);
|
||||
pPage->pPageHdr = pPage->pData + szAmHdr;
|
||||
if (TDB_PAGE_NCELLS(pPage) == 0) {
|
||||
return tdbPageZero(pPage, szAmHdr, xCellSize);
|
||||
}
|
||||
pPage->pCellIdx = pPage->pPageHdr + TDB_PAGE_HDR_SIZE(pPage);
|
||||
pPage->pFreeStart = pPage->pCellIdx + TDB_PAGE_OFFSET_SIZE(pPage) * TDB_PAGE_NCELLS(pPage);
|
||||
pPage->pFreeEnd = pPage->pData + TDB_PAGE_CCELLS(pPage);
|
||||
|
|
|
@ -466,11 +466,19 @@ int tdbPagerAbort(SPager *pPager, TXN *pTxn) {
|
|||
return -1;
|
||||
}
|
||||
|
||||
if (tdbOsLSeek(jfd, 0L, SEEK_SET) < 0) {
|
||||
tdbError("failed to lseek jfd due to %s. file:%s, offset:0", strerror(errno), pPager->dbFileName);
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
return -1;
|
||||
}
|
||||
|
||||
u8 *pageBuf = tdbOsCalloc(1, pPager->pageSize);
|
||||
if (pageBuf == NULL) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
tdbDebug("tdb/abort: pager:%p,", pPager);
|
||||
|
||||
for (int pgIndex = 0; pgIndex < journalSize; ++pgIndex) {
|
||||
// read pgno & the page from journal
|
||||
SPgno pgno;
|
||||
|
@ -481,6 +489,8 @@ int tdbPagerAbort(SPager *pPager, TXN *pTxn) {
|
|||
return -1;
|
||||
}
|
||||
|
||||
tdbTrace("tdb/abort: pgno:%d,", pgno);
|
||||
|
||||
ret = tdbOsRead(jfd, pageBuf, pPager->pageSize);
|
||||
if (ret < 0) {
|
||||
tdbOsFree(pageBuf);
|
||||
|
@ -578,12 +588,12 @@ int tdbPagerFlushPage(SPager *pPager, TXN *pTxn) {
|
|||
return -1;
|
||||
}
|
||||
|
||||
tdbTrace("tdb/flush:%p, %d/%d/%d", pPager, pPager->dbOrigSize, pPager->dbFileSize, maxPgno);
|
||||
tdbTrace("tdb/flush:%p, pgno:%d, %d/%d/%d", pPager, pgno, pPager->dbOrigSize, pPager->dbFileSize, maxPgno);
|
||||
pPager->dbOrigSize = maxPgno;
|
||||
|
||||
pPage->isDirty = 0;
|
||||
|
||||
tdbTrace("pager/flush drop page: %p %d from dirty tree: %p", pPage, TDB_PAGE_PGNO(pPage), &pPager->rbt);
|
||||
tdbTrace("pager/flush drop page: %p, pgno:%d, from dirty tree: %p", pPage, TDB_PAGE_PGNO(pPage), &pPager->rbt);
|
||||
tRBTreeDrop(&pPager->rbt, (SRBTreeNode *)pPage);
|
||||
tdbPCacheRelease(pPager->pCache, pPage, pTxn);
|
||||
|
||||
|
@ -830,7 +840,7 @@ static int tdbPagerPWritePageToDB(SPager *pPager, SPage *pPage) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
static int tdbPagerRestore(SPager *pPager, SBTree *pBt, const char *jFileName) {
|
||||
static int tdbPagerRestore(SPager *pPager, const char *jFileName) {
|
||||
int ret = 0;
|
||||
SPgno journalSize = 0;
|
||||
u8 *pageBuf = NULL;
|
||||
|
@ -908,7 +918,7 @@ static int tdbPagerRestore(SPager *pPager, SBTree *pBt, const char *jFileName) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
int tdbPagerRestoreJournals(SPager *pPager, SBTree *pBt) {
|
||||
int tdbPagerRestoreJournals(SPager *pPager) {
|
||||
tdbDirEntryPtr pDirEntry;
|
||||
tdbDirPtr pDir = taosOpenDir(pPager->pEnv->dbName);
|
||||
if (pDir == NULL) {
|
||||
|
@ -919,7 +929,7 @@ int tdbPagerRestoreJournals(SPager *pPager, SBTree *pBt) {
|
|||
while ((pDirEntry = tdbReadDir(pDir)) != NULL) {
|
||||
char *name = tdbDirEntryBaseName(tdbGetDirEntryName(pDirEntry));
|
||||
if (strncmp(TDB_MAINDB_NAME "-journal", name, 16) == 0) {
|
||||
if (tdbPagerRestore(pPager, pBt, name) < 0) {
|
||||
if (tdbPagerRestore(pPager, name) < 0) {
|
||||
tdbCloseDir(&pDir);
|
||||
|
||||
tdbError("failed to restore file due to %s. jFileName:%s", strerror(errno), name);
|
||||
|
|
|
@ -107,6 +107,16 @@ int tdbTbOpen(const char *tbname, int keyLen, int valLen, tdb_cmpr_fn_t keyCmprF
|
|||
|
||||
ASSERT(pPager != NULL);
|
||||
|
||||
if (rollback) {
|
||||
tdbPagerRollback(pPager);
|
||||
} else {
|
||||
ret = tdbPagerRestoreJournals(pPager);
|
||||
if (ret < 0) {
|
||||
tdbOsFree(pTb);
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
|
||||
// pTb->pBt
|
||||
ret = tdbBtreeOpen(keyLen, valLen, pPager, tbname, pgno, keyCmprFn, pEnv, &(pTb->pBt));
|
||||
if (ret < 0) {
|
||||
|
@ -114,16 +124,6 @@ int tdbTbOpen(const char *tbname, int keyLen, int valLen, tdb_cmpr_fn_t keyCmprF
|
|||
return -1;
|
||||
}
|
||||
|
||||
if (rollback) {
|
||||
tdbPagerRollback(pPager);
|
||||
} else {
|
||||
ret = tdbPagerRestoreJournals(pPager, pTb->pBt);
|
||||
if (ret < 0) {
|
||||
tdbOsFree(pTb);
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
|
||||
*ppTb = pTb;
|
||||
return 0;
|
||||
}
|
||||
|
|
|
@ -197,7 +197,7 @@ int tdbPagerFetchPage(SPager *pPager, SPgno *ppgno, SPage **ppPage, int (*initP
|
|||
TXN *pTxn);
|
||||
void tdbPagerReturnPage(SPager *pPager, SPage *pPage, TXN *pTxn);
|
||||
int tdbPagerAllocPage(SPager *pPager, SPgno *ppgno);
|
||||
int tdbPagerRestoreJournals(SPager *pPager, SBTree *pBt);
|
||||
int tdbPagerRestoreJournals(SPager *pPager);
|
||||
int tdbPagerRollback(SPager *pPager);
|
||||
|
||||
// tdbPCache.c ====================================
|
||||
|
|
|
@ -599,6 +599,10 @@ static int32_t allocConnRef(SCliConn* conn, bool update) {
|
|||
exh->pThrd = conn->hostThrd;
|
||||
exh->refId = transAddExHandle(transGetRefMgt(), exh);
|
||||
conn->refId = exh->refId;
|
||||
|
||||
if (conn->refId == -1) {
|
||||
taosMemoryFree(exh);
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
|
|
@ -1022,6 +1022,11 @@
|
|||
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/insert_select.py -Q 2
|
||||
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/insert_select.py -Q 3
|
||||
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/insert_select.py -Q 4
|
||||
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/insert_null_none.py
|
||||
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/insert_null_none.py -R
|
||||
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/insert_null_none.py -Q 2
|
||||
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/insert_null_none.py -Q 3
|
||||
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/insert_null_none.py -Q 4
|
||||
,,y,system-test,./pytest.sh python3 ./test.py -f 99-TDcase/TD-20582.py
|
||||
|
||||
#develop test
|
||||
|
|
|
@ -3,6 +3,9 @@ system sh/deploy.sh -n dnode1 -i 1
|
|||
system sh/deploy.sh -n dnode2 -i 2
|
||||
system sh/deploy.sh -n dnode3 -i 3
|
||||
system sh/deploy.sh -n dnode4 -i 4
|
||||
system sh/deploy.sh -n dnode5 -i 5
|
||||
system sh/deploy.sh -n dnode6 -i 6
|
||||
system sh/deploy.sh -n dnode7 -i 7
|
||||
system sh/cfg.sh -n dnode1 -c supportVnodes -v 0
|
||||
system sh/exec.sh -n dnode1 -s start
|
||||
system sh/exec.sh -n dnode2 -s start
|
||||
|
@ -14,6 +17,9 @@ print =============== step1: create dnodes
|
|||
sql create dnode $hostname port 7200
|
||||
sql create dnode $hostname port 7300
|
||||
sql create dnode $hostname port 7400
|
||||
sql create dnode $hostname port 7500
|
||||
sql create dnode $hostname port 7600
|
||||
sql create dnode $hostname port 7700
|
||||
|
||||
$x = 0
|
||||
step1:
|
||||
|
@ -29,7 +35,7 @@ print ===> $data00 $data01 $data02 $data03 $data04 $data05
|
|||
print ===> $data10 $data11 $data12 $data13 $data14 $data15
|
||||
print ===> $data20 $data21 $data22 $data23 $data24 $data25
|
||||
print ===> $data30 $data31 $data32 $data33 $data24 $data35
|
||||
if $rows != 4 then
|
||||
if $rows != 7 then
|
||||
return -1
|
||||
endi
|
||||
if $data(1)[4] != ready then
|
||||
|
|
File diff suppressed because it is too large
Load Diff
|
@ -969,38 +969,43 @@ bool matchVarWord(SWord* word1, SWord* word2) {
|
|||
// ------------------- match words --------------------------
|
||||
//
|
||||
|
||||
// compare command cmd1 come from shellCommands , cmd2 come from user input
|
||||
int32_t compareCommand(SWords* cmd1, SWords* cmd2) {
|
||||
SWord* word1 = cmd1->head;
|
||||
SWord* word2 = cmd2->head;
|
||||
// compare command cmdPattern come from shellCommands , cmdInput come from user input
|
||||
int32_t compareCommand(SWords* cmdPattern, SWords* cmdInput) {
|
||||
SWord* wordPattern = cmdPattern->head;
|
||||
SWord* wordInput = cmdInput->head;
|
||||
|
||||
if (word1 == NULL || word2 == NULL) {
|
||||
if (wordPattern == NULL || wordInput == NULL) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
for (int32_t i = 0; i < cmd1->count; i++) {
|
||||
if (word1->type == WT_TEXT) {
|
||||
for (int32_t i = 0; i < cmdPattern->count; i++) {
|
||||
if (wordPattern->type == WT_TEXT) {
|
||||
// WT_TEXT match
|
||||
if (word1->len == word2->len) {
|
||||
if (strncasecmp(word1->word, word2->word, word1->len) != 0) return -1;
|
||||
} else if (word1->len < word2->len) {
|
||||
if (wordPattern->len == wordInput->len) {
|
||||
if (strncasecmp(wordPattern->word, wordInput->word, wordPattern->len) != 0) return -1;
|
||||
} else if (wordPattern->len < wordInput->len) {
|
||||
return -1;
|
||||
} else {
|
||||
// word1->len > word2->len
|
||||
if (strncasecmp(word1->word, word2->word, word2->len) == 0) {
|
||||
cmd1->matchIndex = i;
|
||||
cmd1->matchLen = word2->len;
|
||||
return i;
|
||||
// wordPattern->len > wordInput->len
|
||||
if (strncasecmp(wordPattern->word, wordInput->word, wordInput->len) == 0) {
|
||||
if (i + 1 == cmdInput->count) {
|
||||
// last word return match
|
||||
cmdPattern->matchIndex = i;
|
||||
cmdPattern->matchLen = wordInput->len;
|
||||
return i;
|
||||
} else {
|
||||
return -1;
|
||||
}
|
||||
} else {
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
// WT_VAR auto match any one word
|
||||
if (word2->next == NULL) { // input words last one
|
||||
if (matchVarWord(word1, word2)) {
|
||||
cmd1->matchIndex = i;
|
||||
cmd1->matchLen = word2->len;
|
||||
if (wordInput->next == NULL) { // input words last one
|
||||
if (matchVarWord(wordPattern, wordInput)) {
|
||||
cmdPattern->matchIndex = i;
|
||||
cmdPattern->matchLen = wordInput->len;
|
||||
varMode = true;
|
||||
return i;
|
||||
}
|
||||
|
@ -1009,9 +1014,9 @@ int32_t compareCommand(SWords* cmd1, SWords* cmd2) {
|
|||
}
|
||||
|
||||
// move next
|
||||
word1 = word1->next;
|
||||
word2 = word2->next;
|
||||
if (word1 == NULL || word2 == NULL) {
|
||||
wordPattern = wordPattern->next;
|
||||
wordInput = wordInput->next;
|
||||
if (wordPattern == NULL || wordInput == NULL) {
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue