Merge remote-tracking branch 'origin/3.0' into feature/dnode3
This commit is contained in:
commit
1191275afd
|
@ -204,7 +204,7 @@ do { \
|
|||
|
||||
#define TSDB_CLUSTER_ID_LEN 40
|
||||
#define TSDB_FQDN_LEN 128
|
||||
#define TSDB_EP_LEN (TSDB_FQDN_LEN+6)
|
||||
#define TSDB_EP_LEN (TSDB_FQDN_LEN + 6)
|
||||
#define TSDB_IPv4ADDR_LEN 16
|
||||
#define TSDB_FILENAME_LEN 128
|
||||
#define TSDB_SHOW_SQL_LEN 512
|
||||
|
|
|
@ -152,6 +152,7 @@ int32_t parseSql(SRequestObj* pRequest, SQueryNode** pQuery) {
|
|||
.pMsg = pRequest->msgBuf,
|
||||
.msgLen = ERROR_MSG_BUF_DEFAULT_SIZE
|
||||
};
|
||||
|
||||
int32_t code = qParseQuerySql(&cxt, pQuery);
|
||||
tfree(cxt.ctx.db);
|
||||
return code;
|
||||
|
@ -420,7 +421,15 @@ void processMsgFromServer(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet) {
|
|||
taosReleaseRef(clientReqRefPool, pSendInfo->requestObjRefId);
|
||||
}
|
||||
|
||||
SDataBuf buf = {.pData = pMsg->pCont, .len = pMsg->contLen};
|
||||
SDataBuf buf = {.len = pMsg->contLen};
|
||||
buf.pData = calloc(1, pMsg->contLen);
|
||||
if (buf.pData == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
pMsg->code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
} else {
|
||||
memcpy(buf.pData, pMsg->pCont, pMsg->contLen);
|
||||
}
|
||||
|
||||
pSendInfo->fp(pSendInfo->param, &buf, pMsg->code);
|
||||
rpcFreeCont(pMsg->pCont);
|
||||
}
|
||||
|
|
|
@ -99,10 +99,10 @@ SMsgSendInfo* buildSendMsgInfoImpl(SRequestObj *pRequest) {
|
|||
} else {
|
||||
assert(pRequest != NULL);
|
||||
pMsgSendInfo->requestObjRefId = pRequest->self;
|
||||
pMsgSendInfo->msgInfo = pRequest->body.requestMsg;
|
||||
pMsgSendInfo->msgType = pRequest->type;
|
||||
pMsgSendInfo->msgInfo = pRequest->body.requestMsg;
|
||||
pMsgSendInfo->msgType = pRequest->type;
|
||||
pMsgSendInfo->requestId = pRequest->requestId;
|
||||
pMsgSendInfo->param = pRequest;
|
||||
pMsgSendInfo->param = pRequest;
|
||||
|
||||
pMsgSendInfo->fp = (handleRequestRspFp[pRequest->type] == NULL)? genericRspCallback:handleRequestRspFp[pRequest->type];
|
||||
}
|
||||
|
@ -165,8 +165,11 @@ int32_t processRetrieveMnodeRsp(void* param, const SDataBuf* pMsg, int32_t code)
|
|||
pRetrieve->precision = htons(pRetrieve->precision);
|
||||
|
||||
SReqResultInfo* pResInfo = &pRequest->body.resInfo;
|
||||
|
||||
tfree(pResInfo->pRspMsg);
|
||||
pResInfo->pRspMsg = pMsg->pData;
|
||||
pResInfo->numOfRows = pRetrieve->numOfRows;
|
||||
pResInfo->pData = pRetrieve->data; // todo fix this in async model
|
||||
pResInfo->pData = pRetrieve->data; // todo fix this in async model
|
||||
|
||||
pResInfo->current = 0;
|
||||
setResultDataPtr(pResInfo, pResInfo->fields, pResInfo->numOfCols, pResInfo->numOfRows);
|
||||
|
|
|
@ -50,13 +50,13 @@ int main(int argc, char** argv) {
|
|||
TEST(testCase, driverInit_Test) { taos_init(); }
|
||||
|
||||
TEST(testCase, connect_Test) {
|
||||
TAOS* pConn = taos_connect("ubuntu", "root", "taosdata", NULL, 0);
|
||||
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
|
||||
assert(pConn != NULL);
|
||||
taos_close(pConn);
|
||||
}
|
||||
|
||||
TEST(testCase, create_user_Test) {
|
||||
TAOS* pConn = taos_connect("ubuntu", "root", "taosdata", NULL, 0);
|
||||
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
|
||||
assert(pConn != NULL);
|
||||
|
||||
TAOS_RES* pRes = taos_query(pConn, "create user abc pass 'abc'");
|
||||
|
@ -69,7 +69,7 @@ TEST(testCase, create_user_Test) {
|
|||
}
|
||||
|
||||
TEST(testCase, create_account_Test) {
|
||||
TAOS* pConn = taos_connect("ubuntu", "root", "taosdata", NULL, 0);
|
||||
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
|
||||
assert(pConn != NULL);
|
||||
|
||||
TAOS_RES* pRes = taos_query(pConn, "create account aabc pass 'abc'");
|
||||
|
@ -82,7 +82,7 @@ TEST(testCase, create_account_Test) {
|
|||
}
|
||||
|
||||
TEST(testCase, drop_account_Test) {
|
||||
TAOS* pConn = taos_connect("ubuntu", "root", "taosdata", NULL, 0);
|
||||
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
|
||||
assert(pConn != NULL);
|
||||
|
||||
TAOS_RES* pRes = taos_query(pConn, "drop account aabc");
|
||||
|
@ -95,7 +95,7 @@ TEST(testCase, drop_account_Test) {
|
|||
}
|
||||
|
||||
TEST(testCase, show_user_Test) {
|
||||
TAOS* pConn = taos_connect("ubuntu", "root", "taosdata", NULL, 0);
|
||||
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
|
||||
assert(pConn != NULL);
|
||||
|
||||
TAOS_RES* pRes = taos_query(pConn, "show users");
|
||||
|
@ -114,7 +114,7 @@ TEST(testCase, show_user_Test) {
|
|||
}
|
||||
|
||||
TEST(testCase, drop_user_Test) {
|
||||
TAOS* pConn = taos_connect("ubuntu", "root", "taosdata", NULL, 0);
|
||||
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
|
||||
assert(pConn != NULL);
|
||||
|
||||
TAOS_RES* pRes = taos_query(pConn, "drop user abc");
|
||||
|
@ -127,7 +127,7 @@ TEST(testCase, drop_user_Test) {
|
|||
}
|
||||
|
||||
TEST(testCase, show_db_Test) {
|
||||
TAOS* pConn = taos_connect("ubuntu", "root", "taosdata", NULL, 0);
|
||||
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
|
||||
// assert(pConn != NULL);
|
||||
|
||||
TAOS_RES* pRes = taos_query(pConn, "show databases");
|
||||
|
@ -146,7 +146,7 @@ TEST(testCase, show_db_Test) {
|
|||
}
|
||||
|
||||
TEST(testCase, create_db_Test) {
|
||||
TAOS* pConn = taos_connect("ubuntu", "root", "taosdata", NULL, 0);
|
||||
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
|
||||
assert(pConn != NULL);
|
||||
|
||||
TAOS_RES* pRes = taos_query(pConn, "create database abc1");
|
||||
|
@ -169,8 +169,46 @@ TEST(testCase, create_db_Test) {
|
|||
taos_close(pConn);
|
||||
}
|
||||
|
||||
TEST(testCase, create_dnode_Test) {
|
||||
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
|
||||
assert(pConn != NULL);
|
||||
|
||||
TAOS_RES* pRes = taos_query(pConn, "create dnode abc1");
|
||||
if (taos_errno(pRes) != 0) {
|
||||
printf("error in create dnode, reason:%s\n", taos_errstr(pRes));
|
||||
}
|
||||
|
||||
TAOS_FIELD* pFields = taos_fetch_fields(pRes);
|
||||
ASSERT_TRUE(pFields == NULL);
|
||||
|
||||
int32_t numOfFields = taos_num_fields(pRes);
|
||||
ASSERT_EQ(numOfFields, 0);
|
||||
|
||||
taos_free_result(pRes);
|
||||
taos_close(pConn);
|
||||
}
|
||||
|
||||
TEST(testCase, drop_dnode_Test) {
|
||||
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
|
||||
assert(pConn != NULL);
|
||||
|
||||
TAOS_RES* pRes = taos_query(pConn, "drop dnode 2");
|
||||
if (taos_errno(pRes) != 0) {
|
||||
printf("error in drop dnode, reason:%s\n", taos_errstr(pRes));
|
||||
}
|
||||
|
||||
TAOS_FIELD* pFields = taos_fetch_fields(pRes);
|
||||
ASSERT_TRUE(pFields == NULL);
|
||||
|
||||
int32_t numOfFields = taos_num_fields(pRes);
|
||||
ASSERT_EQ(numOfFields, 0);
|
||||
|
||||
taos_free_result(pRes);
|
||||
taos_close(pConn);
|
||||
}
|
||||
|
||||
TEST(testCase, use_db_test) {
|
||||
TAOS* pConn = taos_connect("ubuntu", "root", "taosdata", NULL, 0);
|
||||
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
|
||||
assert(pConn != NULL);
|
||||
|
||||
TAOS_RES* pRes = taos_query(pConn, "use abc1");
|
||||
|
@ -188,7 +226,7 @@ TEST(testCase, use_db_test) {
|
|||
}
|
||||
|
||||
TEST(testCase, drop_db_test) {
|
||||
TAOS* pConn = taos_connect("ubuntu", "root", "taosdata", NULL, 0);
|
||||
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
|
||||
assert(pConn != NULL);
|
||||
|
||||
showDB(pConn);
|
||||
|
@ -210,7 +248,7 @@ TEST(testCase, drop_db_test) {
|
|||
}
|
||||
|
||||
TEST(testCase, create_stable_Test) {
|
||||
TAOS* pConn = taos_connect("ubuntu", "root", "taosdata", NULL, 0);
|
||||
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
|
||||
assert(pConn != NULL);
|
||||
|
||||
TAOS_RES* pRes = taos_query(pConn, "create database abc1");
|
||||
|
@ -241,7 +279,7 @@ TEST(testCase, drop_db_test) {
|
|||
}
|
||||
|
||||
TEST(testCase, create_table_Test) {
|
||||
// TAOS* pConn = taos_connect("ubuntu", "root", "taosdata", NULL, 0);
|
||||
// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
|
||||
// assert(pConn != NULL);
|
||||
//
|
||||
// TAOS_RES* pRes = taos_query(pConn, "use abc1");
|
||||
|
@ -256,7 +294,7 @@ TEST(testCase, create_table_Test) {
|
|||
TEST(testCase, create_ctable_Test) {}
|
||||
|
||||
TEST(testCase, show_stable_Test) {
|
||||
TAOS* pConn = taos_connect("ubuntu", "root", "taosdata", NULL, 0);
|
||||
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
|
||||
assert(pConn != NULL);
|
||||
|
||||
TAOS_RES* pRes = taos_query(pConn, "use abc1");
|
||||
|
@ -273,7 +311,6 @@ TEST(testCase, show_stable_Test) {
|
|||
}
|
||||
|
||||
TAOS_ROW pRow = NULL;
|
||||
|
||||
TAOS_FIELD* pFields = taos_fetch_fields(pRes);
|
||||
int32_t numOfFields = taos_num_fields(pRes);
|
||||
|
||||
|
@ -284,12 +321,11 @@ TEST(testCase, show_stable_Test) {
|
|||
}
|
||||
|
||||
taos_free_result(pRes);
|
||||
|
||||
taos_close(pConn);
|
||||
}
|
||||
|
||||
TEST(testCase, show_vgroup_Test) {
|
||||
TAOS* pConn = taos_connect("ubuntu", "root", "taosdata", NULL, 0);
|
||||
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
|
||||
assert(pConn != NULL);
|
||||
|
||||
TAOS_RES* pRes = taos_query(pConn, "use abc1");
|
||||
|
@ -322,7 +358,7 @@ TEST(testCase, show_vgroup_Test) {
|
|||
}
|
||||
|
||||
TEST(testCase, drop_stable_Test) {
|
||||
TAOS* pConn = taos_connect("ubuntu", "root", "taosdata", NULL, 0);
|
||||
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
|
||||
assert(pConn != NULL);
|
||||
|
||||
TAOS_RES* pRes = taos_query(pConn, "create database abc1");
|
||||
|
@ -347,7 +383,7 @@ TEST(testCase, drop_stable_Test) {
|
|||
}
|
||||
|
||||
//TEST(testCase, show_table_Test) {
|
||||
// TAOS* pConn = taos_connect("ubuntu", "root", "taosdata", NULL, 0);
|
||||
// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
|
||||
// assert(pConn != NULL);
|
||||
//
|
||||
// TAOS_RES* pRes = taos_query(pConn, "use abc1");
|
||||
|
|
|
@ -51,6 +51,8 @@ struct SIndex {
|
|||
int64_t suid; // current super table id, -1 is normal table
|
||||
int32_t cVersion; // current version allocated to cache
|
||||
|
||||
char* path;
|
||||
|
||||
SIndexStat stat;
|
||||
pthread_mutex_t mtx;
|
||||
};
|
||||
|
@ -87,12 +89,23 @@ typedef struct SIndexTermQuery {
|
|||
EIndexQueryType qType;
|
||||
} SIndexTermQuery;
|
||||
|
||||
typedef struct Iterate {
|
||||
void* iter;
|
||||
typedef struct Iterate Iterate;
|
||||
|
||||
typedef struct IterateValue {
|
||||
int8_t type;
|
||||
char* colVal;
|
||||
SArray* val;
|
||||
} IterateValue;
|
||||
|
||||
typedef struct Iterate {
|
||||
void* iter;
|
||||
IterateValue val;
|
||||
bool (*next)(Iterate* iter);
|
||||
IterateValue* (*getValue)(Iterate* iter);
|
||||
} Iterate;
|
||||
|
||||
void iterateValueDestroy(IterateValue* iv, bool destroy);
|
||||
|
||||
extern void* indexQhandle;
|
||||
|
||||
int indexFlushCacheTFile(SIndex* sIdx, void*);
|
||||
|
|
|
@ -41,6 +41,7 @@ typedef struct IndexCache {
|
|||
|
||||
} IndexCache;
|
||||
|
||||
#define CACHE_VERSION(cache) atomic_load_32(&cache->version)
|
||||
typedef struct CacheTerm {
|
||||
// key
|
||||
int32_t nColVal;
|
||||
|
@ -57,6 +58,9 @@ IndexCache* indexCacheCreate(SIndex* idx, const char* colName, int8_t type);
|
|||
|
||||
void indexCacheDestroy(void* cache);
|
||||
|
||||
Iterate* indexCacheIteratorCreate(IndexCache* cache);
|
||||
void indexCacheIteratorDestroy(Iterate* iiter);
|
||||
|
||||
int indexCachePut(void* cache, SIndexTerm* term, uint64_t uid);
|
||||
|
||||
// int indexCacheGet(void *cache, uint64_t *rst);
|
||||
|
@ -66,6 +70,8 @@ void indexCacheRef(IndexCache* cache);
|
|||
void indexCacheUnRef(IndexCache* cache);
|
||||
|
||||
void indexCacheDebug(IndexCache* cache);
|
||||
|
||||
void indexCacheDestroySkiplist(SSkipList* slt);
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif
|
||||
|
|
|
@ -319,6 +319,8 @@ bool streamWithStateSeekMin(StreamWithState* sws, FstBoundWithData* min);
|
|||
StreamWithStateResult* streamWithStateNextWith(StreamWithState* sws, StreamCallback callback);
|
||||
|
||||
FstStreamBuilder* fstStreamBuilderCreate(Fst* fst, AutomationCtx* aut);
|
||||
|
||||
void fstStreamBuilderDestroy(FstStreamBuilder* b);
|
||||
// set up bound range
|
||||
// refator, simple code by marco
|
||||
|
||||
|
|
|
@ -113,6 +113,8 @@ int tfileReaderSearch(TFileReader* reader, SIndexTermQuery* query, SArr
|
|||
void tfileReaderRef(TFileReader* reader);
|
||||
void tfileReaderUnRef(TFileReader* reader);
|
||||
|
||||
TFileWriter* tfileWriterOpen(char* path, uint64_t suid, int32_t version, const char* colName, uint8_t type);
|
||||
void tfileWriteClose(TFileWriter* tw);
|
||||
TFileWriter* tfileWriterCreate(WriterCtx* ctx, TFileHeader* header);
|
||||
void tfileWriterDestroy(TFileWriter* tw);
|
||||
int tfileWriterPut(TFileWriter* tw, void* data);
|
||||
|
@ -123,6 +125,14 @@ IndexTFile* indexTFileCreate(const char* path);
|
|||
int indexTFilePut(void* tfile, SIndexTerm* term, uint64_t uid);
|
||||
int indexTFileSearch(void* tfile, SIndexTermQuery* query, SArray* result);
|
||||
|
||||
Iterate* tfileIteratorCreate(TFileReader* reader);
|
||||
void tfileIteratorDestroy(Iterate* iterator);
|
||||
|
||||
TFileValue* tfileValueCreate(char* val);
|
||||
|
||||
int tfileValuePush(TFileValue* tf, uint64_t val);
|
||||
void tfileValueDestroy(TFileValue* tf);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
|
||||
|
|
|
@ -75,9 +75,12 @@ int indexOpen(SIndexOpts* opts, const char* path, SIndex** index) {
|
|||
sIdx->tindex = indexTFileCreate(path);
|
||||
sIdx->colObj = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK);
|
||||
sIdx->cVersion = 1;
|
||||
sIdx->path = calloc(1, strlen(path) + 1);
|
||||
memcpy(sIdx->path, path, strlen(path));
|
||||
pthread_mutex_init(&sIdx->mtx, NULL);
|
||||
|
||||
*index = sIdx;
|
||||
|
||||
return 0;
|
||||
#endif
|
||||
|
||||
|
@ -361,14 +364,96 @@ static int indexMergeFinalResults(SArray* interResults, EIndexOperatorType oType
|
|||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
int indexFlushCacheTFile(SIndex* sIdx, void* cache) {
|
||||
if (sIdx == NULL) { return -1; }
|
||||
indexWarn("suid %" PRIu64 " merge cache into tindex", sIdx->suid);
|
||||
IndexCache* pCache = (IndexCache*)cache;
|
||||
|
||||
IndexCache* pCache = (IndexCache*)cache;
|
||||
TFileReader* pReader = tfileGetReaderByCol(sIdx->tindex, pCache->colName);
|
||||
|
||||
// handle flush
|
||||
Iterate* cacheIter = indexCacheIteratorCreate(pCache);
|
||||
Iterate* tfileIter = tfileIteratorCreate(pReader);
|
||||
|
||||
SArray* result = taosArrayInit(1024, sizeof(void*));
|
||||
|
||||
bool cn = cacheIter->next(cacheIter);
|
||||
bool tn = tfileIter->next(tfileIter);
|
||||
while (cn == true && tn == true) {
|
||||
IterateValue* cv = cacheIter->getValue(cacheIter);
|
||||
IterateValue* tv = tfileIter->getValue(tfileIter);
|
||||
|
||||
// dump value
|
||||
int comp = strcmp(cv->colVal, tv->colVal);
|
||||
if (comp == 0) {
|
||||
TFileValue* tfv = tfileValueCreate(cv->colVal);
|
||||
taosArrayAddAll(tfv->tableId, cv->val);
|
||||
taosArrayAddAll(tfv->tableId, tv->val);
|
||||
taosArrayPush(result, &tfv);
|
||||
|
||||
cn = cacheIter->next(cacheIter);
|
||||
tn = tfileIter->next(tfileIter);
|
||||
continue;
|
||||
} else if (comp < 0) {
|
||||
TFileValue* tfv = tfileValueCreate(cv->colVal);
|
||||
taosArrayAddAll(tfv->tableId, cv->val);
|
||||
taosArrayPush(result, &tfv);
|
||||
|
||||
// copy to final Result;
|
||||
cn = cacheIter->next(cacheIter);
|
||||
} else {
|
||||
TFileValue* tfv = tfileValueCreate(tv->colVal);
|
||||
taosArrayPush(result, &tfv);
|
||||
taosArrayAddAll(tfv->tableId, tv->val);
|
||||
// copy to final result
|
||||
tn = tfileIter->next(tfileIter);
|
||||
}
|
||||
}
|
||||
while (cn == true) {
|
||||
IterateValue* cv = cacheIter->getValue(cacheIter);
|
||||
TFileValue* tfv = tfileValueCreate(cv->colVal);
|
||||
taosArrayAddAll(tfv->tableId, cv->val);
|
||||
taosArrayPush(result, &tfv);
|
||||
cn = cacheIter->next(cacheIter);
|
||||
}
|
||||
while (tn == true) {
|
||||
IterateValue* tv = tfileIter->getValue(tfileIter);
|
||||
TFileValue* tfv = tfileValueCreate(tv->colVal);
|
||||
taosArrayAddAll(tfv->tableId, tv->val);
|
||||
taosArrayPush(result, &tfv);
|
||||
tn = tfileIter->next(tfileIter);
|
||||
}
|
||||
|
||||
int32_t version = CACHE_VERSION(pCache);
|
||||
uint8_t colType = pCache->type;
|
||||
|
||||
TFileWriter* tw = tfileWriterOpen(sIdx->path, sIdx->suid, version, pCache->colName, colType);
|
||||
if (tw == NULL) {
|
||||
indexError("faile to open file to write");
|
||||
} else {
|
||||
int ret = tfileWriterPut(tw, result);
|
||||
if (ret != 0) { indexError("faile to write into tindex "); }
|
||||
}
|
||||
// not free later, just put int table cache
|
||||
SSkipList* timm = (SSkipList*)pCache->imm;
|
||||
pCache->imm = NULL; // or throw int bg thread
|
||||
indexCacheDestroySkiplist(timm);
|
||||
|
||||
tfileWriteClose(tw);
|
||||
indexCacheIteratorDestroy(cacheIter);
|
||||
tfileIteratorDestroy(tfileIter);
|
||||
|
||||
tfileReaderUnRef(pReader);
|
||||
indexCacheUnRef(pCache);
|
||||
return 0;
|
||||
}
|
||||
void iterateValueDestroy(IterateValue* value, bool destroy) {
|
||||
if (destroy) {
|
||||
taosArrayDestroy(value->val);
|
||||
} else {
|
||||
taosArrayClear(value->val);
|
||||
}
|
||||
free(value->colVal);
|
||||
value->colVal = NULL;
|
||||
}
|
||||
|
|
|
@ -94,6 +94,16 @@ void indexCacheDebug(IndexCache* cache) {
|
|||
tSkipListDestroyIter(iter);
|
||||
}
|
||||
|
||||
void indexCacheDestroySkiplist(SSkipList* slt) {
|
||||
SSkipListIterator* iter = tSkipListCreateIter(slt);
|
||||
while (tSkipListIterNext(iter)) {
|
||||
SSkipListNode* node = tSkipListIterGet(iter);
|
||||
CacheTerm* ct = (CacheTerm*)SL_GET_NODE_DATA(node);
|
||||
if (ct != NULL) {}
|
||||
}
|
||||
tSkipListDestroyIter(iter);
|
||||
}
|
||||
|
||||
void indexCacheDestroy(void* cache) {
|
||||
IndexCache* pCache = cache;
|
||||
if (pCache == NULL) { return; }
|
||||
|
@ -108,6 +118,48 @@ static void doMergeWork(SSchedMsg* msg) {
|
|||
SIndex* sidx = (SIndex*)pCache->index;
|
||||
indexFlushCacheTFile(sidx, pCache);
|
||||
}
|
||||
static bool indexCacheIteratorNext(Iterate* itera) {
|
||||
SSkipListIterator* iter = itera->iter;
|
||||
if (iter == NULL) { return false; }
|
||||
|
||||
IterateValue* iv = &itera->val;
|
||||
iterateValueDestroy(iv, false);
|
||||
|
||||
bool next = tSkipListIterNext(iter);
|
||||
if (next) {
|
||||
SSkipListNode* node = tSkipListIterGet(iter);
|
||||
CacheTerm* ct = (CacheTerm*)SL_GET_NODE_DATA(node);
|
||||
|
||||
iv->type = ct->operaType;
|
||||
iv->colVal = ct->colVal;
|
||||
|
||||
taosArrayPush(iv->val, &ct->uid);
|
||||
}
|
||||
|
||||
return next;
|
||||
}
|
||||
|
||||
static IterateValue* indexCacheIteratorGetValue(Iterate* iter) {
|
||||
return &iter->val;
|
||||
}
|
||||
Iterate* indexCacheIteratorCreate(IndexCache* cache) {
|
||||
Iterate* iiter = calloc(1, sizeof(Iterate));
|
||||
if (iiter == NULL) { return NULL; }
|
||||
|
||||
iiter->val.val = taosArrayInit(1, sizeof(uint64_t));
|
||||
iiter->iter = cache->imm != NULL ? tSkipListCreateIter(cache->imm) : NULL;
|
||||
iiter->next = indexCacheIteratorNext;
|
||||
iiter->getValue = indexCacheIteratorGetValue;
|
||||
|
||||
return iiter;
|
||||
}
|
||||
void indexCacheIteratorDestroy(Iterate* iter) {
|
||||
if (iter == NULL) { return; }
|
||||
|
||||
tSkipListDestroyIter(iter->iter);
|
||||
iterateValueDestroy(&iter->val, true);
|
||||
free(iter);
|
||||
}
|
||||
|
||||
int indexCacheSchedToMerge(IndexCache* pCache) {
|
||||
SSchedMsg schedMsg = {0};
|
||||
|
|
|
@ -23,6 +23,13 @@
|
|||
#include "taosdef.h"
|
||||
#include "tcompare.h"
|
||||
|
||||
typedef struct TFileFstIter {
|
||||
FstStreamBuilder* fb;
|
||||
StreamWithState* st;
|
||||
AutomationCtx* ctx;
|
||||
TFileReader* rdr;
|
||||
} TFileFstIter;
|
||||
|
||||
#define TF_TABLE_TATOAL_SIZE(sz) (sizeof(sz) + sz * sizeof(uint64_t))
|
||||
|
||||
static int tfileStrCompare(const void* a, const void* b);
|
||||
|
@ -184,6 +191,23 @@ int tfileReaderSearch(TFileReader* reader, SIndexTermQuery* query, SArray* resul
|
|||
return ret;
|
||||
}
|
||||
|
||||
TFileWriter* tfileWriterOpen(char* path, uint64_t suid, int32_t version, const char* colName, uint8_t colType) {
|
||||
char filename[128] = {0};
|
||||
int32_t coldId = 1;
|
||||
tfileGenFileName(filename, suid, coldId, version);
|
||||
|
||||
char fullname[256] = {0};
|
||||
snprintf(fullname, sizeof(fullname), "%s/%s", path, filename);
|
||||
WriterCtx* wcx = writerCtxCreate(TFile, fullname, true, 1024 * 1024);
|
||||
|
||||
TFileHeader tfh = {0};
|
||||
tfh.suid = suid;
|
||||
tfh.version = version;
|
||||
memcpy(tfh.colName, colName, strlen(colName));
|
||||
tfh.colType = colType;
|
||||
|
||||
return tfileWriterCreate(wcx, &tfh);
|
||||
}
|
||||
TFileWriter* tfileWriterCreate(WriterCtx* ctx, TFileHeader* header) {
|
||||
// char pathBuf[128] = {0};
|
||||
// sprintf(pathBuf, "%s/% " PRIu64 "-%d-%d.tindex", path, suid, colId, version);
|
||||
|
@ -279,6 +303,11 @@ int tfileWriterPut(TFileWriter* tw, void* data) {
|
|||
tw->fb = NULL;
|
||||
return 0;
|
||||
}
|
||||
void tfileWriteClose(TFileWriter* tw) {
|
||||
if (tw == NULL) { return; }
|
||||
writerCtxDestroy(tw->ctx);
|
||||
free(tw);
|
||||
}
|
||||
void tfileWriterDestroy(TFileWriter* tw) {
|
||||
if (tw == NULL) { return; }
|
||||
|
||||
|
@ -314,6 +343,71 @@ int indexTFilePut(void* tfile, SIndexTerm* term, uint64_t uid) {
|
|||
|
||||
return 0;
|
||||
}
|
||||
static bool tfileIteratorNext(Iterate* iiter) {
|
||||
IterateValue* iv = &iiter->val;
|
||||
iterateValueDestroy(iv, false);
|
||||
// SArray* tblIds = iv->val;
|
||||
|
||||
char* colVal = NULL;
|
||||
uint64_t offset = 0;
|
||||
|
||||
TFileFstIter* tIter = iiter->iter;
|
||||
StreamWithStateResult* rt = streamWithStateNextWith(tIter->st, NULL);
|
||||
if (rt == NULL) { return false; }
|
||||
|
||||
int32_t sz = 0;
|
||||
char* ch = (char*)fstSliceData(&rt->data, &sz);
|
||||
colVal = calloc(1, sz + 1);
|
||||
memcpy(colVal, ch, sz);
|
||||
|
||||
offset = (uint64_t)(rt->out.out);
|
||||
|
||||
swsResultDestroy(rt);
|
||||
// set up iterate value
|
||||
if (tfileReaderLoadTableIds(tIter->rdr, offset, iv->val) != 0) { return false; }
|
||||
|
||||
iv->colVal = colVal;
|
||||
|
||||
// std::string key(ch, sz);
|
||||
}
|
||||
|
||||
static IterateValue* tifileIterateGetValue(Iterate* iter) {
|
||||
return &iter->val;
|
||||
}
|
||||
|
||||
static TFileFstIter* tfileFstIteratorCreate(TFileReader* reader) {
|
||||
TFileFstIter* tIter = calloc(1, sizeof(Iterate));
|
||||
if (tIter == NULL) { return NULL; }
|
||||
tIter->ctx = automCtxCreate(NULL, AUTOMATION_ALWAYS);
|
||||
tIter->fb = fstSearch(reader->fst, tIter->ctx);
|
||||
tIter->st = streamBuilderIntoStream(tIter->fb);
|
||||
tIter->rdr = reader;
|
||||
return tIter;
|
||||
}
|
||||
|
||||
Iterate* tfileIteratorCreate(TFileReader* reader) {
|
||||
Iterate* iter = calloc(1, sizeof(Iterate));
|
||||
|
||||
iter->iter = tfileFstIteratorCreate(reader);
|
||||
if (iter->iter == NULL) { return NULL; }
|
||||
|
||||
iter->next = tfileIteratorNext;
|
||||
iter->getValue = tifileIterateGetValue;
|
||||
return iter;
|
||||
}
|
||||
void tfileIteratorDestroy(Iterate* iter) {
|
||||
if (iter == NULL) { return; }
|
||||
IterateValue* iv = &iter->val;
|
||||
iterateValueDestroy(iv, true);
|
||||
|
||||
TFileFstIter* tIter = iter->iter;
|
||||
streamWithStateDestroy(tIter->st);
|
||||
fstStreamBuilderDestroy(tIter->fb);
|
||||
automCtxDestroy(tIter->ctx);
|
||||
|
||||
free(iter);
|
||||
}
|
||||
|
||||
TFileReader* tfileGetReaderByCol(IndexTFile* tf, char* colName) {
|
||||
if (tf == NULL) { return NULL; }
|
||||
TFileCacheKey key = {.suid = 0, .colType = TSDB_DATA_TYPE_BINARY, .colName = colName, .nColName = strlen(colName)};
|
||||
|
@ -334,6 +428,23 @@ static int tfileValueCompare(const void* a, const void* b, const void* param) {
|
|||
|
||||
return fn(av->colVal, bv->colVal);
|
||||
}
|
||||
|
||||
TFileValue* tfileValueCreate(char* val) {
|
||||
TFileValue* tf = calloc(1, sizeof(TFileValue));
|
||||
if (tf == NULL) { return NULL; }
|
||||
|
||||
tf->tableId = taosArrayInit(32, sizeof(uint64_t));
|
||||
return tf;
|
||||
}
|
||||
int tfileValuePush(TFileValue* tf, uint64_t val) {
|
||||
if (tf == NULL) { return -1; }
|
||||
taosArrayPush(tf->tableId, &val);
|
||||
return 0;
|
||||
}
|
||||
void tfileValueDestroy(TFileValue* tf) {
|
||||
taosArrayDestroy(tf->tableId);
|
||||
free(tf);
|
||||
}
|
||||
static void tfileSerialTableIdsToBuf(char* buf, SArray* ids) {
|
||||
int sz = taosArrayGetSize(ids);
|
||||
SERIALIZE_VAR_TO_BUF(buf, sz, int32_t);
|
||||
|
|
|
@ -12,6 +12,8 @@ SDropUserMsg* buildDropUserMsg(SSqlInfo* pInfo, int32_t* outputLen, int64_t id,
|
|||
SShowMsg* buildShowMsg(SShowInfo* pShowInfo, SParseBasicCtx* pParseCtx, char* msgBuf, int32_t msgLen);
|
||||
SCreateDbMsg* buildCreateDbMsg(SCreateDbInfo* pCreateDbInfo, SParseBasicCtx *pCtx, SMsgBuf* pMsgBuf);
|
||||
SCreateStbMsg* buildCreateTableMsg(SCreateTableSql* pCreateTableSql, int32_t* len, SParseBasicCtx* pParseCtx, SMsgBuf* pMsgBuf);
|
||||
SDropTableMsg* buildDropTableMsg(SSqlInfo* pInfo, int32_t* len, SParseBasicCtx* pParseCtx, SMsgBuf* pMsgBuf);
|
||||
SDropStbMsg* buildDropStableMsg(SSqlInfo* pInfo, int32_t* len, SParseBasicCtx* pParseCtx, SMsgBuf* pMsgBuf);
|
||||
SCreateDnodeMsg *buildCreateDnodeMsg(SSqlInfo* pInfo, int32_t* len, SMsgBuf* pMsgBuf);
|
||||
SDropDnodeMsg *buildDropDnodeMsg(SSqlInfo* pInfo, int32_t* len, SMsgBuf* pMsgBuf);
|
||||
|
||||
#endif // TDENGINE_ASTTOMSG_H
|
||||
|
|
|
@ -356,7 +356,7 @@ SCreateStbMsg* buildCreateTableMsg(SCreateTableSql* pCreateTableSql, int32_t* le
|
|||
return pCreateTableMsg;
|
||||
}
|
||||
|
||||
SDropTableMsg* buildDropTableMsg(SSqlInfo* pInfo, int32_t* len, SParseBasicCtx* pParseCtx, SMsgBuf* pMsgBuf) {
|
||||
SDropStbMsg* buildDropStableMsg(SSqlInfo* pInfo, int32_t* len, SParseBasicCtx* pParseCtx, SMsgBuf* pMsgBuf) {
|
||||
SToken* tableName = taosArrayGet(pInfo->pMiscInfo->a, 0);
|
||||
|
||||
SName name = {0};
|
||||
|
@ -366,13 +366,53 @@ SDropTableMsg* buildDropTableMsg(SSqlInfo* pInfo, int32_t* len, SParseBasicCtx*
|
|||
return NULL;
|
||||
}
|
||||
|
||||
SDropTableMsg *pDropTableMsg = (SDropTableMsg*) calloc(1, sizeof(SDropTableMsg));
|
||||
SDropStbMsg *pDropTableMsg = (SDropStbMsg*) calloc(1, sizeof(SDropStbMsg));
|
||||
|
||||
code = tNameExtractFullName(&name, pDropTableMsg->name);
|
||||
assert(code == TSDB_CODE_SUCCESS && name.type == TSDB_TABLE_NAME_T);
|
||||
|
||||
pDropTableMsg->ignoreNotExists = pInfo->pMiscInfo->existsCheck ? 1 : 0;
|
||||
pDropTableMsg->igNotExists = pInfo->pMiscInfo->existsCheck ? 1 : 0;
|
||||
*len = sizeof(SDropTableMsg);
|
||||
return pDropTableMsg;
|
||||
}
|
||||
|
||||
SCreateDnodeMsg *buildCreateDnodeMsg(SSqlInfo* pInfo, int32_t* len, SMsgBuf* pMsgBuf) {
|
||||
const char* msg1 = "invalid host name (name too long, maximum length 128)";
|
||||
const char* msg2 = "dnode name can not be string";
|
||||
|
||||
if (taosArrayGetSize(pInfo->pMiscInfo->a) > 1) {
|
||||
buildInvalidOperationMsg(pMsgBuf, msg1);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
SToken* id = taosArrayGet(pInfo->pMiscInfo->a, 0);
|
||||
if (id->type != TK_ID) {
|
||||
buildInvalidOperationMsg(pMsgBuf, msg2);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
SCreateDnodeMsg *pCreate = (SCreateDnodeMsg *) calloc(1, sizeof(SCreateDnodeMsg));
|
||||
strncpy(pCreate->ep, id->z, id->n);
|
||||
*len = sizeof(SCreateDnodeMsg);
|
||||
|
||||
return pCreate;
|
||||
}
|
||||
|
||||
SDropDnodeMsg *buildDropDnodeMsg(SSqlInfo* pInfo, int32_t* len, SMsgBuf* pMsgBuf) {
|
||||
SToken* pzName = taosArrayGet(pInfo->pMiscInfo->a, 0);
|
||||
|
||||
|
||||
char* end = NULL;
|
||||
SDropDnodeMsg * pDrop = (SDropDnodeMsg *)calloc(1, sizeof(SDropDnodeMsg));
|
||||
pDrop->dnodeId = strtoll(pzName->z, &end, 10);
|
||||
*len = sizeof(SDropDnodeMsg);
|
||||
|
||||
if (end - pzName->z != pzName->n) {
|
||||
buildInvalidOperationMsg(pMsgBuf, "invalid dnode id");
|
||||
tfree(pDrop);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
return pDrop;
|
||||
}
|
||||
|
||||
|
|
|
@ -710,13 +710,32 @@ int32_t qParserValidateDclSqlNode(SSqlInfo* pInfo, SParseBasicCtx* pCtx, SDclStm
|
|||
}
|
||||
|
||||
case TSDB_SQL_DROP_TABLE: {
|
||||
pDcl->pMsg = (char*)buildDropTableMsg(pInfo, &pDcl->msgLen, pCtx, pMsgBuf);
|
||||
pDcl->pMsg = (char*)buildDropStableMsg(pInfo, &pDcl->msgLen, pCtx, pMsgBuf);
|
||||
if (pDcl->pMsg == NULL) {
|
||||
return terrno;
|
||||
code = terrno;
|
||||
}
|
||||
|
||||
pDcl->msgType = TDMT_MND_DROP_STB;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
break;
|
||||
}
|
||||
|
||||
case TSDB_SQL_CREATE_DNODE: {
|
||||
pDcl->pMsg = (char*) buildCreateDnodeMsg(pInfo, &pDcl->msgLen, pMsgBuf);
|
||||
if (pDcl->pMsg == NULL) {
|
||||
code = terrno;
|
||||
}
|
||||
|
||||
pDcl->msgType = TDMT_MND_CREATE_DNODE;
|
||||
break;
|
||||
}
|
||||
|
||||
case TSDB_SQL_DROP_DNODE: {
|
||||
pDcl->pMsg = (char*) buildDropDnodeMsg(pInfo, &pDcl->msgLen, pMsgBuf);
|
||||
if (pDcl->pMsg == NULL) {
|
||||
code = terrno;
|
||||
}
|
||||
|
||||
pDcl->msgType = TDMT_MND_DROP_DNODE;
|
||||
break;
|
||||
}
|
||||
|
||||
|
|
|
@ -44,11 +44,12 @@ int32_t parseQuerySql(SParseContext* pCxt, SQueryNode** pQuery) {
|
|||
}
|
||||
|
||||
if (!isDqlSqlStatement(&info)) {
|
||||
SDclStmtInfo* pDcl = calloc(1, sizeof(SQueryStmtInfo));
|
||||
SDclStmtInfo* pDcl = calloc(1, sizeof(SDclStmtInfo));
|
||||
if (NULL == pDcl) {
|
||||
terrno = TSDB_CODE_TSC_OUT_OF_MEMORY; // set correct error code.
|
||||
return terrno;
|
||||
}
|
||||
|
||||
pDcl->nodeType = info.type;
|
||||
int32_t code = qParserValidateDclSqlNode(&info, &pCxt->ctx, pDcl, pCxt->pMsg, pCxt->msgLen);
|
||||
if (code == TSDB_CODE_SUCCESS) {
|
||||
|
|
Loading…
Reference in New Issue