Merge branch '3.0' of github.com:taosdata/TDengine into feature/vnode

This commit is contained in:
Hongze Cheng 2022-01-03 18:11:01 +08:00
commit 687cf9a732
22 changed files with 412 additions and 179 deletions

View File

@ -40,6 +40,7 @@ enum {
// the SQL below is for mgmt node // the SQL below is for mgmt node
TSDB_DEFINE_SQL_TYPE( TSDB_SQL_MGMT, "mgmt" ) TSDB_DEFINE_SQL_TYPE( TSDB_SQL_MGMT, "mgmt" )
TSDB_DEFINE_SQL_TYPE( TSDB_SQL_CREATE_DB, "create-db" ) TSDB_DEFINE_SQL_TYPE( TSDB_SQL_CREATE_DB, "create-db" )
TSDB_DEFINE_SQL_TYPE( TSDB_SQL_CREATE_STABLE, "create-stable" )
TSDB_DEFINE_SQL_TYPE( TSDB_SQL_CREATE_TABLE, "create-table" ) TSDB_DEFINE_SQL_TYPE( TSDB_SQL_CREATE_TABLE, "create-table" )
TSDB_DEFINE_SQL_TYPE( TSDB_SQL_CREATE_FUNCTION, "create-function" ) TSDB_DEFINE_SQL_TYPE( TSDB_SQL_CREATE_FUNCTION, "create-function" )
TSDB_DEFINE_SQL_TYPE( TSDB_SQL_DROP_DB, "drop-db" ) TSDB_DEFINE_SQL_TYPE( TSDB_SQL_DROP_DB, "drop-db" )

View File

@ -27,7 +27,7 @@ extern "C" {
#include "tname.h" #include "tname.h"
#include "tvariant.h" #include "tvariant.h"
/* /**
* The first field of a node of any type is guaranteed to be the int16_t. * The first field of a node of any type is guaranteed to be the int16_t.
* Hence the type of any node can be gotten by casting it to SQueryNode. * Hence the type of any node can be gotten by casting it to SQueryNode.
*/ */
@ -157,7 +157,7 @@ typedef struct SVgDataBlocks {
typedef struct SInsertStmtInfo { typedef struct SInsertStmtInfo {
int16_t nodeType; int16_t nodeType;
SArray* pDataBlocks; // data block for each vgroup, SArray<SVgDataBlocks*>. SArray* pDataBlocks; // data block for each vgroup, SArray<SVgDataBlocks*>.
int8_t schemaAttache; // denote if submit block is built with table schema or not int8_t schemaAttache; // denote if submit block is built with table schema or not
uint8_t payloadType; // EPayloadType. 0: K-V payload for non-prepare insert, 1: rawPayload for prepare insert uint8_t payloadType; // EPayloadType. 0: K-V payload for non-prepare insert, 1: rawPayload for prepare insert
uint32_t insertType; // insert data from [file|sql statement| bound statement] uint32_t insertType; // insert data from [file|sql statement| bound statement]
const char* sql; // current sql statement position const char* sql; // current sql statement position

View File

@ -202,7 +202,7 @@ int32_t getPlan(SRequestObj* pRequest, SQueryNode* pQuery, SQueryDag** pDag) {
} }
int32_t scheduleQuery(SRequestObj* pRequest, SQueryDag* pDag, void** pJob) { int32_t scheduleQuery(SRequestObj* pRequest, SQueryDag* pDag, void** pJob) {
if (TSDB_SQL_INSERT == pRequest->type) { if (TSDB_SQL_INSERT == pRequest->type || TSDB_SQL_CREATE_TABLE == pRequest->type) {
return scheduleExecJob(pRequest->pTscObj->pTransporter, NULL/*todo appInfo.xxx*/, pDag, pJob, &pRequest->affectedRows); return scheduleExecJob(pRequest->pTscObj->pTransporter, NULL/*todo appInfo.xxx*/, pDag, pJob, &pRequest->affectedRows);
} }
return scheduleAsyncExecJob(pRequest->pTscObj->pTransporter, NULL/*todo appInfo.xxx*/, pDag, pJob); return scheduleAsyncExecJob(pRequest->pTscObj->pTransporter, NULL/*todo appInfo.xxx*/, pDag, pJob);

View File

@ -108,8 +108,17 @@ void iterateValueDestroy(IterateValue* iv, bool destroy);
extern void* indexQhandle; extern void* indexQhandle;
typedef struct TFileCacheKey {
uint64_t suid;
uint8_t colType;
char* colName;
int32_t nColName;
} ICacheKey;
int indexFlushCacheTFile(SIndex* sIdx, void*); int indexFlushCacheTFile(SIndex* sIdx, void*);
int32_t indexSerialCacheKey(ICacheKey* key, char* buf);
#define indexFatal(...) \ #define indexFatal(...) \
do { \ do { \
if (sDebugFlag & DEBUG_FATAL) { taosPrintLog("index FATAL ", 255, __VA_ARGS__); } \ if (sDebugFlag & DEBUG_FATAL) { taosPrintLog("index FATAL ", 255, __VA_ARGS__); } \

View File

@ -42,6 +42,7 @@ typedef struct IndexCache {
int32_t version; int32_t version;
int32_t nTerm; int32_t nTerm;
int8_t type; int8_t type;
uint64_t suid;
pthread_mutex_t mtx; pthread_mutex_t mtx;
} IndexCache; } IndexCache;
@ -58,7 +59,7 @@ typedef struct CacheTerm {
} CacheTerm; } CacheTerm;
// //
IndexCache* indexCacheCreate(SIndex* idx, const char* colName, int8_t type); IndexCache* indexCacheCreate(SIndex* idx, uint64_t suid, const char* colName, int8_t type);
void indexCacheDestroy(void* cache); void indexCacheDestroy(void* cache);

View File

@ -49,13 +49,6 @@ typedef struct TFileValue {
int32_t offset; int32_t offset;
} TFileValue; } TFileValue;
typedef struct TFileCacheKey {
uint64_t suid;
uint8_t colType;
char* colName;
int32_t nColName;
} TFileCacheKey;
// table cache // table cache
// refactor to LRU cache later // refactor to LRU cache later
typedef struct TFileCache { typedef struct TFileCache {
@ -103,10 +96,10 @@ typedef struct TFileReaderOpt {
// tfile cache, manage tindex reader // tfile cache, manage tindex reader
TFileCache* tfileCacheCreate(const char* path); TFileCache* tfileCacheCreate(const char* path);
void tfileCacheDestroy(TFileCache* tcache); void tfileCacheDestroy(TFileCache* tcache);
TFileReader* tfileCacheGet(TFileCache* tcache, TFileCacheKey* key); TFileReader* tfileCacheGet(TFileCache* tcache, ICacheKey* key);
void tfileCachePut(TFileCache* tcache, TFileCacheKey* key, TFileReader* reader); void tfileCachePut(TFileCache* tcache, ICacheKey* key, TFileReader* reader);
TFileReader* tfileGetReaderByCol(IndexTFile* tf, char* colName); TFileReader* tfileGetReaderByCol(IndexTFile* tf, uint64_t suid, char* colName);
TFileReader* tfileReaderOpen(char* path, uint64_t suid, int32_t version, const char* colName); TFileReader* tfileReaderOpen(char* path, uint64_t suid, int32_t version, const char* colName);
TFileReader* tfileReaderCreate(WriterCtx* ctx); TFileReader* tfileReaderCreate(WriterCtx* ctx);
@ -124,6 +117,7 @@ int tfileWriterFinish(TFileWriter* tw);
// //
IndexTFile* indexTFileCreate(const char* path); IndexTFile* indexTFileCreate(const char* path);
void indexTFileDestroy(IndexTFile* tfile);
int indexTFilePut(void* tfile, SIndexTerm* term, uint64_t uid); int indexTFilePut(void* tfile, SIndexTerm* term, uint64_t uid);
int indexTFileSearch(void* tfile, SIndexTermQuery* query, SArray* result); int indexTFileSearch(void* tfile, SIndexTermQuery* query, SArray* result);

View File

@ -34,7 +34,7 @@ extern "C" {
#define SERIALIZE_VAR_TO_BUF(buf, var, type) \ #define SERIALIZE_VAR_TO_BUF(buf, var, type) \
do { \ do { \
type c = var; \ type c = var; \
assert(sizeof(var) == sizeof(type)); \ assert(sizeof(type) == sizeof(c)); \
memcpy((void*)buf, (void*)&c, sizeof(c)); \ memcpy((void*)buf, (void*)&c, sizeof(c)); \
buf += sizeof(c); \ buf += sizeof(c); \
} while (0) } while (0)

View File

@ -17,6 +17,7 @@
#include "indexInt.h" #include "indexInt.h"
#include "index_cache.h" #include "index_cache.h"
#include "index_tfile.h" #include "index_tfile.h"
#include "index_util.h"
#include "tdef.h" #include "tdef.h"
#include "tsched.h" #include "tsched.h"
@ -102,6 +103,7 @@ void indexClose(SIndex* sIdx) {
} }
taosHashCleanup(sIdx->colObj); taosHashCleanup(sIdx->colObj);
pthread_mutex_destroy(&sIdx->mtx); pthread_mutex_destroy(&sIdx->mtx);
indexTFileDestroy(sIdx->tindex);
#endif #endif
free(sIdx->path); free(sIdx->path);
free(sIdx); free(sIdx);
@ -130,18 +132,28 @@ int indexPut(SIndex* index, SIndexMultiTerm* fVals, uint64_t uid) {
// TODO(yihao): reduce the lock range // TODO(yihao): reduce the lock range
pthread_mutex_lock(&index->mtx); pthread_mutex_lock(&index->mtx);
for (int i = 0; i < taosArrayGetSize(fVals); i++) { for (int i = 0; i < taosArrayGetSize(fVals); i++) {
SIndexTerm* p = taosArrayGetP(fVals, i); SIndexTerm* p = taosArrayGetP(fVals, i);
IndexCache** cache = taosHashGet(index->colObj, p->colName, p->nColName);
char buf[128] = {0};
ICacheKey key = {.suid = p->suid, .colName = p->colName};
int32_t sz = indexSerialCacheKey(&key, buf);
IndexCache** cache = taosHashGet(index->colObj, buf, sz);
if (cache == NULL) { if (cache == NULL) {
IndexCache* pCache = indexCacheCreate(index, p->colName, p->colType); IndexCache* pCache = indexCacheCreate(index, p->suid, p->colName, p->colType);
taosHashPut(index->colObj, p->colName, p->nColName, &pCache, sizeof(void*)); taosHashPut(index->colObj, buf, sz, &pCache, sizeof(void*));
} }
} }
pthread_mutex_unlock(&index->mtx); pthread_mutex_unlock(&index->mtx);
for (int i = 0; i < taosArrayGetSize(fVals); i++) { for (int i = 0; i < taosArrayGetSize(fVals); i++) {
SIndexTerm* p = taosArrayGetP(fVals, i); SIndexTerm* p = taosArrayGetP(fVals, i);
IndexCache** cache = taosHashGet(index->colObj, p->colName, p->nColName);
char buf[128] = {0};
ICacheKey key = {.suid = p->suid, .colName = p->colName};
int32_t sz = indexSerialCacheKey(&key, buf);
IndexCache** cache = taosHashGet(index->colObj, buf, sz);
assert(*cache != NULL); assert(*cache != NULL);
int ret = indexCachePut(*cache, p, uid); int ret = indexCachePut(*cache, p, uid);
if (ret != 0) { return ret; } if (ret != 0) { return ret; }
@ -296,7 +308,12 @@ static int indexTermSearch(SIndex* sIdx, SIndexTermQuery* query, SArray** result
// Get col info // Get col info
IndexCache* cache = NULL; IndexCache* cache = NULL;
pthread_mutex_lock(&sIdx->mtx); pthread_mutex_lock(&sIdx->mtx);
IndexCache** pCache = taosHashGet(sIdx->colObj, colName, nColName);
char buf[128] = {0};
ICacheKey key = {.suid = term->suid, .colName = term->colName};
int32_t sz = indexSerialCacheKey(&key, buf);
IndexCache** pCache = taosHashGet(sIdx->colObj, buf, sz);
if (pCache == NULL) { if (pCache == NULL) {
pthread_mutex_unlock(&sIdx->mtx); pthread_mutex_unlock(&sIdx->mtx);
return -1; return -1;
@ -360,6 +377,7 @@ static void indexMergeSameKey(SArray* result, TFileValue* tv) {
if (sz > 0) { if (sz > 0) {
// TODO(yihao): remove duplicate tableid // TODO(yihao): remove duplicate tableid
TFileValue* lv = taosArrayGetP(result, sz - 1); TFileValue* lv = taosArrayGetP(result, sz - 1);
// indexError("merge colVal: %s", lv->colVal);
if (strcmp(lv->colVal, tv->colVal) == 0) { if (strcmp(lv->colVal, tv->colVal) == 0) {
taosArrayAddAll(lv->tableId, tv->tableId); taosArrayAddAll(lv->tableId, tv->tableId);
tfileValueDestroy(tv); tfileValueDestroy(tv);
@ -368,6 +386,7 @@ static void indexMergeSameKey(SArray* result, TFileValue* tv) {
} }
} else { } else {
taosArrayPush(result, &tv); taosArrayPush(result, &tv);
// indexError("merge colVal: %s", tv->colVal);
} }
} }
static void indexDestroyTempResult(SArray* result) { static void indexDestroyTempResult(SArray* result) {
@ -383,10 +402,12 @@ int indexFlushCacheTFile(SIndex* sIdx, void* cache) {
indexWarn("suid %" PRIu64 " merge cache into tindex", sIdx->suid); indexWarn("suid %" PRIu64 " merge cache into tindex", sIdx->suid);
IndexCache* pCache = (IndexCache*)cache; IndexCache* pCache = (IndexCache*)cache;
TFileReader* pReader = tfileGetReaderByCol(sIdx->tindex, pCache->colName); TFileReader* pReader = tfileGetReaderByCol(sIdx->tindex, pCache->suid, pCache->colName);
if (pReader == NULL) { indexWarn("empty pReader found"); }
// handle flush // handle flush
Iterate* cacheIter = indexCacheIteratorCreate(pCache); Iterate* cacheIter = indexCacheIteratorCreate(pCache);
Iterate* tfileIter = tfileIteratorCreate(pReader); Iterate* tfileIter = tfileIteratorCreate(pReader);
if (tfileIter == NULL) { indexWarn("empty tfile reader iterator"); }
SArray* result = taosArrayInit(1024, sizeof(void*)); SArray* result = taosArrayInit(1024, sizeof(void*));
@ -459,14 +480,14 @@ void iterateValueDestroy(IterateValue* value, bool destroy) {
} else { } else {
if (value->val != NULL) { taosArrayClear(value->val); } if (value->val != NULL) { taosArrayClear(value->val); }
} }
// free(value->colVal); free(value->colVal);
value->colVal = NULL; value->colVal = NULL;
} }
static int indexGenTFile(SIndex* sIdx, IndexCache* cache, SArray* batch) { static int indexGenTFile(SIndex* sIdx, IndexCache* cache, SArray* batch) {
int32_t version = CACHE_VERSION(cache); int32_t version = CACHE_VERSION(cache);
uint8_t colType = cache->type; uint8_t colType = cache->type;
TFileWriter* tw = tfileWriterOpen(sIdx->path, sIdx->suid, version, cache->colName, colType); TFileWriter* tw = tfileWriterOpen(sIdx->path, cache->suid, version, cache->colName, colType);
if (tw == NULL) { if (tw == NULL) {
indexError("failed to open file to write"); indexError("failed to open file to write");
return -1; return -1;
@ -479,14 +500,13 @@ static int indexGenTFile(SIndex* sIdx, IndexCache* cache, SArray* batch) {
} }
tfileWriterClose(tw); tfileWriterClose(tw);
TFileReader* reader = tfileReaderOpen(sIdx->path, sIdx->suid, version, cache->colName); TFileReader* reader = tfileReaderOpen(sIdx->path, cache->suid, version, cache->colName);
char buf[128] = {0};
TFileHeader* header = &reader->header;
ICacheKey key = {
.suid = cache->suid, .colName = header->colName, .nColName = strlen(header->colName), .colType = header->colType};
char buf[128] = {0};
TFileHeader* header = &reader->header;
TFileCacheKey key = {.suid = header->suid,
.colName = header->colName,
.nColName = strlen(header->colName),
.colType = header->colType};
pthread_mutex_lock(&sIdx->mtx); pthread_mutex_lock(&sIdx->mtx);
IndexTFile* ifile = (IndexTFile*)sIdx->tindex; IndexTFile* ifile = (IndexTFile*)sIdx->tindex;
@ -497,3 +517,13 @@ static int indexGenTFile(SIndex* sIdx, IndexCache* cache, SArray* batch) {
END: END:
tfileWriterClose(tw); tfileWriterClose(tw);
} }
int32_t indexSerialCacheKey(ICacheKey* key, char* buf) {
char* p = buf;
SERIALIZE_MEM_TO_BUF(buf, key, suid);
SERIALIZE_VAR_TO_BUF(buf, '_', char);
// SERIALIZE_MEM_TO_BUF(buf, key, colType);
// SERIALIZE_VAR_TO_BUF(buf, '_', char);
SERIALIZE_STR_MEM_TO_BUF(buf, key, colName, key->nColName);
return buf - p;
}

View File

@ -20,7 +20,7 @@
#define MAX_INDEX_KEY_LEN 256 // test only, change later #define MAX_INDEX_KEY_LEN 256 // test only, change later
#define MEM_TERM_LIMIT 10000 * 10 #define MEM_TERM_LIMIT 10 * 10000
// ref index_cache.h:22 // ref index_cache.h:22
//#define CACHE_KEY_LEN(p) \ //#define CACHE_KEY_LEN(p) \
// (sizeof(int32_t) + sizeof(uint16_t) + sizeof(p->colType) + sizeof(p->nColVal) + p->nColVal + sizeof(uint64_t) + // (sizeof(int32_t) + sizeof(uint16_t) + sizeof(p->colType) + sizeof(p->nColVal) + p->nColVal + sizeof(uint64_t) +
@ -40,7 +40,7 @@ static bool indexCacheIteratorNext(Iterate* itera);
static IterateValue* indexCacheIteratorGetValue(Iterate* iter); static IterateValue* indexCacheIteratorGetValue(Iterate* iter);
IndexCache* indexCacheCreate(SIndex* idx, const char* colName, int8_t type) { IndexCache* indexCacheCreate(SIndex* idx, uint64_t suid, const char* colName, int8_t type) {
IndexCache* cache = calloc(1, sizeof(IndexCache)); IndexCache* cache = calloc(1, sizeof(IndexCache));
if (cache == NULL) { if (cache == NULL) {
indexError("failed to create index cache"); indexError("failed to create index cache");
@ -53,7 +53,7 @@ IndexCache* indexCacheCreate(SIndex* idx, const char* colName, int8_t type) {
cache->type = type; cache->type = type;
cache->index = idx; cache->index = idx;
cache->version = 0; cache->version = 0;
cache->suid = suid;
pthread_mutex_init(&cache->mtx, NULL); pthread_mutex_init(&cache->mtx, NULL);
indexCacheRef(cache); indexCacheRef(cache);
return cache; return cache;
@ -150,6 +150,7 @@ Iterate* indexCacheIteratorCreate(IndexCache* cache) {
MemTable* tbl = cache->imm; MemTable* tbl = cache->imm;
iiter->val.val = taosArrayInit(1, sizeof(uint64_t)); iiter->val.val = taosArrayInit(1, sizeof(uint64_t));
iiter->val.colVal = NULL;
iiter->iter = tbl != NULL ? tSkipListCreateIter(tbl->mem) : NULL; iiter->iter = tbl != NULL ? tSkipListCreateIter(tbl->mem) : NULL;
iiter->next = indexCacheIteratorNext; iiter->next = indexCacheIteratorNext;
iiter->getValue = indexCacheIteratorGetValue; iiter->getValue = indexCacheIteratorGetValue;
@ -353,6 +354,9 @@ static bool indexCacheIteratorNext(Iterate* itera) {
SSkipListIterator* iter = itera->iter; SSkipListIterator* iter = itera->iter;
if (iter == NULL) { return false; } if (iter == NULL) { return false; }
IterateValue* iv = &itera->val; IterateValue* iv = &itera->val;
if (iv->colVal != NULL && iv->val != NULL) {
// indexError("value in cache: colVal: %s, size: %d", iv->colVal, (int)taosArrayGetSize(iv->val));
}
iterateValueDestroy(iv, false); iterateValueDestroy(iv, false);
bool next = tSkipListIterNext(iter); bool next = tSkipListIterNext(iter);

View File

@ -319,7 +319,7 @@ void fstStateSetCommInput(FstState* s, uint8_t inp) {
assert(s->state == OneTransNext || s->state == OneTrans); assert(s->state == OneTransNext || s->state == OneTrans);
uint8_t val; uint8_t val;
COMMON_INDEX(inp, 0x111111, val); COMMON_INDEX(inp, 0b111111, val);
s->val = (s->val & fstStateDict[s->state].val) | val; s->val = (s->val & fstStateDict[s->state].val) | val;
} }
@ -369,7 +369,7 @@ uint8_t fstStateInput(FstState* s, FstNode* node) {
bool null = false; bool null = false;
uint8_t inp = fstStateCommInput(s, &null); uint8_t inp = fstStateCommInput(s, &null);
uint8_t* data = fstSliceData(slice, NULL); uint8_t* data = fstSliceData(slice, NULL);
return null == false ? inp : data[-1]; return null == false ? inp : data[node->start - 1];
} }
uint8_t fstStateInputForAnyTrans(FstState* s, FstNode* node, uint64_t i) { uint8_t fstStateInputForAnyTrans(FstState* s, FstNode* node, uint64_t i) {
assert(s->state == AnyTrans); assert(s->state == AnyTrans);
@ -1062,6 +1062,7 @@ Output fstEmptyFinalOutput(Fst* fst, bool* null) {
} else { } else {
*null = true; *null = true;
} }
fstNodeDestroy(node);
return res; return res;
} }
@ -1286,6 +1287,7 @@ StreamWithStateResult* streamWithStateNextWith(StreamWithState* sws, StreamCallb
StreamWithStateResult* result = swsResultCreate(&slice, fOutput, tState); StreamWithStateResult* result = swsResultCreate(&slice, fOutput, tState);
free(buf); free(buf);
fstSliceDestroy(&slice); fstSliceDestroy(&slice);
taosArrayDestroy(nodes);
return result; return result;
} }
free(buf); free(buf);

View File

@ -51,7 +51,6 @@ static void tfileDestroyFileName(void* elem);
static int tfileCompare(const void* a, const void* b); static int tfileCompare(const void* a, const void* b);
static int tfileParseFileName(const char* filename, uint64_t* suid, int* colId, int* version); static int tfileParseFileName(const char* filename, uint64_t* suid, int* colId, int* version);
static void tfileGenFileName(char* filename, uint64_t suid, int colId, int version); static void tfileGenFileName(char* filename, uint64_t suid, int colId, int version);
static void tfileSerialCacheKey(TFileCacheKey* key, char* buf);
TFileCache* tfileCacheCreate(const char* path) { TFileCache* tfileCacheCreate(const char* path) {
TFileCache* tcache = calloc(1, sizeof(TFileCache)); TFileCache* tcache = calloc(1, sizeof(TFileCache));
@ -80,18 +79,18 @@ TFileCache* tfileCacheCreate(const char* path) {
goto End; goto End;
} }
char buf[128] = {0}; char buf[128] = {0};
TFileReader* reader = tfileReaderCreate(wc); TFileReader* reader = tfileReaderCreate(wc);
TFileHeader* header = &reader->header; TFileHeader* header = &reader->header;
TFileCacheKey key = {.suid = header->suid, ICacheKey key = {.suid = header->suid,
.colName = header->colName, .colName = header->colName,
.nColName = strlen(header->colName), .nColName = strlen(header->colName),
.colType = header->colType}; .colType = header->colType};
tfileSerialCacheKey(&key, buf);
int32_t sz = indexSerialCacheKey(&key, buf);
assert(sz < sizeof(buf));
taosHashPut(tcache->tableCache, buf, sz, &reader, sizeof(void*));
tfileReaderRef(reader); tfileReaderRef(reader);
// indexTable
taosHashPut(tcache->tableCache, buf, strlen(buf), &reader, sizeof(void*));
} }
taosArrayDestroyEx(files, tfileDestroyFileName); taosArrayDestroyEx(files, tfileDestroyFileName);
return tcache; return tcache;
@ -117,30 +116,30 @@ void tfileCacheDestroy(TFileCache* tcache) {
free(tcache); free(tcache);
} }
TFileReader* tfileCacheGet(TFileCache* tcache, TFileCacheKey* key) { TFileReader* tfileCacheGet(TFileCache* tcache, ICacheKey* key) {
char buf[128] = {0}; char buf[128] = {0};
tfileSerialCacheKey(key, buf); int32_t sz = indexSerialCacheKey(key, buf);
assert(sz < sizeof(buf));
TFileReader** reader = taosHashGet(tcache->tableCache, buf, strlen(buf)); TFileReader** reader = taosHashGet(tcache->tableCache, buf, sz);
if (reader == NULL) { return NULL; } if (reader == NULL) { return NULL; }
tfileReaderRef(*reader); tfileReaderRef(*reader);
return *reader; return *reader;
} }
void tfileCachePut(TFileCache* tcache, TFileCacheKey* key, TFileReader* reader) { void tfileCachePut(TFileCache* tcache, ICacheKey* key, TFileReader* reader) {
char buf[128] = {0}; char buf[128] = {0};
tfileSerialCacheKey(key, buf); int32_t sz = indexSerialCacheKey(key, buf);
// remove last version index reader // remove last version index reader
TFileReader** p = taosHashGet(tcache->tableCache, buf, strlen(buf)); TFileReader** p = taosHashGet(tcache->tableCache, buf, sz);
if (p != NULL) { if (p != NULL) {
TFileReader* oldReader = *p; TFileReader* oldReader = *p;
taosHashRemove(tcache->tableCache, buf, strlen(buf)); taosHashRemove(tcache->tableCache, buf, sz);
oldReader->remove = true; oldReader->remove = true;
tfileReaderUnRef(oldReader); tfileReaderUnRef(oldReader);
} }
taosHashPut(tcache->tableCache, buf, sz, &reader, sizeof(void*));
tfileReaderRef(reader); tfileReaderRef(reader);
taosHashPut(tcache->tableCache, buf, strlen(buf), &reader, sizeof(void*));
return; return;
} }
TFileReader* tfileReaderCreate(WriterCtx* ctx) { TFileReader* tfileReaderCreate(WriterCtx* ctx) {
@ -230,8 +229,6 @@ TFileReader* tfileReaderOpen(char* path, uint64_t suid, int32_t version, const c
TFileReader* reader = tfileReaderCreate(wc); TFileReader* reader = tfileReaderCreate(wc);
return reader; return reader;
// tfileSerialCacheKey(&key, buf);
} }
TFileWriter* tfileWriterCreate(WriterCtx* ctx, TFileHeader* header) { TFileWriter* tfileWriterCreate(WriterCtx* ctx, TFileHeader* header) {
// char pathBuf[128] = {0}; // char pathBuf[128] = {0};
@ -325,15 +322,19 @@ int tfileWriterPut(TFileWriter* tw, void* data, bool order) {
tfileWriterClose(tw); tfileWriterClose(tw);
return -1; return -1;
} }
// write fst
// write data
indexError("--------Begin----------------"); indexError("--------Begin----------------");
for (size_t i = 0; i < sz; i++) { for (size_t i = 0; i < sz; i++) {
// TODO, fst batch write later // TODO, fst batch write later
TFileValue* v = taosArrayGetP((SArray*)data, i); TFileValue* v = taosArrayGetP((SArray*)data, i);
if (tfileWriteData(tw, v) == 0) { if (tfileWriteData(tw, v) != 0) {
// indexError("failed to write data: %s, offset: %d len: %d", v->colVal, v->offset,
(int)taosArrayGetSize(v->tableId));
} else {
indexInfo("success to write data: %s, offset: %d len: %d", v->colVal, v->offset,
(int)taosArrayGetSize(v->tableId));
} }
indexError("data: %s, offset: %d len: %d", v->colVal, v->offset, (int)taosArrayGetSize(v->tableId));
} }
indexError("--------End----------------"); indexError("--------End----------------");
fstBuilderFinish(tw->fb); fstBuilderFinish(tw->fb);
@ -359,7 +360,7 @@ IndexTFile* indexTFileCreate(const char* path) {
tfile->cache = tfileCacheCreate(path); tfile->cache = tfileCacheCreate(path);
return tfile; return tfile;
} }
void IndexTFileDestroy(IndexTFile* tfile) { void indexTFileDestroy(IndexTFile* tfile) {
tfileCacheDestroy(tfile->cache); tfileCacheDestroy(tfile->cache);
free(tfile); free(tfile);
} }
@ -369,9 +370,8 @@ int indexTFileSearch(void* tfile, SIndexTermQuery* query, SArray* result) {
if (tfile == NULL) { return ret; } if (tfile == NULL) { return ret; }
IndexTFile* pTfile = (IndexTFile*)tfile; IndexTFile* pTfile = (IndexTFile*)tfile;
SIndexTerm* term = query->term; SIndexTerm* term = query->term;
TFileCacheKey key = { ICacheKey key = {.suid = term->suid, .colType = term->colType, .colName = term->colName, .nColName = term->nColName};
.suid = term->suid, .colType = term->colType, .colName = term->colName, .nColName = term->nColName};
TFileReader* reader = tfileCacheGet(pTfile->cache, &key); TFileReader* reader = tfileCacheGet(pTfile->cache, &key);
if (reader == NULL) { return 0; } if (reader == NULL) { return 0; }
@ -385,8 +385,10 @@ int indexTFilePut(void* tfile, SIndexTerm* term, uint64_t uid) {
} }
static bool tfileIteratorNext(Iterate* iiter) { static bool tfileIteratorNext(Iterate* iiter) {
IterateValue* iv = &iiter->val; IterateValue* iv = &iiter->val;
if (iv->colVal != NULL && iv->val != NULL) {
// indexError("value in fst: colVal: %s, size: %d", iv->colVal, (int)taosArrayGetSize(iv->val));
}
iterateValueDestroy(iv, false); iterateValueDestroy(iv, false);
// SArray* tblIds = iv->val;
char* colVal = NULL; char* colVal = NULL;
uint64_t offset = 0; uint64_t offset = 0;
@ -406,14 +408,14 @@ static bool tfileIteratorNext(Iterate* iiter) {
if (tfileReaderLoadTableIds(tIter->rdr, offset, iv->val) != 0) { return false; } if (tfileReaderLoadTableIds(tIter->rdr, offset, iv->val) != 0) { return false; }
iv->colVal = colVal; iv->colVal = colVal;
return true;
// std::string key(ch, sz); // std::string key(ch, sz);
} }
static IterateValue* tifileIterateGetValue(Iterate* iter) { return &iter->val; } static IterateValue* tifileIterateGetValue(Iterate* iter) { return &iter->val; }
static TFileFstIter* tfileFstIteratorCreate(TFileReader* reader) { static TFileFstIter* tfileFstIteratorCreate(TFileReader* reader) {
TFileFstIter* tIter = calloc(1, sizeof(Iterate)); TFileFstIter* tIter = calloc(1, sizeof(TFileFstIter));
if (tIter == NULL) { return NULL; } if (tIter == NULL) { return NULL; }
tIter->ctx = automCtxCreate(NULL, AUTOMATION_ALWAYS); tIter->ctx = automCtxCreate(NULL, AUTOMATION_ALWAYS);
@ -435,6 +437,7 @@ Iterate* tfileIteratorCreate(TFileReader* reader) {
iter->next = tfileIteratorNext; iter->next = tfileIteratorNext;
iter->getValue = tifileIterateGetValue; iter->getValue = tifileIterateGetValue;
iter->val.val = taosArrayInit(1, sizeof(uint64_t)); iter->val.val = taosArrayInit(1, sizeof(uint64_t));
iter->val.colVal = NULL;
return iter; return iter;
} }
void tfileIteratorDestroy(Iterate* iter) { void tfileIteratorDestroy(Iterate* iter) {
@ -447,13 +450,14 @@ void tfileIteratorDestroy(Iterate* iter) {
streamWithStateDestroy(tIter->st); streamWithStateDestroy(tIter->st);
fstStreamBuilderDestroy(tIter->fb); fstStreamBuilderDestroy(tIter->fb);
automCtxDestroy(tIter->ctx); automCtxDestroy(tIter->ctx);
free(tIter);
free(iter); free(iter);
} }
TFileReader* tfileGetReaderByCol(IndexTFile* tf, char* colName) { TFileReader* tfileGetReaderByCol(IndexTFile* tf, uint64_t suid, char* colName) {
if (tf == NULL) { return NULL; } if (tf == NULL) { return NULL; }
TFileCacheKey key = {.suid = 0, .colType = TSDB_DATA_TYPE_BINARY, .colName = colName, .nColName = strlen(colName)}; ICacheKey key = {.suid = suid, .colType = TSDB_DATA_TYPE_BINARY, .colName = colName, .nColName = strlen(colName)};
return tfileCacheGet(tf->cache, &key); return tfileCacheGet(tf->cache, &key);
} }
@ -480,7 +484,7 @@ static int tfileValueCompare(const void* a, const void* b, const void* param) {
TFileValue* tfileValueCreate(char* val) { TFileValue* tfileValueCreate(char* val) {
TFileValue* tf = calloc(1, sizeof(TFileValue)); TFileValue* tf = calloc(1, sizeof(TFileValue));
if (tf == NULL) { return NULL; } if (tf == NULL) { return NULL; }
tf->colVal = val; tf->colVal = tstrdup(val);
tf->tableId = taosArrayInit(32, sizeof(uint64_t)); tf->tableId = taosArrayInit(32, sizeof(uint64_t));
return tf; return tf;
} }
@ -491,6 +495,7 @@ int tfileValuePush(TFileValue* tf, uint64_t val) {
} }
void tfileValueDestroy(TFileValue* tf) { void tfileValueDestroy(TFileValue* tf) {
taosArrayDestroy(tf->tableId); taosArrayDestroy(tf->tableId);
free(tf->colVal);
free(tf); free(tf);
} }
static void tfileSerialTableIdsToBuf(char* buf, SArray* ids) { static void tfileSerialTableIdsToBuf(char* buf, SArray* ids) {
@ -648,10 +653,3 @@ static int tfileParseFileName(const char* filename, uint64_t* suid, int* colId,
} }
return -1; return -1;
} }
static void tfileSerialCacheKey(TFileCacheKey* key, char* buf) {
// SERIALIZE_MEM_TO_BUF(buf, key, suid);
// SERIALIZE_VAR_TO_BUF(buf, '_', char);
// SERIALIZE_MEM_TO_BUF(buf, key, colType);
// SERIALIZE_VAR_TO_BUF(buf, '_', char);
SERIALIZE_STR_MEM_TO_BUF(buf, key, colName, key->nColName);
}

View File

@ -24,8 +24,13 @@ class FstWriter {
_b = fstBuilderCreate(_wc, 0); _b = fstBuilderCreate(_wc, 0);
} }
bool Put(const std::string& key, uint64_t val) { bool Put(const std::string& key, uint64_t val) {
// char buf[128] = {0};
// int len = 0;
// taosMbsToUcs4(key.c_str(), key.size(), buf, 128, &len);
// FstSlice skey = fstSliceCreate((uint8_t*)buf, len);
FstSlice skey = fstSliceCreate((uint8_t*)key.c_str(), key.size()); FstSlice skey = fstSliceCreate((uint8_t*)key.c_str(), key.size());
bool ok = fstBuilderInsert(_b, skey, val); bool ok = fstBuilderInsert(_b, skey, val);
fstSliceDestroy(&skey); fstSliceDestroy(&skey);
return ok; return ok;
} }
@ -61,6 +66,11 @@ class FstReadMemory {
return _fst != NULL; return _fst != NULL;
} }
bool Get(const std::string& key, uint64_t* val) { bool Get(const std::string& key, uint64_t* val) {
// char buf[128] = {0};
// int len = 0;
// taosMbsToUcs4(key.c_str(), key.size(), buf, 128, &len);
// FstSlice skey = fstSliceCreate((uint8_t*)buf, len);
FstSlice skey = fstSliceCreate((uint8_t*)key.c_str(), key.size()); FstSlice skey = fstSliceCreate((uint8_t*)key.c_str(), key.size());
bool ok = fstGet(_fst, &skey, val); bool ok = fstGet(_fst, &skey, val);
fstSliceDestroy(&skey); fstSliceDestroy(&skey);
@ -135,15 +145,109 @@ int Performance_fstWriteRecords(FstWriter* b) {
} }
return L * M * N; return L * M * N;
} }
void Performance_fstReadRecords(FstReadMemory* m) {
std::string str("aa");
for (int i = 0; i < M; i++) {
str[0] = 'a' + i;
str.resize(2);
for (int j = 0; j < N; j++) {
str[1] = 'a' + j;
str.resize(2);
for (int k = 0; k < L; k++) {
str.push_back('a');
uint64_t val, cost;
if (m->GetWithTimeCostUs(str, &val, &cost)) {
printf("succes to get kv(%s, %" PRId64 "), cost: %" PRId64 "\n", str.c_str(), val, cost);
} else {
printf("failed to get key: %s\n", str.c_str());
}
}
}
}
}
void checkMillonWriteAndReadOfFst() {
tfInit();
FstWriter* fw = new FstWriter;
Performance_fstWriteRecords(fw);
delete fw;
FstReadMemory* fr = new FstReadMemory(1024 * 64 * 1024);
if (fr->init()) { printf("success to init fst read"); }
Performance_fstReadRecords(fr);
tfCleanup();
delete fr;
}
void checkFstLongTerm() {
tfInit();
FstWriter* fw = new FstWriter;
// Performance_fstWriteRecords(fw);
fw->Put("A B", 1);
fw->Put("C", 2);
fw->Put("a", 3);
delete fw;
FstReadMemory* m = new FstReadMemory(1024 * 64);
if (m->init() == false) {
std::cout << "init readMemory failed" << std::endl;
delete m;
return;
}
{
uint64_t val = 0;
if (m->Get("A B", &val)) {
std::cout << "success to Get: " << val << std::endl;
} else {
std::cout << "failed to Get:" << val << std::endl;
}
}
{
uint64_t val = 0;
if (m->Get("C", &val)) {
std::cout << "success to Get: " << val << std::endl;
} else {
std::cout << "failed to Get:" << val << std::endl;
}
}
{
uint64_t val = 0;
if (m->Get("a", &val)) {
std::cout << "success to Get: " << val << std::endl;
} else {
std::cout << "failed to Get:" << val << std::endl;
}
}
// prefix search
// std::vector<uint64_t> result;
// AutomationCtx* ctx = automCtxCreate((void*)"ab", AUTOMATION_ALWAYS);
// m->Search(ctx, result);
// std::cout << "size: " << result.size() << std::endl;
// assert(result.size() == count);
// for (int i = 0; i < result.size(); i++) {
// assert(result[i] == i); // check result
//}
tfCleanup();
// free(ctx);
// delete m;
}
void checkFstCheckIterator() { void checkFstCheckIterator() {
tfInit(); tfInit();
FstWriter* fw = new FstWriter; FstWriter* fw = new FstWriter;
int64_t s = taosGetTimestampUs(); int64_t s = taosGetTimestampUs();
int count = 2; int count = 2;
Performance_fstWriteRecords(fw); // Performance_fstWriteRecords(fw);
int64_t e = taosGetTimestampUs(); int64_t e = taosGetTimestampUs();
std::cout << "insert data count : " << count << "elapas time: " << e - s << std::endl; std::cout << "insert data count : " << count << "elapas time: " << e - s << std::endl;
fw->Put("Hello world", 1);
fw->Put("hello world", 2);
fw->Put("hello worle", 3);
fw->Put("hello worlf", 4);
delete fw; delete fw;
FstReadMemory* m = new FstReadMemory(1024 * 64); FstReadMemory* m = new FstReadMemory(1024 * 64);
@ -171,7 +275,7 @@ void checkFstCheckIterator() {
void fst_get(Fst* fst) { void fst_get(Fst* fst) {
for (int i = 0; i < 10000; i++) { for (int i = 0; i < 10000; i++) {
std::string term = "Hello"; std::string term = "Hello World";
FstSlice key = fstSliceCreate((uint8_t*)term.c_str(), term.size()); FstSlice key = fstSliceCreate((uint8_t*)term.c_str(), term.size());
uint64_t offset = 0; uint64_t offset = 0;
bool ret = fstGet(fst, &key, &offset); bool ret = fstGet(fst, &key, &offset);
@ -189,7 +293,7 @@ void validateTFile(char* arg) {
std::thread threads[NUM_OF_THREAD]; std::thread threads[NUM_OF_THREAD];
// std::vector<std::thread> threads; // std::vector<std::thread> threads;
TFileReader* reader = tfileReaderOpen(arg, 0, 295868, "tag1"); TFileReader* reader = tfileReaderOpen(arg, 0, 999992, "tag1");
for (int i = 0; i < NUM_OF_THREAD; i++) { for (int i = 0; i < NUM_OF_THREAD; i++) {
threads[i] = std::thread(fst_get, reader->fst); threads[i] = std::thread(fst_get, reader->fst);
@ -203,9 +307,12 @@ void validateTFile(char* arg) {
tfCleanup(); tfCleanup();
} }
int main(int argc, char* argv[]) { int main(int argc, char* argv[]) {
if (argc > 1) { validateTFile(argv[1]); } // tool to check all kind of fst test
// if (argc > 1) { validateTFile(argv[1]); }
// checkFstCheckIterator(); // checkFstCheckIterator();
// checkFstLongTerm();
// checkFstPrefixSearch(); // checkFstPrefixSearch();
checkMillonWriteAndReadOfFst();
return 1; return 1;
} }

View File

@ -457,7 +457,10 @@ TEST_F(IndexTFileEnv, test_tfile_write) {
// taosArrayPush(data, &v4); // taosArrayPush(data, &v4);
fObj->Put(data); fObj->Put(data);
for (size_t i = 0; i < taosArrayGetSize(data); i++) { destroyTFileValue(taosArrayGetP(data, i)); } for (size_t i = 0; i < taosArrayGetSize(data); i++) {
// data
destroyTFileValue(taosArrayGetP(data, i));
}
taosArrayDestroy(data); taosArrayDestroy(data);
std::string colName("voltage"); std::string colName("voltage");
@ -470,6 +473,7 @@ TEST_F(IndexTFileEnv, test_tfile_write) {
fObj->Get(&query, result); fObj->Get(&query, result);
assert(taosArrayGetSize(result) == 200); assert(taosArrayGetSize(result) == 200);
indexTermDestroy(term); indexTermDestroy(term);
taosArrayDestroy(result);
// tfileWriterDestroy(twrite); // tfileWriterDestroy(twrite);
} }
@ -477,7 +481,7 @@ class CacheObj {
public: public:
CacheObj() { CacheObj() {
// TODO // TODO
cache = indexCacheCreate(NULL, "voltage", TSDB_DATA_TYPE_BINARY); cache = indexCacheCreate(NULL, 0, "voltage", TSDB_DATA_TYPE_BINARY);
} }
int Put(SIndexTerm* term, int16_t colId, int32_t version, uint64_t uid) { int Put(SIndexTerm* term, int16_t colId, int32_t version, uint64_t uid) {
int ret = indexCachePut(cache, term, uid); int ret = indexCachePut(cache, term, uid);
@ -534,6 +538,7 @@ TEST_F(IndexCacheEnv, cache_test) {
SIndexTerm* term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), SIndexTerm* term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
colVal.c_str(), colVal.size()); colVal.c_str(), colVal.size());
coj->Put(term, colId, version++, suid++); coj->Put(term, colId, version++, suid++);
indexTermDestroy(term);
// indexTermDestry(term); // indexTermDestry(term);
} }
{ {
@ -541,24 +546,28 @@ TEST_F(IndexCacheEnv, cache_test) {
SIndexTerm* term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), SIndexTerm* term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
colVal.c_str(), colVal.size()); colVal.c_str(), colVal.size());
coj->Put(term, colId, version++, suid++); coj->Put(term, colId, version++, suid++);
indexTermDestroy(term);
} }
{ {
std::string colVal("v2"); std::string colVal("v2");
SIndexTerm* term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), SIndexTerm* term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
colVal.c_str(), colVal.size()); colVal.c_str(), colVal.size());
coj->Put(term, colId, version++, suid++); coj->Put(term, colId, version++, suid++);
indexTermDestroy(term);
} }
{ {
std::string colVal("v3"); std::string colVal("v3");
SIndexTerm* term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), SIndexTerm* term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
colVal.c_str(), colVal.size()); colVal.c_str(), colVal.size());
coj->Put(term, colId, version++, suid++); coj->Put(term, colId, version++, suid++);
indexTermDestroy(term);
} }
{ {
std::string colVal("v3"); std::string colVal("v3");
SIndexTerm* term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), SIndexTerm* term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
colVal.c_str(), colVal.size()); colVal.c_str(), colVal.size());
coj->Put(term, colId, version++, suid++); coj->Put(term, colId, version++, suid++);
indexTermDestroy(term);
} }
coj->Debug(); coj->Debug();
std::cout << "--------first----------" << std::endl; std::cout << "--------first----------" << std::endl;
@ -567,12 +576,14 @@ TEST_F(IndexCacheEnv, cache_test) {
SIndexTerm* term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), SIndexTerm* term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
colVal.c_str(), colVal.size()); colVal.c_str(), colVal.size());
coj->Put(term, othColId, version++, suid++); coj->Put(term, othColId, version++, suid++);
indexTermDestroy(term);
} }
{ {
std::string colVal("v4"); std::string colVal("v4");
SIndexTerm* term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), SIndexTerm* term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
colVal.c_str(), colVal.size()); colVal.c_str(), colVal.size());
coj->Put(term, othColId, version++, suid++); coj->Put(term, othColId, version++, suid++);
indexTermDestroy(term);
} }
coj->Debug(); coj->Debug();
std::cout << "--------second----------" << std::endl; std::cout << "--------second----------" << std::endl;
@ -583,6 +594,7 @@ TEST_F(IndexCacheEnv, cache_test) {
SIndexTerm* term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), SIndexTerm* term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
colVal.c_str(), colVal.size()); colVal.c_str(), colVal.size());
coj->Put(term, colId, version++, suid++); coj->Put(term, colId, version++, suid++);
indexTermDestroy(term);
} }
} }
coj->Debug(); coj->Debug();
@ -598,6 +610,9 @@ TEST_F(IndexCacheEnv, cache_test) {
coj->Get(&query, colId, 10000, ret, &valType); coj->Get(&query, colId, 10000, ret, &valType);
std::cout << "size : " << taosArrayGetSize(ret) << std::endl; std::cout << "size : " << taosArrayGetSize(ret) << std::endl;
assert(taosArrayGetSize(ret) == 4); assert(taosArrayGetSize(ret) == 4);
taosArrayDestroy(ret);
indexTermDestroy(term);
} }
{ {
std::string colVal("v2"); std::string colVal("v2");
@ -609,6 +624,9 @@ TEST_F(IndexCacheEnv, cache_test) {
coj->Get(&query, colId, 10000, ret, &valType); coj->Get(&query, colId, 10000, ret, &valType);
assert(taosArrayGetSize(ret) == 1); assert(taosArrayGetSize(ret) == 1);
taosArrayDestroy(ret);
indexTermDestroy(term);
} }
} }
class IndexObj { class IndexObj {
@ -678,13 +696,16 @@ class IndexObj {
SArray* result = (SArray*)taosArrayInit(1, sizeof(uint64_t)); SArray* result = (SArray*)taosArrayInit(1, sizeof(uint64_t));
if (Search(mq, result) == 0) { std::cout << "search one successfully" << std::endl; } if (Search(mq, result) == 0) { std::cout << "search one successfully" << std::endl; }
return taosArrayGetSize(result); int sz = taosArrayGetSize(result);
indexMultiTermQueryDestroy(mq);
taosArrayDestroy(result);
return sz;
// assert(taosArrayGetSize(result) == targetSize); // assert(taosArrayGetSize(result) == targetSize);
} }
void PutOne(const std::string& colName, const std::string& colVal) { void PutOne(const std::string& colName, const std::string& colVal) {
SIndexMultiTerm* terms = indexMultiTermCreate();
SIndexTerm* term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), SIndexTerm* term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
colVal.c_str(), colVal.size()); colVal.c_str(), colVal.size());
SIndexMultiTerm* terms = indexMultiTermCreate();
indexMultiTermAdd(terms, term); indexMultiTermAdd(terms, term);
Put(terms, 10); Put(terms, 10);
indexMultiTermDestroy(terms); indexMultiTermDestroy(terms);
@ -783,18 +804,21 @@ TEST_F(IndexEnv2, testIndexOpen) {
index->Search(mq, result); index->Search(mq, result);
std::cout << "target size: " << taosArrayGetSize(result) << std::endl; std::cout << "target size: " << taosArrayGetSize(result) << std::endl;
assert(taosArrayGetSize(result) == 400); assert(taosArrayGetSize(result) == 400);
taosArrayDestroy(result);
indexMultiTermQueryDestroy(mq);
} }
} }
TEST_F(IndexEnv2, testIndex_TrigeFlush) { TEST_F(IndexEnv2, testIndex_TrigeFlush) {
std::string path = "/tmp/test"; std::string path = "/tmp/test1";
if (index->Init(path) != 0) { if (index->Init(path) != 0) {
// r // r
std::cout << "failed to init" << std::endl; std::cout << "failed to init" << std::endl;
} }
int numOfTable = 100 * 10000; int numOfTable = 100 * 10000;
index->WriteMillonData("tag1", "Hello", numOfTable); index->WriteMillonData("tag1", "Hello Wolrd", numOfTable);
int target = index->SearchOne("tag1", "Hello"); int target = index->SearchOne("tag1", "Hello Wolrd");
std::cout << "Get Index: " << target << std::endl;
assert(numOfTable == target); assert(numOfTable == target);
} }
@ -821,14 +845,6 @@ TEST_F(IndexEnv2, testIndex_serarch_cache_and_tfile) {
threads[i].join(); threads[i].join();
} }
} }
TEST_F(IndexEnv2, testIndex_multi_thread_write) {
std::string path = "/tmp";
if (index->Init(path) != 0) {}
}
TEST_F(IndexEnv2, testIndex_multi_thread_read) {
std::string path = "/tmp";
if (index->Init(path) != 0) {}
}
TEST_F(IndexEnv2, testIndex_restart) { TEST_F(IndexEnv2, testIndex_restart) {
std::string path = "/tmp"; std::string path = "/tmp";

View File

@ -68,7 +68,9 @@ int32_t qParserValidateSqlNode(struct SCatalog* pCatalog, SSqlInfo* pSqlInfo, SQ
* @param type * @param type
* @return * @return
*/ */
int32_t qParserValidateDclSqlNode(SSqlInfo* pInfo, SParseBasicCtx* pCtx, SDclStmtInfo* pDcl, char* msgBuf, int32_t msgBufLen); SDclStmtInfo* qParserValidateDclSqlNode(SSqlInfo* pInfo, SParseBasicCtx* pCtx, char* msgBuf, int32_t msgBufLen);
SInsertStmtInfo* qParserValidateCreateTbSqlNode(SSqlInfo* pInfo, SParseBasicCtx* pCtx, char* msgBuf, int32_t msgBufLen);
/** /**
* Evaluate the numeric and timestamp arithmetic expression in the WHERE clause. * Evaluate the numeric and timestamp arithmetic expression in the WHERE clause.

View File

@ -326,8 +326,7 @@ typedef struct SVgroupTablesBatch {
SVgroupInfo info; SVgroupInfo info;
} SVgroupTablesBatch; } SVgroupTablesBatch;
int32_t doCheckForCreateCTable(SSqlInfo* pInfo, SParseBasicCtx* pCtx, SMsgBuf* pMsgBuf, char** pOutput, int32_t* len, int32_t doCheckForCreateCTable(SSqlInfo* pInfo, SParseBasicCtx* pCtx, SMsgBuf* pMsgBuf, char** pOutput, int32_t* len) {
SEpSet* pEpSet) {
const char* msg1 = "invalid table name"; const char* msg1 = "invalid table name";
const char* msg2 = "tags number not matched"; const char* msg2 = "tags number not matched";
const char* msg3 = "tag value too long"; const char* msg3 = "tag value too long";
@ -359,7 +358,11 @@ int32_t doCheckForCreateCTable(SSqlInfo* pInfo, SParseBasicCtx* pCtx, SMsgBuf* p
size_t numOfInputTag = taosArrayGetSize(pValList); size_t numOfInputTag = taosArrayGetSize(pValList);
STableMeta* pSuperTableMeta = NULL; STableMeta* pSuperTableMeta = NULL;
catalogGetTableMeta(pCtx->pCatalog, pCtx->pTransporter, &pCtx->mgmtEpSet, &name, &pSuperTableMeta); code = catalogGetTableMeta(pCtx->pCatalog, pCtx->pTransporter, &pCtx->mgmtEpSet, &name, &pSuperTableMeta);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
assert(pSuperTableMeta != NULL); assert(pSuperTableMeta != NULL);
// too long tag values will return invalid sql, not be truncated automatically // too long tag values will return invalid sql, not be truncated automatically
@ -501,14 +504,6 @@ int32_t doCheckForCreateCTable(SSqlInfo* pInfo, SParseBasicCtx* pCtx, SMsgBuf* p
req.ctbCfg.suid = pSuperTableMeta->suid; req.ctbCfg.suid = pSuperTableMeta->suid;
req.ctbCfg.pTag = row; req.ctbCfg.pTag = row;
// pEpSet->inUse = info.inUse;
// pEpSet->numOfEps = info.numOfEps;
// for (int32_t i = 0; i < pEpSet->numOfEps; ++i) {
// pEpSet->port[i] = info.epAddr[i].port;
// tstrncpy(pEpSet->fqdn[i], info.epAddr[i].fqdn, tListLen(pEpSet->fqdn[i]));
// }
// ((SMsgHead*)(*pOutput))->vgId = htonl(info.vgId);
// ((SMsgHead*)(*pOutput))->contLen = htonl(serLen);
SVgroupTablesBatch* pTableBatch = taosHashGet(pVgroupHashmap, &info.vgId, sizeof(info.vgId)); SVgroupTablesBatch* pTableBatch = taosHashGet(pVgroupHashmap, &info.vgId, sizeof(info.vgId));
if (pTableBatch == NULL) { if (pTableBatch == NULL) {
SVgroupTablesBatch tBatch = {0}; SVgroupTablesBatch tBatch = {0};
@ -525,12 +520,12 @@ int32_t doCheckForCreateCTable(SSqlInfo* pInfo, SParseBasicCtx* pCtx, SMsgBuf* p
} }
// TODO: serialize and // TODO: serialize and
SArray* pBufArray = taosArrayInit(taosHashGetSize(pVgroupHashmap), sizeof(void*)); SArray* pBufArray = taosArrayInit(taosHashGetSize(pVgroupHashmap), sizeof(void*));
SVgroupTablesBatch** ppTbBatch = NULL;
SVgroupTablesBatch* pTbBatch = NULL;
do { do {
ppTbBatch = taosHashIterate(pVgroupHashmap, ppTbBatch); pTbBatch = taosHashIterate(pVgroupHashmap, pTbBatch);
if (ppTbBatch == NULL) break; if (pTbBatch == NULL) break;
SVgroupTablesBatch* pTbBatch = *ppTbBatch;
int tlen = sizeof(SMsgHead) + tSVCreateTbBatchReqSerialize(NULL, &(pTbBatch->req)); int tlen = sizeof(SMsgHead) + tSVCreateTbBatchReqSerialize(NULL, &(pTbBatch->req));
void* buf = malloc(tlen); void* buf = malloc(tlen);
@ -544,17 +539,29 @@ int32_t doCheckForCreateCTable(SSqlInfo* pInfo, SParseBasicCtx* pCtx, SMsgBuf* p
void* pBuf = POINTER_SHIFT(buf, sizeof(SMsgHead)); void* pBuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
tSVCreateTbBatchReqSerialize(&pBuf, &(pTbBatch->req)); tSVCreateTbBatchReqSerialize(&pBuf, &(pTbBatch->req));
taosArrayPush(pBufArray, &buf); SVgDataBlocks* pVgData = calloc(1, sizeof(SVgDataBlocks));
pVgData->vg = pTbBatch->info;
pVgData->pData = buf;
pVgData->size = tlen;
pVgData->numOfTables = (int32_t) taosArrayGetSize(pTbBatch->req.pArray);
taosArrayPush(pBufArray, &pVgData);
} while (true); } while (true);
SInsertStmtInfo* pStmtInfo = calloc(1, sizeof(SInsertStmtInfo));
pStmtInfo->nodeType = TSDB_SQL_CREATE_TABLE;
pStmtInfo->pDataBlocks = pBufArray;
*pOutput = pStmtInfo;
*len = sizeof(SInsertStmtInfo);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t qParserValidateDclSqlNode(SSqlInfo* pInfo, SParseBasicCtx* pCtx, SDclStmtInfo* pDcl, char* msgBuf, SDclStmtInfo* qParserValidateDclSqlNode(SSqlInfo* pInfo, SParseBasicCtx* pCtx, char* msgBuf, int32_t msgBufLen) {
int32_t msgBufLen) {
int32_t code = 0; int32_t code = 0;
SDclStmtInfo* pDcl = calloc(1, sizeof(SDclStmtInfo));
SMsgBuf m = {.buf = msgBuf, .len = msgBufLen}; SMsgBuf m = {.buf = msgBuf, .len = msgBufLen};
SMsgBuf* pMsgBuf = &m; SMsgBuf* pMsgBuf = &m;
@ -571,21 +578,25 @@ int32_t qParserValidateDclSqlNode(SSqlInfo* pInfo, SParseBasicCtx* pCtx, SDclStm
SToken* pPwd = &pUser->passwd; SToken* pPwd = &pUser->passwd;
if (pName->n >= TSDB_USER_LEN) { if (pName->n >= TSDB_USER_LEN) {
return buildInvalidOperationMsg(pMsgBuf, msg3); code = buildInvalidOperationMsg(pMsgBuf, msg3);
goto _error;
} }
if (parserValidateIdToken(pName) != TSDB_CODE_SUCCESS) { if (parserValidateIdToken(pName) != TSDB_CODE_SUCCESS) {
return buildInvalidOperationMsg(pMsgBuf, msg2); code = buildInvalidOperationMsg(pMsgBuf, msg2);
goto _error;
} }
if (pInfo->type == TSDB_SQL_CREATE_USER) { if (pInfo->type == TSDB_SQL_CREATE_USER) {
if (parserValidatePassword(pPwd, pMsgBuf) != TSDB_CODE_SUCCESS) { if (parserValidatePassword(pPwd, pMsgBuf) != TSDB_CODE_SUCCESS) {
return TSDB_CODE_TSC_INVALID_OPERATION; code = TSDB_CODE_TSC_INVALID_OPERATION;
goto _error;
} }
} else { } else {
if (pUser->type == TSDB_ALTER_USER_PASSWD) { if (pUser->type == TSDB_ALTER_USER_PASSWD) {
if (parserValidatePassword(pPwd, pMsgBuf) != TSDB_CODE_SUCCESS) { if (parserValidatePassword(pPwd, pMsgBuf) != TSDB_CODE_SUCCESS) {
return TSDB_CODE_TSC_INVALID_OPERATION; code = TSDB_CODE_TSC_INVALID_OPERATION;
goto _error;
} }
} else if (pUser->type == TSDB_ALTER_USER_PRIVILEGES) { } else if (pUser->type == TSDB_ALTER_USER_PRIVILEGES) {
assert(pPwd->type == TSDB_DATA_TYPE_NULL); assert(pPwd->type == TSDB_DATA_TYPE_NULL);
@ -596,10 +607,12 @@ int32_t qParserValidateDclSqlNode(SSqlInfo* pInfo, SParseBasicCtx* pCtx, SDclStm
} else if (strncasecmp(pPrivilege->z, "normal", 4) == 0 && pPrivilege->n == 4) { } else if (strncasecmp(pPrivilege->z, "normal", 4) == 0 && pPrivilege->n == 4) {
// pCmd->count = 2; // pCmd->count = 2;
} else { } else {
return buildInvalidOperationMsg(pMsgBuf, msg4); code = buildInvalidOperationMsg(pMsgBuf, msg4);
goto _error;
} }
} else { } else {
return buildInvalidOperationMsg(pMsgBuf, msg1); code = buildInvalidOperationMsg(pMsgBuf, msg1);
goto _error;
} }
} }
@ -618,15 +631,18 @@ int32_t qParserValidateDclSqlNode(SSqlInfo* pInfo, SParseBasicCtx* pCtx, SDclStm
SToken* pPwd = &pInfo->pMiscInfo->user.passwd; SToken* pPwd = &pInfo->pMiscInfo->user.passwd;
if (parserValidatePassword(pPwd, pMsgBuf) != TSDB_CODE_SUCCESS) { if (parserValidatePassword(pPwd, pMsgBuf) != TSDB_CODE_SUCCESS) {
return TSDB_CODE_TSC_INVALID_OPERATION; code = TSDB_CODE_TSC_INVALID_OPERATION;
goto _error;
} }
if (pName->n >= TSDB_USER_LEN) { if (pName->n >= TSDB_USER_LEN) {
return buildInvalidOperationMsg(pMsgBuf, msg3); code = buildInvalidOperationMsg(pMsgBuf, msg3);
goto _error;
} }
if (parserValidateNameToken(pName) != TSDB_CODE_SUCCESS) { if (parserValidateNameToken(pName) != TSDB_CODE_SUCCESS) {
return buildInvalidOperationMsg(pMsgBuf, msg2); code = buildInvalidOperationMsg(pMsgBuf, msg2);
goto _error;
} }
SCreateAcctInfo* pAcctOpt = &pInfo->pMiscInfo->acctOpt; SCreateAcctInfo* pAcctOpt = &pInfo->pMiscInfo->acctOpt;
@ -636,7 +652,8 @@ int32_t qParserValidateDclSqlNode(SSqlInfo* pInfo, SParseBasicCtx* pCtx, SDclStm
} else if (strncmp(pAcctOpt->stat.z, "all", 3) == 0 && pAcctOpt->stat.n == 3) { } else if (strncmp(pAcctOpt->stat.z, "all", 3) == 0 && pAcctOpt->stat.n == 3) {
} else if (strncmp(pAcctOpt->stat.z, "no", 2) == 0 && pAcctOpt->stat.n == 2) { } else if (strncmp(pAcctOpt->stat.z, "no", 2) == 0 && pAcctOpt->stat.n == 2) {
} else { } else {
return buildInvalidOperationMsg(pMsgBuf, msg1); code = buildInvalidOperationMsg(pMsgBuf, msg1);
goto _error;
} }
} }
@ -655,6 +672,10 @@ int32_t qParserValidateDclSqlNode(SSqlInfo* pInfo, SParseBasicCtx* pCtx, SDclStm
case TSDB_SQL_SHOW: { case TSDB_SQL_SHOW: {
SShowInfo* pShowInfo = &pInfo->pMiscInfo->showOpt; SShowInfo* pShowInfo = &pInfo->pMiscInfo->showOpt;
code = setShowInfo(pShowInfo, pCtx, (void**)&pDcl->pMsg, &pDcl->msgLen, &pDcl->epSet, &pDcl->pExtension, pMsgBuf); code = setShowInfo(pShowInfo, pCtx, (void**)&pDcl->pMsg, &pDcl->msgLen, &pDcl->epSet, &pDcl->pExtension, pMsgBuf);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
pDcl->msgType = (pShowInfo->showType == TSDB_MGMT_TABLE_TABLE) ? TDMT_VND_SHOW_TABLES : TDMT_MND_SHOW; pDcl->msgType = (pShowInfo->showType == TSDB_MGMT_TABLE_TABLE) ? TDMT_VND_SHOW_TABLES : TDMT_MND_SHOW;
break; break;
} }
@ -664,13 +685,15 @@ int32_t qParserValidateDclSqlNode(SSqlInfo* pInfo, SParseBasicCtx* pCtx, SDclStm
SToken* pToken = taosArrayGet(pInfo->pMiscInfo->a, 0); SToken* pToken = taosArrayGet(pInfo->pMiscInfo->a, 0);
if (parserValidateNameToken(pToken) != TSDB_CODE_SUCCESS) { if (parserValidateNameToken(pToken) != TSDB_CODE_SUCCESS) {
return buildInvalidOperationMsg(pMsgBuf, msg); code = buildInvalidOperationMsg(pMsgBuf, msg);
goto _error;
} }
SName n = {0}; SName n = {0};
int32_t ret = tNameSetDbName(&n, pCtx->acctId, pToken->z, pToken->n); int32_t ret = tNameSetDbName(&n, pCtx->acctId, pToken->z, pToken->n);
if (ret != TSDB_CODE_SUCCESS) { if (ret != TSDB_CODE_SUCCESS) {
return buildInvalidOperationMsg(pMsgBuf, msg); code = buildInvalidOperationMsg(pMsgBuf, msg);
goto _error;
} }
SUseDbMsg* pUseDbMsg = (SUseDbMsg*)calloc(1, sizeof(SUseDbMsg)); SUseDbMsg* pUseDbMsg = (SUseDbMsg*)calloc(1, sizeof(SUseDbMsg));
@ -689,19 +712,22 @@ int32_t qParserValidateDclSqlNode(SSqlInfo* pInfo, SParseBasicCtx* pCtx, SDclStm
SCreateDbInfo* pCreateDB = &(pInfo->pMiscInfo->dbOpt); SCreateDbInfo* pCreateDB = &(pInfo->pMiscInfo->dbOpt);
if (pCreateDB->dbname.n >= TSDB_DB_NAME_LEN) { if (pCreateDB->dbname.n >= TSDB_DB_NAME_LEN) {
return buildInvalidOperationMsg(pMsgBuf, msg2); code = buildInvalidOperationMsg(pMsgBuf, msg2);
goto _error;
} }
char buf[TSDB_DB_NAME_LEN] = {0}; char buf[TSDB_DB_NAME_LEN] = {0};
SToken token = taosTokenDup(&pCreateDB->dbname, buf, tListLen(buf)); SToken token = taosTokenDup(&pCreateDB->dbname, buf, tListLen(buf));
if (parserValidateNameToken(&token) != TSDB_CODE_SUCCESS) { if (parserValidateNameToken(&token) != TSDB_CODE_SUCCESS) {
return buildInvalidOperationMsg(pMsgBuf, msg1); code = buildInvalidOperationMsg(pMsgBuf, msg1);
goto _error;
} }
SCreateDbMsg* pCreateMsg = buildCreateDbMsg(pCreateDB, pCtx, pMsgBuf); SCreateDbMsg* pCreateMsg = buildCreateDbMsg(pCreateDB, pCtx, pMsgBuf);
if (doCheckDbOptions(pCreateMsg, pMsgBuf) != TSDB_CODE_SUCCESS) { if (doCheckDbOptions(pCreateMsg, pMsgBuf) != TSDB_CODE_SUCCESS) {
return TSDB_CODE_TSC_INVALID_OPERATION; code = TSDB_CODE_TSC_INVALID_OPERATION;
goto _error;
} }
pDcl->pMsg = (char*)pCreateMsg; pDcl->pMsg = (char*)pCreateMsg;
@ -719,7 +745,8 @@ int32_t qParserValidateDclSqlNode(SSqlInfo* pInfo, SParseBasicCtx* pCtx, SDclStm
SName name = {0}; SName name = {0};
code = tNameSetDbName(&name, pCtx->acctId, dbName->z, dbName->n); code = tNameSetDbName(&name, pCtx->acctId, dbName->z, dbName->n);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return buildInvalidOperationMsg(pMsgBuf, msg1); code = buildInvalidOperationMsg(pMsgBuf, msg1);
goto _error;
} }
SDropDbMsg* pDropDbMsg = (SDropDbMsg*)calloc(1, sizeof(SDropDbMsg)); SDropDbMsg* pDropDbMsg = (SDropDbMsg*)calloc(1, sizeof(SDropDbMsg));
@ -731,7 +758,7 @@ int32_t qParserValidateDclSqlNode(SSqlInfo* pInfo, SParseBasicCtx* pCtx, SDclStm
pDcl->msgType = TDMT_MND_DROP_DB; pDcl->msgType = TDMT_MND_DROP_DB;
pDcl->msgLen = sizeof(SDropDbMsg); pDcl->msgLen = sizeof(SDropDbMsg);
pDcl->pMsg = (char*)pDropDbMsg; pDcl->pMsg = (char*)pDropDbMsg;
return TSDB_CODE_SUCCESS; break;
} }
case TSDB_SQL_CREATE_TABLE: { case TSDB_SQL_CREATE_TABLE: {
@ -739,14 +766,16 @@ int32_t qParserValidateDclSqlNode(SSqlInfo* pInfo, SParseBasicCtx* pCtx, SDclStm
if (pCreateTable->type == TSQL_CREATE_TABLE || pCreateTable->type == TSQL_CREATE_STABLE) { if (pCreateTable->type == TSQL_CREATE_TABLE || pCreateTable->type == TSQL_CREATE_STABLE) {
if ((code = doCheckForCreateTable(pInfo, pMsgBuf)) != TSDB_CODE_SUCCESS) { if ((code = doCheckForCreateTable(pInfo, pMsgBuf)) != TSDB_CODE_SUCCESS) {
return code; terrno = code;
goto _error;
} }
pDcl->pMsg = (char*)buildCreateTableMsg(pCreateTable, &pDcl->msgLen, pCtx, pMsgBuf); pDcl->pMsg = (char*)buildCreateTableMsg(pCreateTable, &pDcl->msgLen, pCtx, pMsgBuf);
pDcl->msgType = (pCreateTable->type == TSQL_CREATE_TABLE) ? TDMT_VND_CREATE_TABLE : TDMT_MND_CREATE_STB; pDcl->msgType = (pCreateTable->type == TSQL_CREATE_TABLE) ? TDMT_VND_CREATE_TABLE : TDMT_MND_CREATE_STB;
} else if (pCreateTable->type == TSQL_CREATE_CTABLE) { } else if (pCreateTable->type == TSQL_CREATE_CTABLE) {
if ((code = doCheckForCreateCTable(pInfo, pCtx, pMsgBuf, &pDcl->pMsg, &pDcl->msgLen, &pDcl->epSet)) != if ((code = doCheckForCreateCTable(pInfo, pCtx, pMsgBuf, &pDcl->pMsg, &pDcl->msgLen)) !=
TSDB_CODE_SUCCESS) { TSDB_CODE_SUCCESS) {
return code; goto _error;
} }
pDcl->msgType = TDMT_VND_CREATE_TABLE; pDcl->msgType = TDMT_VND_CREATE_TABLE;
@ -761,7 +790,7 @@ int32_t qParserValidateDclSqlNode(SSqlInfo* pInfo, SParseBasicCtx* pCtx, SDclStm
case TSDB_SQL_DROP_TABLE: { case TSDB_SQL_DROP_TABLE: {
pDcl->pMsg = (char*)buildDropStableMsg(pInfo, &pDcl->msgLen, pCtx, pMsgBuf); pDcl->pMsg = (char*)buildDropStableMsg(pInfo, &pDcl->msgLen, pCtx, pMsgBuf);
if (pDcl->pMsg == NULL) { if (pDcl->pMsg == NULL) {
code = terrno; goto _error;
} }
pDcl->msgType = TDMT_MND_DROP_STB; pDcl->msgType = TDMT_MND_DROP_STB;
@ -771,7 +800,7 @@ int32_t qParserValidateDclSqlNode(SSqlInfo* pInfo, SParseBasicCtx* pCtx, SDclStm
case TSDB_SQL_CREATE_DNODE: { case TSDB_SQL_CREATE_DNODE: {
pDcl->pMsg = (char*)buildCreateDnodeMsg(pInfo, &pDcl->msgLen, pMsgBuf); pDcl->pMsg = (char*)buildCreateDnodeMsg(pInfo, &pDcl->msgLen, pMsgBuf);
if (pDcl->pMsg == NULL) { if (pDcl->pMsg == NULL) {
code = terrno; goto _error;
} }
pDcl->msgType = TDMT_MND_CREATE_DNODE; pDcl->msgType = TDMT_MND_CREATE_DNODE;
@ -781,7 +810,7 @@ int32_t qParserValidateDclSqlNode(SSqlInfo* pInfo, SParseBasicCtx* pCtx, SDclStm
case TSDB_SQL_DROP_DNODE: { case TSDB_SQL_DROP_DNODE: {
pDcl->pMsg = (char*)buildDropDnodeMsg(pInfo, &pDcl->msgLen, pMsgBuf); pDcl->pMsg = (char*)buildDropDnodeMsg(pInfo, &pDcl->msgLen, pMsgBuf);
if (pDcl->pMsg == NULL) { if (pDcl->pMsg == NULL) {
code = terrno; goto _error;
} }
pDcl->msgType = TDMT_MND_DROP_DNODE; pDcl->msgType = TDMT_MND_DROP_DNODE;
@ -792,5 +821,29 @@ int32_t qParserValidateDclSqlNode(SSqlInfo* pInfo, SParseBasicCtx* pCtx, SDclStm
break; break;
} }
return code; return pDcl;
_error:
terrno = code;
tfree(pDcl);
return NULL;
}
SInsertStmtInfo* qParserValidateCreateTbSqlNode(SSqlInfo* pInfo, SParseBasicCtx* pCtx, char* msgBuf, int32_t msgBufLen) {
SCreateTableSql* pCreateTable = pInfo->pCreateTableInfo;
assert(pCreateTable->type == TSQL_CREATE_CTABLE);
SMsgBuf m = {.buf = msgBuf, .len = msgBufLen};
SMsgBuf* pMsgBuf = &m;
SInsertStmtInfo* pInsertStmt = NULL;
int32_t msgLen = 0;
int32_t code = doCheckForCreateCTable(pInfo, pCtx, pMsgBuf, (char**) &pInsertStmt, &msgLen);
if (code != TSDB_CODE_SUCCESS) {
tfree(pInsertStmt);
return NULL;
}
return pInsertStmt;
} }

View File

@ -32,7 +32,7 @@ bool isInsertSql(const char* pStr, size_t length) {
} }
bool qIsDdlQuery(const SQueryNode* pQuery) { bool qIsDdlQuery(const SQueryNode* pQuery) {
return TSDB_SQL_INSERT != pQuery->type && TSDB_SQL_SELECT != pQuery->type; return TSDB_SQL_INSERT != pQuery->type && TSDB_SQL_SELECT != pQuery->type && TSDB_SQL_CREATE_TABLE != pQuery->type;
} }
int32_t parseQuerySql(SParseContext* pCxt, SQueryNode** pQuery) { int32_t parseQuerySql(SParseContext* pCxt, SQueryNode** pQuery) {
@ -44,16 +44,29 @@ int32_t parseQuerySql(SParseContext* pCxt, SQueryNode** pQuery) {
} }
if (!isDqlSqlStatement(&info)) { if (!isDqlSqlStatement(&info)) {
SDclStmtInfo* pDcl = calloc(1, sizeof(SDclStmtInfo)); bool toVnode = false;
if (NULL == pDcl) { if (info.type == TSDB_SQL_CREATE_TABLE) {
terrno = TSDB_CODE_TSC_OUT_OF_MEMORY; // set correct error code. SCreateTableSql* pCreateSql = info.pCreateTableInfo;
return terrno; if (pCreateSql->type == TSQL_CREATE_CTABLE || pCreateSql->type == TSQL_CREATE_TABLE) {
toVnode = true;
}
} }
pDcl->nodeType = info.type; if (toVnode) {
int32_t code = qParserValidateDclSqlNode(&info, &pCxt->ctx, pDcl, pCxt->pMsg, pCxt->msgLen); SInsertStmtInfo *pInsertInfo = qParserValidateCreateTbSqlNode(&info, &pCxt->ctx, pCxt->pMsg, pCxt->msgLen);
if (code == TSDB_CODE_SUCCESS) { if (pInsertInfo == NULL) {
return terrno;
}
*pQuery = (SQueryNode*) pInsertInfo;
} else {
SDclStmtInfo* pDcl = qParserValidateDclSqlNode(&info, &pCxt->ctx, pCxt->pMsg, pCxt->msgLen);
if (pDcl == NULL) {
return terrno;
}
*pQuery = (SQueryNode*)pDcl; *pQuery = (SQueryNode*)pDcl;
pDcl->nodeType = info.type;
} }
} else { } else {
SQueryStmtInfo* pQueryInfo = calloc(1, sizeof(SQueryStmtInfo)); SQueryStmtInfo* pQueryInfo = calloc(1, sizeof(SQueryStmtInfo));

View File

@ -714,10 +714,9 @@ TEST(testCase, show_user_Test) {
SSqlInfo info1 = doGenerateAST(sql1); SSqlInfo info1 = doGenerateAST(sql1);
ASSERT_EQ(info1.valid, true); ASSERT_EQ(info1.valid, true);
SDclStmtInfo output;
SParseBasicCtx ct= {.requestId = 1, .acctId = 1, .db = "abc", .pTransporter = NULL}; SParseBasicCtx ct= {.requestId = 1, .acctId = 1, .db = "abc", .pTransporter = NULL};
int32_t code = qParserValidateDclSqlNode(&info1, &ct, &output, msg, buf.len); SDclStmtInfo* output = qParserValidateDclSqlNode(&info1, &ct, msg, buf.len);
ASSERT_EQ(code, 0); ASSERT_NE(output, nullptr);
// convert the show command to be the select query // convert the show command to be the select query
// select name, privilege, create_time, account from information_schema.users; // select name, privilege, create_time, account from information_schema.users;
@ -735,10 +734,9 @@ TEST(testCase, create_user_Test) {
ASSERT_EQ(info1.valid, true); ASSERT_EQ(info1.valid, true);
ASSERT_EQ(isDclSqlStatement(&info1), true); ASSERT_EQ(isDclSqlStatement(&info1), true);
SDclStmtInfo output;
SParseBasicCtx ct= {.requestId = 1, .acctId = 1, .db = "abc"}; SParseBasicCtx ct= {.requestId = 1, .acctId = 1, .db = "abc"};
int32_t code = qParserValidateDclSqlNode(&info1, &ct, &output, msg, buf.len); SDclStmtInfo* output = qParserValidateDclSqlNode(&info1, &ct, msg, buf.len);
ASSERT_EQ(code, 0); ASSERT_NE(output, nullptr);
destroySqlInfo(&info1); destroySqlInfo(&info1);
} }

View File

@ -40,7 +40,7 @@ extern "C" {
#define QNODE_SESSIONWINDOW 12 #define QNODE_SESSIONWINDOW 12
#define QNODE_STATEWINDOW 13 #define QNODE_STATEWINDOW 13
#define QNODE_FILL 14 #define QNODE_FILL 14
#define QNODE_INSERT 15 #define QNODE_MODIFY 15
typedef struct SQueryDistPlanNodeInfo { typedef struct SQueryDistPlanNodeInfo {
bool stableQuery; // super table query or not bool stableQuery; // super table query or not

View File

@ -37,15 +37,19 @@ int32_t optimizeQueryPlan(struct SQueryPlanNode* pQueryNode) {
return 0; return 0;
} }
int32_t createInsertPlan(const SInsertStmtInfo* pInsert, SQueryPlanNode** pQueryPlan) { static int32_t createInsertPlan(const SQueryNode* pNode, SQueryPlanNode** pQueryPlan) {
SInsertStmtInfo* pInsert = (SInsertStmtInfo*)pNode;
*pQueryPlan = calloc(1, sizeof(SQueryPlanNode)); *pQueryPlan = calloc(1, sizeof(SQueryPlanNode));
SArray* blocks = taosArrayInit(taosArrayGetSize(pInsert->pDataBlocks), POINTER_BYTES); SArray* blocks = taosArrayInit(taosArrayGetSize(pInsert->pDataBlocks), POINTER_BYTES);
if (NULL == *pQueryPlan || NULL == blocks) { if (NULL == *pQueryPlan || NULL == blocks) {
return TSDB_CODE_TSC_OUT_OF_MEMORY; return TSDB_CODE_TSC_OUT_OF_MEMORY;
} }
(*pQueryPlan)->info.type = QNODE_INSERT;
(*pQueryPlan)->info.type = QNODE_MODIFY;
taosArrayAddAll(blocks, pInsert->pDataBlocks); taosArrayAddAll(blocks, pInsert->pDataBlocks);
(*pQueryPlan)->pExtInfo = blocks; (*pQueryPlan)->pExtInfo = blocks;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
@ -62,13 +66,14 @@ int32_t createQueryPlan(const SQueryNode* pNode, SQueryPlanNode** pQueryPlan) {
case TSDB_SQL_SELECT: { case TSDB_SQL_SELECT: {
return createSelectPlan((const SQueryStmtInfo*)pNode, pQueryPlan); return createSelectPlan((const SQueryStmtInfo*)pNode, pQueryPlan);
} }
case TSDB_SQL_INSERT: case TSDB_SQL_INSERT:
return createInsertPlan((const SInsertStmtInfo*)pNode, pQueryPlan); case TSDB_SQL_CREATE_TABLE:
return createInsertPlan(pNode, pQueryPlan);
default: default:
return TSDB_CODE_FAILED; return TSDB_CODE_FAILED;
} }
return TSDB_CODE_SUCCESS;
} }
int32_t queryPlanToSql(struct SQueryPlanNode* pQueryNode, char** sql) { int32_t queryPlanToSql(struct SQueryPlanNode* pQueryNode, char** sql) {

View File

@ -34,7 +34,7 @@ static const char* gOpName[] = {
#undef INCLUDE_AS_NAME #undef INCLUDE_AS_NAME
}; };
static void* vailidPointer(void* p) { static void* validPointer(void* p) {
if (NULL == p) { if (NULL == p) {
THROW(TSDB_CODE_TSC_OUT_OF_MEMORY); THROW(TSDB_CODE_TSC_OUT_OF_MEMORY);
} }
@ -76,7 +76,7 @@ int32_t dsinkNameToDsinkType(const char* name) {
} }
static SDataSink* initDataSink(int32_t type, int32_t size) { static SDataSink* initDataSink(int32_t type, int32_t size) {
SDataSink* sink = (SDataSink*)vailidPointer(calloc(1, size)); SDataSink* sink = (SDataSink*)validPointer(calloc(1, size));
sink->info.type = type; sink->info.type = type;
sink->info.name = dsinkTypeToDsinkName(type); sink->info.name = dsinkTypeToDsinkName(type);
return sink; return sink;
@ -121,7 +121,7 @@ static bool cloneExprArray(SArray** dst, SArray* src) {
} }
static SPhyNode* initPhyNode(SQueryPlanNode* pPlanNode, int32_t type, int32_t size) { static SPhyNode* initPhyNode(SQueryPlanNode* pPlanNode, int32_t type, int32_t size) {
SPhyNode* node = (SPhyNode*)vailidPointer(calloc(1, size)); SPhyNode* node = (SPhyNode*)validPointer(calloc(1, size));
node->info.type = type; node->info.type = type;
node->info.name = opTypeToOpName(type); node->info.name = opTypeToOpName(type);
if (!cloneExprArray(&node->pTargets, pPlanNode->pExpr) || !toDataBlockSchema(pPlanNode, &(node->targetSchema))) { if (!cloneExprArray(&node->pTargets, pPlanNode->pExpr) || !toDataBlockSchema(pPlanNode, &(node->targetSchema))) {
@ -184,7 +184,7 @@ static SPhyNode* createMultiTableScanNode(SQueryPlanNode* pPlanNode, SQueryTable
} }
static SSubplan* initSubplan(SPlanContext* pCxt, int32_t type) { static SSubplan* initSubplan(SPlanContext* pCxt, int32_t type) {
SSubplan* subplan = vailidPointer(calloc(1, sizeof(SSubplan))); SSubplan* subplan = validPointer(calloc(1, sizeof(SSubplan)));
subplan->id = pCxt->nextId; subplan->id = pCxt->nextId;
++(pCxt->nextId.subplanId); ++(pCxt->nextId.subplanId);
subplan->type = type; subplan->type = type;
@ -192,15 +192,15 @@ static SSubplan* initSubplan(SPlanContext* pCxt, int32_t type) {
if (NULL != pCxt->pCurrentSubplan) { if (NULL != pCxt->pCurrentSubplan) {
subplan->level = pCxt->pCurrentSubplan->level + 1; subplan->level = pCxt->pCurrentSubplan->level + 1;
if (NULL == pCxt->pCurrentSubplan->pChildern) { if (NULL == pCxt->pCurrentSubplan->pChildern) {
pCxt->pCurrentSubplan->pChildern = vailidPointer(taosArrayInit(TARRAY_MIN_SIZE, POINTER_BYTES)); pCxt->pCurrentSubplan->pChildern = validPointer(taosArrayInit(TARRAY_MIN_SIZE, POINTER_BYTES));
} }
taosArrayPush(pCxt->pCurrentSubplan->pChildern, &subplan); taosArrayPush(pCxt->pCurrentSubplan->pChildern, &subplan);
subplan->pParents = vailidPointer(taosArrayInit(TARRAY_MIN_SIZE, POINTER_BYTES)); subplan->pParents = validPointer(taosArrayInit(TARRAY_MIN_SIZE, POINTER_BYTES));
taosArrayPush(subplan->pParents, &pCxt->pCurrentSubplan); taosArrayPush(subplan->pParents, &pCxt->pCurrentSubplan);
} }
SArray* currentLevel; SArray* currentLevel;
if (subplan->level >= taosArrayGetSize(pCxt->pDag->pSubplans)) { if (subplan->level >= taosArrayGetSize(pCxt->pDag->pSubplans)) {
currentLevel = vailidPointer(taosArrayInit(TARRAY_MIN_SIZE, POINTER_BYTES)); currentLevel = validPointer(taosArrayInit(TARRAY_MIN_SIZE, POINTER_BYTES));
taosArrayPush(pCxt->pDag->pSubplans, &currentLevel); taosArrayPush(pCxt->pDag->pSubplans, &currentLevel);
} else { } else {
currentLevel = taosArrayGetP(pCxt->pDag->pSubplans, subplan->level); currentLevel = taosArrayGetP(pCxt->pDag->pSubplans, subplan->level);
@ -272,7 +272,7 @@ static SPhyNode* createPhyNode(SPlanContext* pCxt, SQueryPlanNode* pPlanNode) {
case QNODE_TABLESCAN: case QNODE_TABLESCAN:
node = createTableScanNode(pCxt, pPlanNode); node = createTableScanNode(pCxt, pPlanNode);
break; break;
case QNODE_INSERT: case QNODE_MODIFY:
// Insert is not an operator in a physical plan. // Insert is not an operator in a physical plan.
break; break;
default: default:
@ -306,7 +306,7 @@ static void splitInsertSubplan(SPlanContext* pCxt, SQueryPlanNode* pPlanNode) {
} }
static void createSubplanByLevel(SPlanContext* pCxt, SQueryPlanNode* pRoot) { static void createSubplanByLevel(SPlanContext* pCxt, SQueryPlanNode* pRoot) {
if (QNODE_INSERT == pRoot->info.type) { if (QNODE_MODIFY == pRoot->info.type) {
splitInsertSubplan(pCxt, pRoot); splitInsertSubplan(pCxt, pRoot);
} else { } else {
SSubplan* subplan = initSubplan(pCxt, QUERY_TYPE_MERGE); SSubplan* subplan = initSubplan(pCxt, QUERY_TYPE_MERGE);
@ -321,12 +321,12 @@ int32_t createDag(SQueryPlanNode* pQueryNode, struct SCatalog* pCatalog, SQueryD
TRY(TSDB_MAX_TAG_CONDITIONS) { TRY(TSDB_MAX_TAG_CONDITIONS) {
SPlanContext context = { SPlanContext context = {
.pCatalog = pCatalog, .pCatalog = pCatalog,
.pDag = vailidPointer(calloc(1, sizeof(SQueryDag))), .pDag = validPointer(calloc(1, sizeof(SQueryDag))),
.pCurrentSubplan = NULL, .pCurrentSubplan = NULL,
.nextId = {0} // todo queryid .nextId = {0} // todo queryid
}; };
*pDag = context.pDag; *pDag = context.pDag;
context.pDag->pSubplans = vailidPointer(taosArrayInit(TARRAY_MIN_SIZE, POINTER_BYTES)); context.pDag->pSubplans = validPointer(taosArrayInit(TARRAY_MIN_SIZE, POINTER_BYTES));
createSubplanByLevel(&context, pQueryNode); createSubplanByLevel(&context, pQueryNode);
} CATCH(code) { } CATCH(code) {
CLEANUP_EXECUTE(); CLEANUP_EXECUTE();

View File

@ -267,7 +267,7 @@ int32_t queryProcessTableMetaRsp(void* output, char *msg, int32_t msgSize) {
pOut->metaNum = 1; pOut->metaNum = 1;
if (pMetaMsg->dbFname[0]) { if (pMetaMsg->dbFname[0]) {
snprintf(pOut->tbFname, "%s.%s", pMetaMsg->dbFname, pMetaMsg->tbFname); snprintf(pOut->tbFname, sizeof(pOut->tbFname), "%s.%s", pMetaMsg->dbFname, pMetaMsg->tbFname);
} else { } else {
memcpy(pOut->tbFname, pMetaMsg->tbFname, sizeof(pOut->tbFname)); memcpy(pOut->tbFname, pMetaMsg->tbFname, sizeof(pOut->tbFname));
} }