more integration
This commit is contained in:
parent
ae426947d3
commit
e818e4aa5b
|
@ -442,7 +442,7 @@ static int tsdbSaveFSStatus(SFSStatus *pStatus, int vid) {
|
||||||
|
|
||||||
taosCalcChecksumAppend(0, (uint8_t *)hbuf, TSDB_FILE_HEAD_SIZE);
|
taosCalcChecksumAppend(0, (uint8_t *)hbuf, TSDB_FILE_HEAD_SIZE);
|
||||||
|
|
||||||
if (taosWrite(fd, hbuf, TSDB_FILE_HEAD_SIZE) < TSDB_FILE_HEAD_SIZE) {
|
if (taosWriteFile(fd, hbuf, TSDB_FILE_HEAD_SIZE) < TSDB_FILE_HEAD_SIZE) {
|
||||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||||
close(fd);
|
close(fd);
|
||||||
remove(tfname);
|
remove(tfname);
|
||||||
|
@ -461,7 +461,7 @@ static int tsdbSaveFSStatus(SFSStatus *pStatus, int vid) {
|
||||||
tsdbEncodeFSStatus(&ptr, pStatus);
|
tsdbEncodeFSStatus(&ptr, pStatus);
|
||||||
taosCalcChecksumAppend(0, (uint8_t *)pBuf, fsheader.len);
|
taosCalcChecksumAppend(0, (uint8_t *)pBuf, fsheader.len);
|
||||||
|
|
||||||
if (taosWrite(fd, pBuf, fsheader.len) < fsheader.len) {
|
if (taosWriteFile(fd, pBuf, fsheader.len) < fsheader.len) {
|
||||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||||
close(fd);
|
close(fd);
|
||||||
(void)remove(tfname);
|
(void)remove(tfname);
|
||||||
|
@ -677,7 +677,7 @@ static int tsdbOpenFSFromCurrent(STsdbRepo *pRepo) {
|
||||||
goto _err;
|
goto _err;
|
||||||
}
|
}
|
||||||
|
|
||||||
int nread = (int)taosRead(fd, buffer, TSDB_FILE_HEAD_SIZE);
|
int nread = (int)taosReadFile(fd, buffer, TSDB_FILE_HEAD_SIZE);
|
||||||
if (nread < 0) {
|
if (nread < 0) {
|
||||||
tsdbError("vgId:%d failed to read %d bytes from file %s since %s", REPO_ID(pRepo), TSDB_FILENAME_LEN, current,
|
tsdbError("vgId:%d failed to read %d bytes from file %s since %s", REPO_ID(pRepo), TSDB_FILENAME_LEN, current,
|
||||||
strerror(errno));
|
strerror(errno));
|
||||||
|
@ -711,7 +711,7 @@ static int tsdbOpenFSFromCurrent(STsdbRepo *pRepo) {
|
||||||
goto _err;
|
goto _err;
|
||||||
}
|
}
|
||||||
|
|
||||||
nread = (int)taosRead(fd, buffer, fsheader.len);
|
nread = (int)taosReadFile(fd, buffer, fsheader.len);
|
||||||
if (nread < 0) {
|
if (nread < 0) {
|
||||||
tsdbError("vgId:%d failed to read file %s since %s", REPO_ID(pRepo), current, strerror(errno));
|
tsdbError("vgId:%d failed to read file %s since %s", REPO_ID(pRepo), current, strerror(errno));
|
||||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||||
|
@ -1207,7 +1207,7 @@ static int tsdbRestoreDFileSet(STsdbRepo *pRepo) {
|
||||||
}
|
}
|
||||||
|
|
||||||
if ((fArraySize = taosArrayGetSize(fArray)) <= 0) {
|
if ((fArraySize = taosArrayGetSize(fArray)) <= 0) {
|
||||||
taosArrayDestroy(&fArray);
|
taosArrayDestroy(fArray);
|
||||||
tsdbInfo("vgId:%d size of DFileSet from %s is %" PRIu32, REPO_ID(pRepo), dataDir, (uint32_t)fArraySize);
|
tsdbInfo("vgId:%d size of DFileSet from %s is %" PRIu32, REPO_ID(pRepo), dataDir, (uint32_t)fArraySize);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
@ -1258,7 +1258,7 @@ static int tsdbRestoreDFileSet(STsdbRepo *pRepo) {
|
||||||
// return error in case of removing uncomplete DFileSets
|
// return error in case of removing uncomplete DFileSets
|
||||||
// terrno = TSDB_CODE_TDB_INCOMPLETE_DFILESET;
|
// terrno = TSDB_CODE_TDB_INCOMPLETE_DFILESET;
|
||||||
tsdbError("vgId:%d incomplete DFileSet, fid:%d, nDFiles=%" PRIu8, REPO_ID(pRepo), fset.fid, nDFiles);
|
tsdbError("vgId:%d incomplete DFileSet, fid:%d, nDFiles=%" PRIu8, REPO_ID(pRepo), fset.fid, nDFiles);
|
||||||
taosArrayDestroy(&fArray);
|
taosArrayDestroy(fArray);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1271,7 +1271,7 @@ static int tsdbRestoreDFileSet(STsdbRepo *pRepo) {
|
||||||
// return error in case of removing uncomplete DFileSets
|
// return error in case of removing uncomplete DFileSets
|
||||||
// terrno = TSDB_CODE_TDB_INCOMPLETE_DFILESET;
|
// terrno = TSDB_CODE_TDB_INCOMPLETE_DFILESET;
|
||||||
tsdbError("vgId:%d incomplete DFileSet, fid:%d, nDFiles=%" PRIu8, REPO_ID(pRepo), fset.fid, nDFiles);
|
tsdbError("vgId:%d incomplete DFileSet, fid:%d, nDFiles=%" PRIu8, REPO_ID(pRepo), fset.fid, nDFiles);
|
||||||
taosArrayDestroy(&fArray);
|
taosArrayDestroy(fArray);
|
||||||
return -1;
|
return -1;
|
||||||
#if 0
|
#if 0
|
||||||
// next FSet
|
// next FSet
|
||||||
|
@ -1293,14 +1293,14 @@ static int tsdbRestoreDFileSet(STsdbRepo *pRepo) {
|
||||||
if (tsdbOpenDFile(pDFile1, O_RDONLY) < 0) {
|
if (tsdbOpenDFile(pDFile1, O_RDONLY) < 0) {
|
||||||
tsdbError("vgId:%d failed to open DFile %s since %s", REPO_ID(pRepo), TSDB_FILE_FULL_NAME(pDFile1),
|
tsdbError("vgId:%d failed to open DFile %s since %s", REPO_ID(pRepo), TSDB_FILE_FULL_NAME(pDFile1),
|
||||||
tstrerror(terrno));
|
tstrerror(terrno));
|
||||||
taosArrayDestroy(&fArray);
|
taosArrayDestroy(fArray);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (tsdbLoadDFileHeader(pDFile1, &(pDFile1->info)) < 0) {
|
if (tsdbLoadDFileHeader(pDFile1, &(pDFile1->info)) < 0) {
|
||||||
tsdbError("vgId:%d failed to load DFile %s header since %s", REPO_ID(pRepo), TSDB_FILE_FULL_NAME(pDFile1),
|
tsdbError("vgId:%d failed to load DFile %s header since %s", REPO_ID(pRepo), TSDB_FILE_FULL_NAME(pDFile1),
|
||||||
tstrerror(terrno));
|
tstrerror(terrno));
|
||||||
taosArrayDestroy(&fArray);
|
taosArrayDestroy(fArray);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1310,7 +1310,7 @@ static int tsdbRestoreDFileSet(STsdbRepo *pRepo) {
|
||||||
// Get real file size
|
// Get real file size
|
||||||
if (fstat(pDFile1->fd, &tfstat) < 0) {
|
if (fstat(pDFile1->fd, &tfstat) < 0) {
|
||||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||||
taosArrayDestroy(&fArray);
|
taosArrayDestroy(fArray);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1346,7 +1346,7 @@ static int tsdbRestoreDFileSet(STsdbRepo *pRepo) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Resource release
|
// Resource release
|
||||||
taosArrayDestroy(&fArray);
|
taosArrayDestroy(fArray);
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
|
@ -12,8 +12,8 @@
|
||||||
* You should have received a copy of the GNU Affero General Public License
|
* You should have received a copy of the GNU Affero General Public License
|
||||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
*/
|
*/
|
||||||
#include "tsdbint.h"
|
|
||||||
#include "tcompare.h"
|
#include "tcompare.h"
|
||||||
|
#include "tsdbint.h"
|
||||||
#include "tutil.h"
|
#include "tutil.h"
|
||||||
|
|
||||||
#define TSDB_SUPER_TABLE_SL_LEVEL 5
|
#define TSDB_SUPER_TABLE_SL_LEVEL 5
|
||||||
|
@ -45,7 +45,7 @@ static int tsdbRemoveTableFromStore(STsdbRepo *pRepo, STable *pTable);
|
||||||
static int tsdbRmTableFromMeta(STsdbRepo *pRepo, STable *pTable);
|
static int tsdbRmTableFromMeta(STsdbRepo *pRepo, STable *pTable);
|
||||||
static int tsdbAdjustMetaTables(STsdbRepo *pRepo, int tid);
|
static int tsdbAdjustMetaTables(STsdbRepo *pRepo, int tid);
|
||||||
static int tsdbCheckTableTagVal(SKVRow *pKVRow, STSchema *pSchema);
|
static int tsdbCheckTableTagVal(SKVRow *pKVRow, STSchema *pSchema);
|
||||||
static int tsdbInsertNewTableAction(STsdbRepo *pRepo, STable* pTable);
|
static int tsdbInsertNewTableAction(STsdbRepo *pRepo, STable *pTable);
|
||||||
static int tsdbAddSchema(STable *pTable, STSchema *pSchema);
|
static int tsdbAddSchema(STable *pTable, STSchema *pSchema);
|
||||||
static void tsdbFreeTableSchema(STable *pTable);
|
static void tsdbFreeTableSchema(STable *pTable);
|
||||||
|
|
||||||
|
@ -73,9 +73,9 @@ int tsdbCreateTable(STsdbRepo *repo, STableCfg *pCfg) {
|
||||||
return 0;
|
return 0;
|
||||||
} else {
|
} else {
|
||||||
tsdbInfo("vgId:%d table %s at tid %d uid %" PRIu64
|
tsdbInfo("vgId:%d table %s at tid %d uid %" PRIu64
|
||||||
" exists, replace it with new table, this can be not reasonable",
|
" exists, replace it with new table, this can be not reasonable",
|
||||||
REPO_ID(pRepo), TABLE_CHAR_NAME(pMeta->tables[tid]), TABLE_TID(pMeta->tables[tid]),
|
REPO_ID(pRepo), TABLE_CHAR_NAME(pMeta->tables[tid]), TABLE_TID(pMeta->tables[tid]),
|
||||||
TABLE_UID(pMeta->tables[tid]));
|
TABLE_UID(pMeta->tables[tid]));
|
||||||
tsdbDropTable(pRepo, pMeta->tables[tid]->tableId);
|
tsdbDropTable(pRepo, pMeta->tables[tid]->tableId);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -204,33 +204,33 @@ _err:
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
void *tsdbGetTableTagVal(const void* pTable, int32_t colId, int16_t type) {
|
void *tsdbGetTableTagVal(const void *pTable, int32_t colId, int16_t type) {
|
||||||
// TODO: this function should be changed also
|
// TODO: this function should be changed also
|
||||||
|
|
||||||
STSchema *pSchema = tsdbGetTableTagSchema((STable*) pTable);
|
STSchema *pSchema = tsdbGetTableTagSchema((STable *)pTable);
|
||||||
STColumn *pCol = tdGetColOfID(pSchema, colId);
|
STColumn *pCol = tdGetColOfID(pSchema, colId);
|
||||||
if (pCol == NULL) {
|
if (pCol == NULL) {
|
||||||
return NULL; // No matched tag volumn
|
return NULL; // No matched tag volumn
|
||||||
}
|
}
|
||||||
|
|
||||||
char *val = NULL;
|
char *val = NULL;
|
||||||
if (pCol->type == TSDB_DATA_TYPE_JSON){
|
if (pCol->type == TSDB_DATA_TYPE_JSON) {
|
||||||
val = ((STable*)pTable)->tagVal;
|
val = ((STable *)pTable)->tagVal;
|
||||||
}else{
|
} else {
|
||||||
val = tdGetKVRowValOfCol(((STable*)pTable)->tagVal, colId);
|
val = tdGetKVRowValOfCol(((STable *)pTable)->tagVal, colId);
|
||||||
assert(type == pCol->type);
|
assert(type == pCol->type);
|
||||||
}
|
}
|
||||||
|
|
||||||
return val;
|
return val;
|
||||||
}
|
}
|
||||||
|
|
||||||
char *tsdbGetTableName(void* pTable) {
|
char *tsdbGetTableName(void *pTable) {
|
||||||
// TODO: need to change as thread-safe
|
// TODO: need to change as thread-safe
|
||||||
|
|
||||||
if (pTable == NULL) {
|
if (pTable == NULL) {
|
||||||
return NULL;
|
return NULL;
|
||||||
} else {
|
} else {
|
||||||
return (char*) (((STable *)pTable)->name);
|
return (char *)(((STable *)pTable)->name);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -299,15 +299,15 @@ _err:
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
static UNUSED_FUNC int32_t colIdCompar(const void* left, const void* right) {
|
static UNUSED_FUNC int32_t colIdCompar(const void *left, const void *right) {
|
||||||
int16_t colId = *(int16_t*) left;
|
int16_t colId = *(int16_t *)left;
|
||||||
STColumn* p2 = (STColumn*) right;
|
STColumn *p2 = (STColumn *)right;
|
||||||
|
|
||||||
if (colId == p2->colId) {
|
if (colId == p2->colId) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
return (colId < p2->colId)? -1:1;
|
return (colId < p2->colId) ? -1 : 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
int tsdbUpdateTableTagValue(STsdbRepo *repo, SUpdateTableTagValMsg *pMsg) {
|
int tsdbUpdateTableTagValue(STsdbRepo *repo, SUpdateTableTagValMsg *pMsg) {
|
||||||
|
@ -317,9 +317,9 @@ int tsdbUpdateTableTagValue(STsdbRepo *repo, SUpdateTableTagValMsg *pMsg) {
|
||||||
|
|
||||||
pMsg->uid = htobe64(pMsg->uid);
|
pMsg->uid = htobe64(pMsg->uid);
|
||||||
pMsg->tid = htonl(pMsg->tid);
|
pMsg->tid = htonl(pMsg->tid);
|
||||||
pMsg->tversion = htons(pMsg->tversion);
|
pMsg->tversion = htons(pMsg->tversion);
|
||||||
pMsg->colId = htons(pMsg->colId);
|
pMsg->colId = htons(pMsg->colId);
|
||||||
pMsg->bytes = htons(pMsg->bytes);
|
pMsg->bytes = htons(pMsg->bytes);
|
||||||
pMsg->tagValLen = htonl(pMsg->tagValLen);
|
pMsg->tagValLen = htonl(pMsg->tagValLen);
|
||||||
pMsg->numOfTags = htons(pMsg->numOfTags);
|
pMsg->numOfTags = htons(pMsg->numOfTags);
|
||||||
pMsg->schemaLen = htonl(pMsg->schemaLen);
|
pMsg->schemaLen = htonl(pMsg->schemaLen);
|
||||||
|
@ -361,7 +361,8 @@ int tsdbUpdateTableTagValue(STsdbRepo *repo, SUpdateTableTagValMsg *pMsg) {
|
||||||
STSchemaBuilder schemaBuilder = {0};
|
STSchemaBuilder schemaBuilder = {0};
|
||||||
|
|
||||||
STColumn *pTCol = (STColumn *)pMsg->data;
|
STColumn *pTCol = (STColumn *)pMsg->data;
|
||||||
ASSERT(pMsg->schemaLen % sizeof(STColumn) == 0 && pTCol[0].colId == colColId(schemaColAt(pTable->pSuper->tagSchema, 0)));
|
ASSERT(pMsg->schemaLen % sizeof(STColumn) == 0 &&
|
||||||
|
pTCol[0].colId == colColId(schemaColAt(pTable->pSuper->tagSchema, 0)));
|
||||||
if (tdInitTSchemaBuilder(&schemaBuilder, pMsg->tversion) < 0) {
|
if (tdInitTSchemaBuilder(&schemaBuilder, pMsg->tversion) < 0) {
|
||||||
tsdbDebug("vgId:%d failed to update tag schema of table %s tid %d uid %" PRIu64 " since out of memory",
|
tsdbDebug("vgId:%d failed to update tag schema of table %s tid %d uid %" PRIu64 " since out of memory",
|
||||||
REPO_ID(pRepo), TABLE_CHAR_NAME(pTable), TABLE_TID(pTable), TABLE_UID(pTable));
|
REPO_ID(pRepo), TABLE_CHAR_NAME(pTable), TABLE_TID(pTable), TABLE_UID(pTable));
|
||||||
|
@ -385,7 +386,7 @@ int tsdbUpdateTableTagValue(STsdbRepo *repo, SUpdateTableTagValMsg *pMsg) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Change in memory
|
// Change in memory
|
||||||
if (pNewSchema != NULL) { // change super table tag schema
|
if (pNewSchema != NULL) { // change super table tag schema
|
||||||
TSDB_WLOCK_TABLE(pTable->pSuper);
|
TSDB_WLOCK_TABLE(pTable->pSuper);
|
||||||
STSchema *pOldSchema = pTable->pSuper->tagSchema;
|
STSchema *pOldSchema = pTable->pSuper->tagSchema;
|
||||||
pTable->pSuper->tagSchema = pNewSchema;
|
pTable->pSuper->tagSchema = pNewSchema;
|
||||||
|
@ -393,8 +394,8 @@ int tsdbUpdateTableTagValue(STsdbRepo *repo, SUpdateTableTagValMsg *pMsg) {
|
||||||
TSDB_WUNLOCK_TABLE(pTable->pSuper);
|
TSDB_WUNLOCK_TABLE(pTable->pSuper);
|
||||||
}
|
}
|
||||||
|
|
||||||
bool isChangeIndexCol = (pMsg->colId == colColId(schemaColAt(pTable->pSuper->tagSchema, 0)))
|
bool isChangeIndexCol =
|
||||||
|| pMsg->type == TSDB_DATA_TYPE_JSON;
|
(pMsg->colId == colColId(schemaColAt(pTable->pSuper->tagSchema, 0))) || pMsg->type == TSDB_DATA_TYPE_JSON;
|
||||||
// STColumn *pCol = bsearch(&(pMsg->colId), pMsg->data, pMsg->numOfTags, sizeof(STColumn), colIdCompar);
|
// STColumn *pCol = bsearch(&(pMsg->colId), pMsg->data, pMsg->numOfTags, sizeof(STColumn), colIdCompar);
|
||||||
// ASSERT(pCol != NULL);
|
// ASSERT(pCol != NULL);
|
||||||
|
|
||||||
|
@ -403,10 +404,10 @@ int tsdbUpdateTableTagValue(STsdbRepo *repo, SUpdateTableTagValMsg *pMsg) {
|
||||||
tsdbRemoveTableFromIndex(pMeta, pTable);
|
tsdbRemoveTableFromIndex(pMeta, pTable);
|
||||||
}
|
}
|
||||||
TSDB_WLOCK_TABLE(pTable);
|
TSDB_WLOCK_TABLE(pTable);
|
||||||
if (pMsg->type == TSDB_DATA_TYPE_JSON){
|
if (pMsg->type == TSDB_DATA_TYPE_JSON) {
|
||||||
kvRowFree(pTable->tagVal);
|
kvRowFree(pTable->tagVal);
|
||||||
pTable->tagVal = tdKVRowDup(POINTER_SHIFT(pMsg->data, pMsg->schemaLen));
|
pTable->tagVal = tdKVRowDup(POINTER_SHIFT(pMsg->data, pMsg->schemaLen));
|
||||||
}else{
|
} else {
|
||||||
tdSetKVRowDataOfCol(&(pTable->tagVal), pMsg->colId, pMsg->type, POINTER_SHIFT(pMsg->data, pMsg->schemaLen));
|
tdSetKVRowDataOfCol(&(pTable->tagVal), pMsg->colId, pMsg->type, POINTER_SHIFT(pMsg->data, pMsg->schemaLen));
|
||||||
}
|
}
|
||||||
TSDB_WUNLOCK_TABLE(pTable);
|
TSDB_WUNLOCK_TABLE(pTable);
|
||||||
|
@ -416,9 +417,9 @@ int tsdbUpdateTableTagValue(STsdbRepo *repo, SUpdateTableTagValMsg *pMsg) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Update on file
|
// Update on file
|
||||||
int tlen1 = (pNewSchema) ? tsdbGetTableEncodeSize(TSDB_UPDATE_META, pTable->pSuper) : 0;
|
int tlen1 = (pNewSchema) ? tsdbGetTableEncodeSize(TSDB_UPDATE_META, pTable->pSuper) : 0;
|
||||||
int tlen2 = tsdbGetTableEncodeSize(TSDB_UPDATE_META, pTable);
|
int tlen2 = tsdbGetTableEncodeSize(TSDB_UPDATE_META, pTable);
|
||||||
void *buf = tsdbAllocBytes(pRepo, tlen1+tlen2);
|
void *buf = tsdbAllocBytes(pRepo, tlen1 + tlen2);
|
||||||
ASSERT(buf != NULL);
|
ASSERT(buf != NULL);
|
||||||
if (pNewSchema) {
|
if (pNewSchema) {
|
||||||
void *pBuf = tsdbInsertTableAct(pRepo, TSDB_UPDATE_META, buf, pTable->pSuper);
|
void *pBuf = tsdbInsertTableAct(pRepo, TSDB_UPDATE_META, buf, pTable->pSuper);
|
||||||
|
@ -433,7 +434,7 @@ int tsdbUpdateTableTagValue(STsdbRepo *repo, SUpdateTableTagValMsg *pMsg) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// ------------------ INTERNAL FUNCTIONS ------------------
|
// ------------------ INTERNAL FUNCTIONS ------------------
|
||||||
static int tsdbInsertNewTableAction(STsdbRepo *pRepo, STable* pTable) {
|
static int tsdbInsertNewTableAction(STsdbRepo *pRepo, STable *pTable) {
|
||||||
int tlen = 0;
|
int tlen = 0;
|
||||||
void *pBuf = NULL;
|
void *pBuf = NULL;
|
||||||
|
|
||||||
|
@ -475,7 +476,8 @@ STsdbMeta *tsdbNewMeta(STsdbCfg *pCfg) {
|
||||||
goto _err;
|
goto _err;
|
||||||
}
|
}
|
||||||
|
|
||||||
pMeta->uidMap = taosHashInit((size_t)(TSDB_INIT_NTABLES * 1.1), taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, false);
|
pMeta->uidMap =
|
||||||
|
taosHashInit((size_t)(TSDB_INIT_NTABLES * 1.1), taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, false);
|
||||||
if (pMeta->uidMap == NULL) {
|
if (pMeta->uidMap == NULL) {
|
||||||
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
|
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
|
||||||
goto _err;
|
goto _err;
|
||||||
|
@ -596,7 +598,8 @@ int tsdbUnlockRepoMeta(STsdbRepo *pRepo) {
|
||||||
void tsdbRefTable(STable *pTable) {
|
void tsdbRefTable(STable *pTable) {
|
||||||
int32_t ref = T_REF_INC(pTable);
|
int32_t ref = T_REF_INC(pTable);
|
||||||
UNUSED(ref);
|
UNUSED(ref);
|
||||||
tsdbDebug("ref table %s uid %" PRIu64 " tid:%d, refCount:%d", TABLE_CHAR_NAME(pTable), TABLE_UID(pTable), TABLE_TID(pTable), ref);
|
tsdbDebug("ref table %s uid %" PRIu64 " tid:%d, refCount:%d", TABLE_CHAR_NAME(pTable), TABLE_UID(pTable),
|
||||||
|
TABLE_TID(pTable), ref);
|
||||||
}
|
}
|
||||||
|
|
||||||
void tsdbUnRefTable(STable *pTable) {
|
void tsdbUnRefTable(STable *pTable) {
|
||||||
|
@ -614,7 +617,7 @@ void tsdbUnRefTable(STable *pTable) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void tsdbFreeLastColumns(STable* pTable) {
|
void tsdbFreeLastColumns(STable *pTable) {
|
||||||
if (pTable->lastCols == NULL) {
|
if (pTable->lastCols == NULL) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
@ -635,7 +638,7 @@ void tsdbFreeLastColumns(STable* pTable) {
|
||||||
pTable->hasRestoreLastColumn = false;
|
pTable->hasRestoreLastColumn = false;
|
||||||
}
|
}
|
||||||
|
|
||||||
int16_t tsdbGetLastColumnsIndexByColId(STable* pTable, int16_t colId) {
|
int16_t tsdbGetLastColumnsIndexByColId(STable *pTable, int16_t colId) {
|
||||||
if (pTable->lastCols == NULL) {
|
if (pTable->lastCols == NULL) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
@ -649,7 +652,7 @@ int16_t tsdbGetLastColumnsIndexByColId(STable* pTable, int16_t colId) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
int tsdbInitColIdCacheWithSchema(STable* pTable, STSchema* pSchema) {
|
int tsdbInitColIdCacheWithSchema(STable *pTable, STSchema *pSchema) {
|
||||||
TSDB_WLOCK_TABLE(pTable);
|
TSDB_WLOCK_TABLE(pTable);
|
||||||
if (pTable->lastCols == NULL) {
|
if (pTable->lastCols == NULL) {
|
||||||
int16_t numOfColumn = pSchema->numOfCols;
|
int16_t numOfColumn = pSchema->numOfCols;
|
||||||
|
@ -677,19 +680,18 @@ int tsdbInitColIdCacheWithSchema(STable* pTable, STSchema* pSchema) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
STSchema* tsdbGetTableLatestSchema(STable *pTable) {
|
STSchema *tsdbGetTableLatestSchema(STable *pTable) { return tsdbGetTableSchemaByVersion(pTable, -1, -1); }
|
||||||
return tsdbGetTableSchemaByVersion(pTable, -1, -1);
|
|
||||||
}
|
|
||||||
|
|
||||||
int tsdbUpdateLastColSchema(STable *pTable, STSchema *pNewSchema) {
|
int tsdbUpdateLastColSchema(STable *pTable, STSchema *pNewSchema) {
|
||||||
if (pTable->lastColSVersion == schemaVersion(pNewSchema)) {
|
if (pTable->lastColSVersion == schemaVersion(pNewSchema)) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
tsdbDebug("tsdbUpdateLastColSchema:%s,%d->%d", pTable->name->data, pTable->lastColSVersion, schemaVersion(pNewSchema));
|
tsdbDebug("tsdbUpdateLastColSchema:%s,%d->%d", pTable->name->data, pTable->lastColSVersion,
|
||||||
|
schemaVersion(pNewSchema));
|
||||||
int16_t numOfCols = pNewSchema->numOfCols;
|
|
||||||
SDataCol *lastCols = (SDataCol*)malloc(numOfCols * sizeof(SDataCol));
|
int16_t numOfCols = pNewSchema->numOfCols;
|
||||||
|
SDataCol *lastCols = (SDataCol *)malloc(numOfCols * sizeof(SDataCol));
|
||||||
if (lastCols == NULL) {
|
if (lastCols == NULL) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
@ -698,12 +700,12 @@ int tsdbUpdateLastColSchema(STable *pTable, STSchema *pNewSchema) {
|
||||||
|
|
||||||
for (int16_t i = 0; i < numOfCols; ++i) {
|
for (int16_t i = 0; i < numOfCols; ++i) {
|
||||||
STColumn *pCol = schemaColAt(pNewSchema, i);
|
STColumn *pCol = schemaColAt(pNewSchema, i);
|
||||||
int16_t idx = tsdbGetLastColumnsIndexByColId(pTable, pCol->colId);
|
int16_t idx = tsdbGetLastColumnsIndexByColId(pTable, pCol->colId);
|
||||||
|
|
||||||
SDataCol* pDataCol = &(lastCols[i]);
|
SDataCol *pDataCol = &(lastCols[i]);
|
||||||
if (idx != -1) {
|
if (idx != -1) {
|
||||||
// move col data to new last column array
|
// move col data to new last column array
|
||||||
SDataCol* pOldDataCol = &(pTable->lastCols[idx]);
|
SDataCol *pOldDataCol = &(pTable->lastCols[idx]);
|
||||||
memcpy(pDataCol, pOldDataCol, sizeof(SDataCol));
|
memcpy(pDataCol, pOldDataCol, sizeof(SDataCol));
|
||||||
} else {
|
} else {
|
||||||
// init new colid data
|
// init new colid data
|
||||||
|
@ -714,7 +716,7 @@ int tsdbUpdateLastColSchema(STable *pTable, STSchema *pNewSchema) {
|
||||||
}
|
}
|
||||||
|
|
||||||
SDataCol *oldLastCols = pTable->lastCols;
|
SDataCol *oldLastCols = pTable->lastCols;
|
||||||
int16_t oldLastColNum = pTable->maxColNum;
|
int16_t oldLastColNum = pTable->maxColNum;
|
||||||
|
|
||||||
pTable->lastColSVersion = schemaVersion(pNewSchema);
|
pTable->lastColSVersion = schemaVersion(pNewSchema);
|
||||||
pTable->lastCols = lastCols;
|
pTable->lastCols = lastCols;
|
||||||
|
@ -727,7 +729,7 @@ int tsdbUpdateLastColSchema(STable *pTable, STSchema *pNewSchema) {
|
||||||
|
|
||||||
// free old schema last column datas
|
// free old schema last column datas
|
||||||
for (int16_t i = 0; i < oldLastColNum; ++i) {
|
for (int16_t i = 0; i < oldLastColNum; ++i) {
|
||||||
SDataCol* pDataCol = &(oldLastCols[i]);
|
SDataCol *pDataCol = &(oldLastCols[i]);
|
||||||
if (pDataCol->bytes == 0) {
|
if (pDataCol->bytes == 0) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
@ -761,8 +763,8 @@ void tsdbUpdateTableSchema(STsdbRepo *pRepo, STable *pTable, STSchema *pSchema,
|
||||||
|
|
||||||
if (insertAct) {
|
if (insertAct) {
|
||||||
if (tsdbInsertNewTableAction(pRepo, pCTable) != 0) {
|
if (tsdbInsertNewTableAction(pRepo, pCTable) != 0) {
|
||||||
tsdbError("vgId:%d table %s tid %d uid %" PRIu64 " tsdbInsertNewTableAction fail", REPO_ID(pRepo), TABLE_CHAR_NAME(pTable),
|
tsdbError("vgId:%d table %s tid %d uid %" PRIu64 " tsdbInsertNewTableAction fail", REPO_ID(pRepo),
|
||||||
TABLE_TID(pTable), TABLE_UID(pTable));
|
TABLE_CHAR_NAME(pTable), TABLE_TID(pTable), TABLE_UID(pTable));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -807,7 +809,7 @@ static char *getTagIndexKey(const void *pData) {
|
||||||
void * res = tdGetKVRowValOfCol(pTable->tagVal, pCol->colId);
|
void * res = tdGetKVRowValOfCol(pTable->tagVal, pCol->colId);
|
||||||
if (res == NULL) {
|
if (res == NULL) {
|
||||||
// treat the column as NULL if we cannot find it
|
// treat the column as NULL if we cannot find it
|
||||||
res = (char*)getNullValue(pCol->type);
|
res = (char *)getNullValue(pCol->type);
|
||||||
}
|
}
|
||||||
return res;
|
return res;
|
||||||
}
|
}
|
||||||
|
@ -861,7 +863,7 @@ static STable *tsdbCreateTableFromCfg(STableCfg *pCfg, bool isSuper, STable *pST
|
||||||
}
|
}
|
||||||
pTable->tagVal = NULL;
|
pTable->tagVal = NULL;
|
||||||
STColumn *pCol = schemaColAt(pTable->tagSchema, DEFAULT_TAG_INDEX_COLUMN);
|
STColumn *pCol = schemaColAt(pTable->tagSchema, DEFAULT_TAG_INDEX_COLUMN);
|
||||||
if(pCol->type == TSDB_DATA_TYPE_JSON){
|
if (pCol->type == TSDB_DATA_TYPE_JSON) {
|
||||||
assert(pTable->tagSchema->numOfCols == 1);
|
assert(pTable->tagSchema->numOfCols == 1);
|
||||||
pTable->jsonKeyMap = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK);
|
pTable->jsonKeyMap = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK);
|
||||||
if (pTable->jsonKeyMap == NULL) {
|
if (pTable->jsonKeyMap == NULL) {
|
||||||
|
@ -869,8 +871,8 @@ static STable *tsdbCreateTableFromCfg(STableCfg *pCfg, bool isSuper, STable *pST
|
||||||
tsdbFreeTable(pTable);
|
tsdbFreeTable(pTable);
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
taosHashSetFreeFp(pTable->jsonKeyMap, taosArrayDestroyForHash);
|
// taosHashSetFreeFp(pTable->jsonKeyMap, taosArrayDestroyForHash);
|
||||||
}else{
|
} else {
|
||||||
pTable->pIndex = tSkipListCreate(TSDB_SUPER_TABLE_SL_LEVEL, colType(pCol), (uint8_t)(colBytes(pCol)), NULL,
|
pTable->pIndex = tSkipListCreate(TSDB_SUPER_TABLE_SL_LEVEL, colType(pCol), (uint8_t)(colBytes(pCol)), NULL,
|
||||||
SL_ALLOW_DUP_KEY, getTagIndexKey);
|
SL_ALLOW_DUP_KEY, getTagIndexKey);
|
||||||
if (pTable->pIndex == NULL) {
|
if (pTable->pIndex == NULL) {
|
||||||
|
@ -947,7 +949,7 @@ static void tsdbFreeTable(STable *pTable) {
|
||||||
|
|
||||||
tSkipListDestroy(pTable->pIndex);
|
tSkipListDestroy(pTable->pIndex);
|
||||||
taosHashCleanup(pTable->jsonKeyMap);
|
taosHashCleanup(pTable->jsonKeyMap);
|
||||||
taosTZfree(pTable->lastRow);
|
taosTZfree(pTable->lastRow);
|
||||||
tfree(pTable->sql);
|
tfree(pTable->sql);
|
||||||
|
|
||||||
tsdbFreeLastColumns(pTable);
|
tsdbFreeLastColumns(pTable);
|
||||||
|
@ -1003,8 +1005,9 @@ static int tsdbAddTableToMeta(STsdbRepo *pRepo, STable *pTable, bool addIdx, boo
|
||||||
|
|
||||||
if (lock && tsdbUnlockRepoMeta(pRepo) < 0) return -1;
|
if (lock && tsdbUnlockRepoMeta(pRepo) < 0) return -1;
|
||||||
if (TABLE_TYPE(pTable) == TSDB_STREAM_TABLE && addIdx) {
|
if (TABLE_TYPE(pTable) == TSDB_STREAM_TABLE && addIdx) {
|
||||||
pTable->cqhandle = (*pRepo->appH.cqCreateFunc)(pRepo->appH.cqH, TABLE_UID(pTable), TABLE_TID(pTable), TABLE_NAME(pTable)->data, pTable->sql,
|
pTable->cqhandle =
|
||||||
tsdbGetTableSchemaImpl(pTable, false, false, -1, -1), 1);
|
(*pRepo->appH.cqCreateFunc)(pRepo->appH.cqH, TABLE_UID(pTable), TABLE_TID(pTable), TABLE_NAME(pTable)->data,
|
||||||
|
pTable->sql, tsdbGetTableSchemaImpl(pTable, false, false, -1, -1), 1);
|
||||||
}
|
}
|
||||||
|
|
||||||
tsdbDebug("vgId:%d table %s tid %d uid %" PRIu64 " is added to meta", REPO_ID(pRepo), TABLE_CHAR_NAME(pTable),
|
tsdbDebug("vgId:%d table %s tid %d uid %" PRIu64 " is added to meta", REPO_ID(pRepo), TABLE_CHAR_NAME(pTable),
|
||||||
|
@ -1067,26 +1070,27 @@ static void tsdbRemoveTableFromMeta(STsdbRepo *pRepo, STable *pTable, bool rmFro
|
||||||
pMeta->maxRowBytes = maxRowBytes;
|
pMeta->maxRowBytes = maxRowBytes;
|
||||||
|
|
||||||
if (lock) tsdbUnlockRepoMeta(pRepo);
|
if (lock) tsdbUnlockRepoMeta(pRepo);
|
||||||
tsdbDebug("vgId:%d table %s uid %" PRIu64 " is removed from meta", REPO_ID(pRepo), TABLE_CHAR_NAME(pTable), TABLE_UID(pTable));
|
tsdbDebug("vgId:%d table %s uid %" PRIu64 " is removed from meta", REPO_ID(pRepo), TABLE_CHAR_NAME(pTable),
|
||||||
|
TABLE_UID(pTable));
|
||||||
tsdbUnRefTable(pTable);
|
tsdbUnRefTable(pTable);
|
||||||
}
|
}
|
||||||
|
|
||||||
void* tsdbGetJsonTagValue(STable* pTable, char* key, int32_t keyLen, int16_t* retColId){
|
void *tsdbGetJsonTagValue(STable *pTable, char *key, int32_t keyLen, int16_t *retColId) {
|
||||||
assert(TABLE_TYPE(pTable) == TSDB_CHILD_TABLE);
|
assert(TABLE_TYPE(pTable) == TSDB_CHILD_TABLE);
|
||||||
STable* superTable= pTable->pSuper;
|
STable * superTable = pTable->pSuper;
|
||||||
SArray** data = (SArray**)taosHashGet(superTable->jsonKeyMap, key, keyLen);
|
SArray **data = (SArray **)taosHashGet(superTable->jsonKeyMap, key, keyLen);
|
||||||
if(data == NULL) return NULL;
|
if (data == NULL) return NULL;
|
||||||
JsonMapValue jmvalue = {pTable, 0};
|
JsonMapValue jmvalue = {pTable, 0};
|
||||||
JsonMapValue* p = taosArraySearch(*data, &jmvalue, tsdbCompareJsonMapValue, TD_EQ);
|
JsonMapValue *p = taosArraySearch(*data, &jmvalue, tsdbCompareJsonMapValue, TD_EQ);
|
||||||
if (p == NULL) return NULL;
|
if (p == NULL) return NULL;
|
||||||
int16_t colId = p->colId + 1;
|
int16_t colId = p->colId + 1;
|
||||||
if(retColId) *retColId = p->colId;
|
if (retColId) *retColId = p->colId;
|
||||||
return tdGetKVRowValOfCol(pTable->tagVal, colId);
|
return tdGetKVRowValOfCol(pTable->tagVal, colId);
|
||||||
}
|
}
|
||||||
|
|
||||||
int tsdbCompareJsonMapValue(const void* a, const void* b) {
|
int tsdbCompareJsonMapValue(const void *a, const void *b) {
|
||||||
const JsonMapValue* x = (const JsonMapValue*)a;
|
const JsonMapValue *x = (const JsonMapValue *)a;
|
||||||
const JsonMapValue* y = (const JsonMapValue*)b;
|
const JsonMapValue *y = (const JsonMapValue *)b;
|
||||||
if (x->table > y->table) return 1;
|
if (x->table > y->table) return 1;
|
||||||
if (x->table < y->table) return -1;
|
if (x->table < y->table) return -1;
|
||||||
return 0;
|
return 0;
|
||||||
|
@ -1100,15 +1104,15 @@ static int tsdbAddTableIntoIndex(STsdbMeta *pMeta, STable *pTable, bool refSuper
|
||||||
pTable->pSuper = pSTable;
|
pTable->pSuper = pSTable;
|
||||||
if (refSuper) T_REF_INC(pSTable);
|
if (refSuper) T_REF_INC(pSTable);
|
||||||
|
|
||||||
if(pSTable->tagSchema->columns[0].type == TSDB_DATA_TYPE_JSON){
|
if (pSTable->tagSchema->columns[0].type == TSDB_DATA_TYPE_JSON) {
|
||||||
ASSERT(pSTable->tagSchema->numOfCols == 1);
|
ASSERT(pSTable->tagSchema->numOfCols == 1);
|
||||||
int16_t nCols = kvRowNCols(pTable->tagVal);
|
int16_t nCols = kvRowNCols(pTable->tagVal);
|
||||||
ASSERT(nCols%2 == 1);
|
ASSERT(nCols % 2 == 1);
|
||||||
// check first
|
// check first
|
||||||
for (int j = 0; j < nCols; ++j) {
|
for (int j = 0; j < nCols; ++j) {
|
||||||
if (j != 0 && j % 2 == 0) continue; // jump value
|
if (j != 0 && j % 2 == 0) continue; // jump value
|
||||||
SColIdx *pColIdx = kvRowColIdxAt(pTable->tagVal, j);
|
SColIdx *pColIdx = kvRowColIdxAt(pTable->tagVal, j);
|
||||||
void *val = (kvRowColVal(pTable->tagVal, pColIdx));
|
void * val = (kvRowColVal(pTable->tagVal, pColIdx));
|
||||||
if (j == 0) { // json value is the first
|
if (j == 0) { // json value is the first
|
||||||
int8_t jsonPlaceHolder = *(int8_t *)val;
|
int8_t jsonPlaceHolder = *(int8_t *)val;
|
||||||
ASSERT(jsonPlaceHolder == TSDB_DATA_JSON_PLACEHOLDER);
|
ASSERT(jsonPlaceHolder == TSDB_DATA_JSON_PLACEHOLDER);
|
||||||
|
@ -1122,7 +1126,7 @@ static int tsdbAddTableIntoIndex(STsdbMeta *pMeta, STable *pTable, bool refSuper
|
||||||
// then insert
|
// then insert
|
||||||
char keyMd5[TSDB_MAX_JSON_KEY_MD5_LEN] = {0};
|
char keyMd5[TSDB_MAX_JSON_KEY_MD5_LEN] = {0};
|
||||||
jsonKeyMd5(varDataVal(val), varDataLen(val), keyMd5);
|
jsonKeyMd5(varDataVal(val), varDataLen(val), keyMd5);
|
||||||
SArray *tablistNew = NULL;
|
SArray * tablistNew = NULL;
|
||||||
SArray **tablist = (SArray **)taosHashGet(pSTable->jsonKeyMap, keyMd5, TSDB_MAX_JSON_KEY_MD5_LEN);
|
SArray **tablist = (SArray **)taosHashGet(pSTable->jsonKeyMap, keyMd5, TSDB_MAX_JSON_KEY_MD5_LEN);
|
||||||
if (tablist == NULL) {
|
if (tablist == NULL) {
|
||||||
tablistNew = taosArrayInit(8, sizeof(JsonMapValue));
|
tablistNew = taosArrayInit(8, sizeof(JsonMapValue));
|
||||||
|
@ -1141,19 +1145,19 @@ static int tsdbAddTableIntoIndex(STsdbMeta *pMeta, STable *pTable, bool refSuper
|
||||||
}
|
}
|
||||||
|
|
||||||
JsonMapValue jmvalue = {pTable, pColIdx->colId};
|
JsonMapValue jmvalue = {pTable, pColIdx->colId};
|
||||||
void* p = taosArraySearch(tablistNew, &jmvalue, tsdbCompareJsonMapValue, TD_EQ);
|
void * p = taosArraySearch(tablistNew, &jmvalue, tsdbCompareJsonMapValue, TD_EQ);
|
||||||
if (p == NULL) {
|
if (p == NULL) {
|
||||||
p = taosArraySearch(tablistNew, &jmvalue, tsdbCompareJsonMapValue, TD_GE);
|
p = taosArraySearch(tablistNew, &jmvalue, tsdbCompareJsonMapValue, TD_GE);
|
||||||
if(p == NULL){
|
if (p == NULL) {
|
||||||
taosArrayPush(tablistNew, &jmvalue);
|
taosArrayPush(tablistNew, &jmvalue);
|
||||||
}else{
|
} else {
|
||||||
taosArrayInsert(tablistNew, TARRAY_ELEM_IDX(tablistNew, p), &jmvalue);
|
taosArrayInsert(tablistNew, TARRAY_ELEM_IDX(tablistNew, p), &jmvalue);
|
||||||
}
|
}
|
||||||
}else{
|
} else {
|
||||||
tsdbError("insert dumplicate");
|
tsdbError("insert dumplicate");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}else{
|
} else {
|
||||||
tSkipListPut(pSTable->pIndex, (void *)pTable);
|
tSkipListPut(pSTable->pIndex, (void *)pTable);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1166,41 +1170,41 @@ static int tsdbRemoveTableFromIndex(STsdbMeta *pMeta, STable *pTable) {
|
||||||
STable *pSTable = pTable->pSuper;
|
STable *pSTable = pTable->pSuper;
|
||||||
ASSERT(pSTable != NULL);
|
ASSERT(pSTable != NULL);
|
||||||
|
|
||||||
if(pSTable->tagSchema->columns[0].type == TSDB_DATA_TYPE_JSON){
|
if (pSTable->tagSchema->columns[0].type == TSDB_DATA_TYPE_JSON) {
|
||||||
ASSERT(pSTable->tagSchema->numOfCols == 1);
|
ASSERT(pSTable->tagSchema->numOfCols == 1);
|
||||||
int16_t nCols = kvRowNCols(pTable->tagVal);
|
int16_t nCols = kvRowNCols(pTable->tagVal);
|
||||||
ASSERT(nCols%2 == 1);
|
ASSERT(nCols % 2 == 1);
|
||||||
for (int j = 0; j < nCols; ++j) {
|
for (int j = 0; j < nCols; ++j) {
|
||||||
if (j != 0 && j%2 == 0) continue; // jump value
|
if (j != 0 && j % 2 == 0) continue; // jump value
|
||||||
SColIdx * pColIdx = kvRowColIdxAt(pTable->tagVal, j);
|
SColIdx *pColIdx = kvRowColIdxAt(pTable->tagVal, j);
|
||||||
void* val = (kvRowColVal(pTable->tagVal, pColIdx));
|
void * val = (kvRowColVal(pTable->tagVal, pColIdx));
|
||||||
if (j == 0){ // json value is the first
|
if (j == 0) { // json value is the first
|
||||||
int8_t jsonPlaceHolder = *(int8_t*)val;
|
int8_t jsonPlaceHolder = *(int8_t *)val;
|
||||||
ASSERT(jsonPlaceHolder == TSDB_DATA_JSON_PLACEHOLDER);
|
ASSERT(jsonPlaceHolder == TSDB_DATA_JSON_PLACEHOLDER);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
if (j == 1){
|
if (j == 1) {
|
||||||
uint32_t jsonNULL = *(uint32_t*)(varDataVal(val));
|
uint32_t jsonNULL = *(uint32_t *)(varDataVal(val));
|
||||||
ASSERT(jsonNULL == TSDB_DATA_JSON_NULL);
|
ASSERT(jsonNULL == TSDB_DATA_JSON_NULL);
|
||||||
}
|
}
|
||||||
|
|
||||||
char keyMd5[TSDB_MAX_JSON_KEY_MD5_LEN] = {0};
|
char keyMd5[TSDB_MAX_JSON_KEY_MD5_LEN] = {0};
|
||||||
jsonKeyMd5(varDataVal(val), varDataLen(val), keyMd5);
|
jsonKeyMd5(varDataVal(val), varDataLen(val), keyMd5);
|
||||||
SArray** tablist = (SArray **)taosHashGet(pSTable->jsonKeyMap, keyMd5, TSDB_MAX_JSON_KEY_MD5_LEN);
|
SArray **tablist = (SArray **)taosHashGet(pSTable->jsonKeyMap, keyMd5, TSDB_MAX_JSON_KEY_MD5_LEN);
|
||||||
if(tablist == NULL) {
|
if (tablist == NULL) {
|
||||||
tsdbError("json tag no key error,%d", j);
|
tsdbError("json tag no key error,%d", j);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
JsonMapValue jmvalue = {pTable, pColIdx->colId};
|
JsonMapValue jmvalue = {pTable, pColIdx->colId};
|
||||||
void* p = taosArraySearch(*tablist, &jmvalue, tsdbCompareJsonMapValue, TD_EQ);
|
void * p = taosArraySearch(*tablist, &jmvalue, tsdbCompareJsonMapValue, TD_EQ);
|
||||||
if (p == NULL) {
|
if (p == NULL) {
|
||||||
tsdbError("json tag no tableid error,%d", j);
|
tsdbError("json tag no tableid error,%d", j);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
taosArrayRemove(*tablist, TARRAY_ELEM_IDX(*tablist, p));
|
taosArrayRemove(*tablist, TARRAY_ELEM_IDX(*tablist, p));
|
||||||
}
|
}
|
||||||
}else {
|
} else {
|
||||||
char * key = getTagIndexKey(pTable);
|
char * key = getTagIndexKey(pTable);
|
||||||
SArray *res = tSkipListGet(pSTable->pIndex, key);
|
SArray *res = tSkipListGet(pSTable->pIndex, key);
|
||||||
|
|
||||||
|
@ -1216,7 +1220,7 @@ static int tsdbRemoveTableFromIndex(STsdbMeta *pMeta, STable *pTable) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
taosArrayDestroy(&res);
|
taosArrayDestroy(res);
|
||||||
}
|
}
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
@ -1401,8 +1405,8 @@ static int tsdbEncodeTable(void **buf, STable *pTable) {
|
||||||
tlen += taosEncodeFixedU64(buf, TABLE_SUID(pTable));
|
tlen += taosEncodeFixedU64(buf, TABLE_SUID(pTable));
|
||||||
tlen += tdEncodeKVRow(buf, pTable->tagVal);
|
tlen += tdEncodeKVRow(buf, pTable->tagVal);
|
||||||
} else {
|
} else {
|
||||||
uint32_t arraySize = (uint32_t)taosArrayGetSize(pTable->schema);
|
uint32_t arraySize = (uint32_t)taosArrayGetSize(pTable->schema);
|
||||||
if(arraySize > UINT8_MAX) {
|
if (arraySize > UINT8_MAX) {
|
||||||
tlen += taosEncodeFixedU8(buf, 0);
|
tlen += taosEncodeFixedU8(buf, 0);
|
||||||
tlen += taosEncodeFixedU32(buf, arraySize);
|
tlen += taosEncodeFixedU32(buf, arraySize);
|
||||||
} else {
|
} else {
|
||||||
|
@ -1443,7 +1447,7 @@ static void *tsdbDecodeTable(void *buf, STable **pRTable) {
|
||||||
} else {
|
} else {
|
||||||
uint32_t nSchemas = 0;
|
uint32_t nSchemas = 0;
|
||||||
buf = taosDecodeFixedU8(buf, (uint8_t *)&nSchemas);
|
buf = taosDecodeFixedU8(buf, (uint8_t *)&nSchemas);
|
||||||
if(nSchemas == 0) {
|
if (nSchemas == 0) {
|
||||||
buf = taosDecodeFixedU32(buf, &nSchemas);
|
buf = taosDecodeFixedU32(buf, &nSchemas);
|
||||||
}
|
}
|
||||||
for (int i = 0; i < nSchemas; i++) {
|
for (int i = 0; i < nSchemas; i++) {
|
||||||
|
@ -1455,7 +1459,7 @@ static void *tsdbDecodeTable(void *buf, STable **pRTable) {
|
||||||
if (TABLE_TYPE(pTable) == TSDB_SUPER_TABLE) {
|
if (TABLE_TYPE(pTable) == TSDB_SUPER_TABLE) {
|
||||||
buf = tdDecodeSchema(buf, &(pTable->tagSchema));
|
buf = tdDecodeSchema(buf, &(pTable->tagSchema));
|
||||||
STColumn *pCol = schemaColAt(pTable->tagSchema, DEFAULT_TAG_INDEX_COLUMN);
|
STColumn *pCol = schemaColAt(pTable->tagSchema, DEFAULT_TAG_INDEX_COLUMN);
|
||||||
if(pCol->type == TSDB_DATA_TYPE_JSON){
|
if (pCol->type == TSDB_DATA_TYPE_JSON) {
|
||||||
assert(pTable->tagSchema->numOfCols == 1);
|
assert(pTable->tagSchema->numOfCols == 1);
|
||||||
pTable->jsonKeyMap = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK);
|
pTable->jsonKeyMap = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK);
|
||||||
if (pTable->jsonKeyMap == NULL) {
|
if (pTable->jsonKeyMap == NULL) {
|
||||||
|
@ -1463,10 +1467,10 @@ static void *tsdbDecodeTable(void *buf, STable **pRTable) {
|
||||||
tsdbFreeTable(pTable);
|
tsdbFreeTable(pTable);
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
taosHashSetFreeFp(pTable->jsonKeyMap, taosArrayDestroyForHash);
|
// taosHashSetFreeFp(pTable->jsonKeyMap, taosArrayDestroyForHash);
|
||||||
}else{
|
} else {
|
||||||
pTable->pIndex = tSkipListCreate(TSDB_SUPER_TABLE_SL_LEVEL, colType(pCol), (uint8_t)(colBytes(pCol)), NULL,
|
pTable->pIndex = tSkipListCreate(TSDB_SUPER_TABLE_SL_LEVEL, colType(pCol), (uint8_t)(colBytes(pCol)), NULL,
|
||||||
SL_ALLOW_DUP_KEY, getTagIndexKey);
|
SL_ALLOW_DUP_KEY, getTagIndexKey);
|
||||||
if (pTable->pIndex == NULL) {
|
if (pTable->pIndex == NULL) {
|
||||||
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
|
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
|
||||||
tsdbFreeTable(pTable);
|
tsdbFreeTable(pTable);
|
||||||
|
@ -1487,11 +1491,11 @@ static void *tsdbDecodeTable(void *buf, STable **pRTable) {
|
||||||
return buf;
|
return buf;
|
||||||
}
|
}
|
||||||
|
|
||||||
static SArray* getJsonTagTableList(STable *pTable){
|
static SArray *getJsonTagTableList(STable *pTable) {
|
||||||
uint32_t key = TSDB_DATA_JSON_NULL;
|
uint32_t key = TSDB_DATA_JSON_NULL;
|
||||||
char keyMd5[TSDB_MAX_JSON_KEY_MD5_LEN] = {0};
|
char keyMd5[TSDB_MAX_JSON_KEY_MD5_LEN] = {0};
|
||||||
jsonKeyMd5(&key, INT_BYTES, keyMd5);
|
jsonKeyMd5(&key, INT_BYTES, keyMd5);
|
||||||
SArray** tablist = (SArray**)taosHashGet(pTable->jsonKeyMap, keyMd5, TSDB_MAX_JSON_KEY_MD5_LEN);
|
SArray **tablist = (SArray **)taosHashGet(pTable->jsonKeyMap, keyMd5, TSDB_MAX_JSON_KEY_MD5_LEN);
|
||||||
|
|
||||||
return *tablist;
|
return *tablist;
|
||||||
}
|
}
|
||||||
|
@ -1503,10 +1507,10 @@ static int tsdbGetTableEncodeSize(int8_t act, STable *pTable) {
|
||||||
} else {
|
} else {
|
||||||
if (TABLE_TYPE(pTable) == TSDB_SUPER_TABLE) {
|
if (TABLE_TYPE(pTable) == TSDB_SUPER_TABLE) {
|
||||||
size_t tableSize = 0;
|
size_t tableSize = 0;
|
||||||
if(pTable->tagSchema->columns[0].type == TSDB_DATA_TYPE_JSON){
|
if (pTable->tagSchema->columns[0].type == TSDB_DATA_TYPE_JSON) {
|
||||||
SArray* tablist = getJsonTagTableList(pTable);
|
SArray *tablist = getJsonTagTableList(pTable);
|
||||||
tableSize = taosArrayGetSize(tablist);
|
tableSize = taosArrayGetSize(tablist);
|
||||||
}else{
|
} else {
|
||||||
tableSize = SL_SIZE(pTable->pIndex);
|
tableSize = SL_SIZE(pTable->pIndex);
|
||||||
}
|
}
|
||||||
tlen = (int)((sizeof(SListNode) + sizeof(SActObj)) * (tableSize + 1));
|
tlen = (int)((sizeof(SListNode) + sizeof(SActObj)) * (tableSize + 1));
|
||||||
|
@ -1524,7 +1528,7 @@ static void *tsdbInsertTableAct(STsdbRepo *pRepo, int8_t act, void *buf, STable
|
||||||
SActCont * pCont = (SActCont *)POINTER_SHIFT(pAct, sizeof(*pAct));
|
SActCont * pCont = (SActCont *)POINTER_SHIFT(pAct, sizeof(*pAct));
|
||||||
void * pBuf = (void *)pCont;
|
void * pBuf = (void *)pCont;
|
||||||
|
|
||||||
pNode->prev = pNode->next = NULL;
|
TD_DLIST_NODE_PREV(pNode) = TD_DLIST_NODE_NEXT(pNode) = NULL;
|
||||||
pAct->act = act;
|
pAct->act = act;
|
||||||
pAct->uid = TABLE_UID(pTable);
|
pAct->uid = TABLE_UID(pTable);
|
||||||
|
|
||||||
|
@ -1549,14 +1553,14 @@ static int tsdbRemoveTableFromStore(STsdbRepo *pRepo, STable *pTable) {
|
||||||
|
|
||||||
void *pBuf = buf;
|
void *pBuf = buf;
|
||||||
if (TABLE_TYPE(pTable) == TSDB_SUPER_TABLE) {
|
if (TABLE_TYPE(pTable) == TSDB_SUPER_TABLE) {
|
||||||
if(pTable->tagSchema->columns[0].type == TSDB_DATA_TYPE_JSON){
|
if (pTable->tagSchema->columns[0].type == TSDB_DATA_TYPE_JSON) {
|
||||||
SArray* tablist = getJsonTagTableList(pTable);
|
SArray *tablist = getJsonTagTableList(pTable);
|
||||||
for (int i = 0; i < taosArrayGetSize(tablist); ++i) {
|
for (int i = 0; i < taosArrayGetSize(tablist); ++i) {
|
||||||
JsonMapValue* p = taosArrayGet(tablist, i);
|
JsonMapValue *p = taosArrayGet(tablist, i);
|
||||||
ASSERT(TABLE_TYPE((STable *)(p->table)) == TSDB_CHILD_TABLE);
|
ASSERT(TABLE_TYPE((STable *)(p->table)) == TSDB_CHILD_TABLE);
|
||||||
pBuf = tsdbInsertTableAct(pRepo, TSDB_DROP_META, pBuf, p->table);
|
pBuf = tsdbInsertTableAct(pRepo, TSDB_DROP_META, pBuf, p->table);
|
||||||
}
|
}
|
||||||
}else {
|
} else {
|
||||||
SSkipListIterator *pIter = tSkipListCreateIter(pTable->pIndex);
|
SSkipListIterator *pIter = tSkipListCreateIter(pTable->pIndex);
|
||||||
if (pIter == NULL) {
|
if (pIter == NULL) {
|
||||||
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
|
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
|
||||||
|
@ -1582,13 +1586,13 @@ static int tsdbRemoveTableFromStore(STsdbRepo *pRepo, STable *pTable) {
|
||||||
static int tsdbRmTableFromMeta(STsdbRepo *pRepo, STable *pTable) {
|
static int tsdbRmTableFromMeta(STsdbRepo *pRepo, STable *pTable) {
|
||||||
if (TABLE_TYPE(pTable) == TSDB_SUPER_TABLE) {
|
if (TABLE_TYPE(pTable) == TSDB_SUPER_TABLE) {
|
||||||
tsdbWLockRepoMeta(pRepo);
|
tsdbWLockRepoMeta(pRepo);
|
||||||
if(pTable->tagSchema->columns[0].type == TSDB_DATA_TYPE_JSON){
|
if (pTable->tagSchema->columns[0].type == TSDB_DATA_TYPE_JSON) {
|
||||||
SArray* tablist = getJsonTagTableList(pTable);
|
SArray *tablist = getJsonTagTableList(pTable);
|
||||||
for (int i = 0; i < taosArrayGetSize(tablist); ++i) {
|
for (int i = 0; i < taosArrayGetSize(tablist); ++i) {
|
||||||
JsonMapValue* p = taosArrayGet(tablist, i);
|
JsonMapValue *p = taosArrayGet(tablist, i);
|
||||||
tsdbRemoveTableFromMeta(pRepo, p->table, false, false);
|
tsdbRemoveTableFromMeta(pRepo, p->table, false, false);
|
||||||
}
|
}
|
||||||
}else{
|
} else {
|
||||||
SSkipListIterator *pIter = tSkipListCreateIter(pTable->pIndex);
|
SSkipListIterator *pIter = tSkipListCreateIter(pTable->pIndex);
|
||||||
if (pIter == NULL) {
|
if (pIter == NULL) {
|
||||||
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
|
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
|
||||||
|
@ -1611,6 +1615,7 @@ static int tsdbRmTableFromMeta(STsdbRepo *pRepo, STable *pTable) {
|
||||||
}
|
}
|
||||||
|
|
||||||
static int tsdbAdjustMetaTables(STsdbRepo *pRepo, int tid) {
|
static int tsdbAdjustMetaTables(STsdbRepo *pRepo, int tid) {
|
||||||
|
#if 0
|
||||||
STsdbMeta *pMeta = pRepo->tsdbMeta;
|
STsdbMeta *pMeta = pRepo->tsdbMeta;
|
||||||
ASSERT(tid >= pMeta->maxTables);
|
ASSERT(tid >= pMeta->maxTables);
|
||||||
|
|
||||||
|
@ -1630,6 +1635,7 @@ static int tsdbAdjustMetaTables(STsdbRepo *pRepo, int tid) {
|
||||||
tfree(tTables);
|
tfree(tTables);
|
||||||
tsdbDebug("vgId:%d tsdb meta maxTables is adjusted as %d", REPO_ID(pRepo), maxTables);
|
tsdbDebug("vgId:%d tsdb meta maxTables is adjusted as %d", REPO_ID(pRepo), maxTables);
|
||||||
|
|
||||||
|
#endif
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1681,7 +1687,6 @@ static void tsdbFreeTableSchema(STable *pTable) {
|
||||||
tdFreeSchema(pSchema);
|
tdFreeSchema(pSchema);
|
||||||
}
|
}
|
||||||
|
|
||||||
taosArrayDestroy(&pTable->schema);
|
taosArrayDestroy(pTable->schema);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -23,8 +23,8 @@
|
||||||
#include "taosdef.h"
|
#include "taosdef.h"
|
||||||
#include "tlosertree.h"
|
#include "tlosertree.h"
|
||||||
#include "tsdbint.h"
|
#include "tsdbint.h"
|
||||||
#include "texpr.h"
|
// #include "texpr.h"
|
||||||
#include "qFilter.h"
|
// #include "qFilter.h"
|
||||||
#include "cJSON.h"
|
#include "cJSON.h"
|
||||||
|
|
||||||
#define EXTRA_BYTES 2
|
#define EXTRA_BYTES 2
|
||||||
|
|
Loading…
Reference in New Issue