Merge remote-tracking branch 'origin/3.0' into feature/sync2-merge
This commit is contained in:
commit
fbf354a78c
|
@ -340,7 +340,7 @@ typedef struct tDataTypeDescriptor {
|
|||
} tDataTypeDescriptor;
|
||||
|
||||
extern tDataTypeDescriptor tDataTypes[TSDB_DATA_TYPE_MAX];
|
||||
bool isValidDataType(int32_t type);
|
||||
bool isValidDataType(int32_t type);
|
||||
|
||||
void assignVal(char *val, const char *src, int32_t len, int32_t type);
|
||||
void operateVal(void *dst, void *s1, void *s2, int32_t optr, int32_t type);
|
||||
|
|
|
@ -73,8 +73,8 @@ JNIEXPORT void JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqConfDestroyImp
|
|||
if (conf == NULL) {
|
||||
jniDebug("jobj:%p, tmq config is already destroyed", jobj);
|
||||
} else {
|
||||
tmq_conf_destroy(conf);
|
||||
jniDebug("jobj:%p, config:%p, tmq successfully destroy config", jobj, conf);
|
||||
tmq_conf_destroy(conf);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -206,6 +206,7 @@ JNIEXPORT void JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqCommitAsync(JN
|
|||
tmq_t *tmq = (tmq_t *)jtmq;
|
||||
if (tmq == NULL) {
|
||||
jniError("jobj:%p, tmq is closed", jobj);
|
||||
return;
|
||||
}
|
||||
TAOS_RES *res = (TAOS_RES *)jres;
|
||||
consumer = (*env)->NewGlobalRef(env, consumer);
|
||||
|
@ -252,6 +253,7 @@ JNIEXPORT jstring JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqGetTopicNam
|
|||
TAOS_RES *res = (TAOS_RES *)jres;
|
||||
if (res == NULL) {
|
||||
jniDebug("jobj:%p, invalid res handle", jobj);
|
||||
return NULL;
|
||||
}
|
||||
return (*env)->NewStringUTF(env, tmq_get_topic_name(res));
|
||||
}
|
||||
|
@ -259,6 +261,7 @@ JNIEXPORT jstring JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqGetDbName(J
|
|||
TAOS_RES *res = (TAOS_RES *)jres;
|
||||
if (res == NULL) {
|
||||
jniDebug("jobj:%p, invalid res handle", jobj);
|
||||
return NULL;
|
||||
}
|
||||
return (*env)->NewStringUTF(env, tmq_get_db_name(res));
|
||||
}
|
||||
|
@ -266,6 +269,7 @@ JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqGetVgroupId(JN
|
|||
TAOS_RES *res = (TAOS_RES *)jres;
|
||||
if (res == NULL) {
|
||||
jniDebug("jobj:%p, invalid res handle", jobj);
|
||||
return -1;
|
||||
}
|
||||
return tmq_get_vgroup_id(res);
|
||||
}
|
||||
|
@ -275,6 +279,7 @@ JNIEXPORT jstring JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqGetTableNam
|
|||
TAOS_RES *res = (TAOS_RES *)jres;
|
||||
if (res == NULL) {
|
||||
jniDebug("jobj:%p, invalid res handle", jobj);
|
||||
return NULL;
|
||||
}
|
||||
return (*env)->NewStringUTF(env, tmq_get_table_name(res));
|
||||
}
|
||||
|
|
|
@ -120,7 +120,11 @@ typedef struct SMetaFltParam {
|
|||
|
||||
} SMetaFltParam;
|
||||
|
||||
// TODO, refactor later
|
||||
int32_t metaFilterTableIds(SMeta *pMeta, SMetaFltParam *param, SArray *results);
|
||||
int32_t metaFilterCreateTime(SMeta *pMeta, SMetaFltParam *parm, SArray *pUids);
|
||||
int32_t metaFilterTableName(SMeta *pMeta, SMetaFltParam *param, SArray *pUids);
|
||||
int32_t metaFilterTtl(SMeta *pMeta, SMetaFltParam *param, SArray *pUids);
|
||||
|
||||
#if 1 // refact APIs below (TODO)
|
||||
typedef SVCreateTbReq STbCfg;
|
||||
|
|
|
@ -86,8 +86,12 @@ struct SMeta {
|
|||
TTB* pSuidIdx;
|
||||
// ivt idx and idx
|
||||
void* pTagIvtIdx;
|
||||
TTB* pTagIdx;
|
||||
TTB* pTtlIdx;
|
||||
|
||||
TTB* pTagIdx;
|
||||
TTB* pTtlIdx;
|
||||
|
||||
TTB* pCtimeIdx; // table created time idx
|
||||
TTB* pNcolIdx; // ncol of table idx, normal table only
|
||||
|
||||
TTB* pSmaIdx;
|
||||
|
||||
|
@ -142,6 +146,16 @@ typedef struct {
|
|||
int64_t smaUid;
|
||||
} SSmaIdxKey;
|
||||
|
||||
typedef struct {
|
||||
int64_t ctime;
|
||||
tb_uid_t uid;
|
||||
} SCtimeIdxKey;
|
||||
|
||||
typedef struct {
|
||||
int64_t ncol;
|
||||
tb_uid_t uid;
|
||||
} SNcolIdxKey;
|
||||
|
||||
// metaTable ==================
|
||||
int metaCreateTagIdxKey(tb_uid_t suid, int32_t cid, const void* pTagData, int32_t nTagData, int8_t type, tb_uid_t uid,
|
||||
STagIdxKey** ppTagIdxKey, int32_t* nTagIdxKey);
|
||||
|
|
|
@ -24,6 +24,9 @@ static int uidIdxKeyCmpr(const void *pKey1, int kLen1, const void *pKey2, int kL
|
|||
static int smaIdxKeyCmpr(const void *pKey1, int kLen1, const void *pKey2, int kLen2);
|
||||
static int taskIdxKeyCmpr(const void *pKey1, int kLen1, const void *pKey2, int kLen2);
|
||||
|
||||
static int ctimeIdxCmpr(const void *pKey1, int kLen1, const void *pKey2, int kLen2);
|
||||
static int ncolIdxCmpr(const void *pKey1, int kLen1, const void *pKey2, int kLen2);
|
||||
|
||||
static int32_t metaInitLock(SMeta *pMeta) { return taosThreadRwlockInit(&pMeta->lock, NULL); }
|
||||
static int32_t metaDestroyLock(SMeta *pMeta) { return taosThreadRwlockDestroy(&pMeta->lock); }
|
||||
|
||||
|
@ -139,6 +142,20 @@ int metaOpen(SVnode *pVnode, SMeta **ppMeta, int8_t rollback) {
|
|||
goto _err;
|
||||
}
|
||||
|
||||
// idx table create time
|
||||
ret = tdbTbOpen("ctime.idx", sizeof(SCtimeIdxKey), 0, ctimeIdxCmpr, pMeta->pEnv, &pMeta->pCtimeIdx, 0);
|
||||
if (ret < 0) {
|
||||
metaError("vgId:%d, failed to open meta ctime index since %s", TD_VID(pVnode), tstrerror(terrno));
|
||||
goto _err;
|
||||
}
|
||||
|
||||
// idx num of col, normal table only
|
||||
ret = tdbTbOpen("ncol.idx", sizeof(SNcolIdxKey), 0, ncolIdxCmpr, pMeta->pEnv, &pMeta->pNcolIdx, 0);
|
||||
if (ret < 0) {
|
||||
metaError("vgId:%d, failed to open meta ncol index since %s", TD_VID(pVnode), tstrerror(terrno));
|
||||
goto _err;
|
||||
}
|
||||
|
||||
ret = tdbTbOpen("stream.task.db", sizeof(int64_t), -1, taskIdxKeyCmpr, pMeta->pEnv, &pMeta->pStreamDb, 0);
|
||||
if (ret < 0) {
|
||||
metaError("vgId:%d, failed to open meta stream task index since %s", TD_VID(pVnode), tstrerror(terrno));
|
||||
|
@ -166,6 +183,8 @@ int metaOpen(SVnode *pVnode, SMeta **ppMeta, int8_t rollback) {
|
|||
_err:
|
||||
if (pMeta->pIdx) metaCloseIdx(pMeta);
|
||||
if (pMeta->pStreamDb) tdbTbClose(pMeta->pStreamDb);
|
||||
if (pMeta->pNcolIdx) tdbTbClose(pMeta->pNcolIdx);
|
||||
if (pMeta->pCtimeIdx) tdbTbClose(pMeta->pCtimeIdx);
|
||||
if (pMeta->pSmaIdx) tdbTbClose(pMeta->pSmaIdx);
|
||||
if (pMeta->pTtlIdx) tdbTbClose(pMeta->pTtlIdx);
|
||||
if (pMeta->pTagIvtIdx) indexClose(pMeta->pTagIvtIdx);
|
||||
|
@ -187,6 +206,8 @@ int metaClose(SMeta *pMeta) {
|
|||
if (pMeta->pCache) metaCacheClose(pMeta);
|
||||
if (pMeta->pIdx) metaCloseIdx(pMeta);
|
||||
if (pMeta->pStreamDb) tdbTbClose(pMeta->pStreamDb);
|
||||
if (pMeta->pNcolIdx) tdbTbClose(pMeta->pNcolIdx);
|
||||
if (pMeta->pCtimeIdx) tdbTbClose(pMeta->pCtimeIdx);
|
||||
if (pMeta->pSmaIdx) tdbTbClose(pMeta->pSmaIdx);
|
||||
if (pMeta->pTtlIdx) tdbTbClose(pMeta->pTtlIdx);
|
||||
if (pMeta->pTagIvtIdx) indexClose(pMeta->pTagIvtIdx);
|
||||
|
@ -391,6 +412,43 @@ static int ttlIdxKeyCmpr(const void *pKey1, int kLen1, const void *pKey2, int kL
|
|||
return 0;
|
||||
}
|
||||
|
||||
static int ctimeIdxCmpr(const void *pKey1, int kLen1, const void *pKey2, int kLen2) {
|
||||
SCtimeIdxKey *pCtimeIdxKey1 = (SCtimeIdxKey *)pKey1;
|
||||
SCtimeIdxKey *pCtimeIdxKey2 = (SCtimeIdxKey *)pKey2;
|
||||
if (pCtimeIdxKey1->ctime > pCtimeIdxKey2->ctime) {
|
||||
return 1;
|
||||
} else if (pCtimeIdxKey1->ctime < pCtimeIdxKey2->ctime) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (pCtimeIdxKey1->uid > pCtimeIdxKey2->uid) {
|
||||
return 1;
|
||||
} else if (pCtimeIdxKey1->uid < pCtimeIdxKey2->uid) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int ncolIdxCmpr(const void *pKey1, int kLen1, const void *pKey2, int kLen2) {
|
||||
SNcolIdxKey *pNcolIdxKey1 = (SNcolIdxKey *)pKey1;
|
||||
SNcolIdxKey *pNcolIdxKey2 = (SNcolIdxKey *)pKey2;
|
||||
|
||||
if (pNcolIdxKey1->ncol > pNcolIdxKey2->ncol) {
|
||||
return 1;
|
||||
} else if (pNcolIdxKey1->ncol < pNcolIdxKey2->ncol) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (pNcolIdxKey1->uid > pNcolIdxKey2->uid) {
|
||||
return 1;
|
||||
} else if (pNcolIdxKey1->uid < pNcolIdxKey2->uid) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int smaIdxKeyCmpr(const void *pKey1, int kLen1, const void *pKey2, int kLen2) {
|
||||
SSmaIdxKey *pSmaIdxKey1 = (SSmaIdxKey *)pKey1;
|
||||
SSmaIdxKey *pSmaIdxKey2 = (SSmaIdxKey *)pKey2;
|
||||
|
|
|
@ -1038,6 +1038,143 @@ typedef struct {
|
|||
int32_t vLen;
|
||||
} SIdxCursor;
|
||||
|
||||
int32_t metaFilterCreateTime(SMeta *pMeta, SMetaFltParam *param, SArray *pUids) {
|
||||
int32_t ret = 0;
|
||||
|
||||
SIdxCursor *pCursor = NULL;
|
||||
pCursor = (SIdxCursor *)taosMemoryCalloc(1, sizeof(SIdxCursor));
|
||||
pCursor->pMeta = pMeta;
|
||||
pCursor->suid = param->suid;
|
||||
pCursor->cid = param->cid;
|
||||
pCursor->type = param->type;
|
||||
|
||||
metaRLock(pMeta);
|
||||
ret = tdbTbcOpen(pMeta->pCtimeIdx, &pCursor->pCur, NULL);
|
||||
if (ret != 0) {
|
||||
goto END;
|
||||
}
|
||||
int64_t uidLimit = param->reverse ? INT64_MAX : 0;
|
||||
|
||||
SCtimeIdxKey ctimeKey = {.ctime = *(int64_t *)(param->val), .uid = uidLimit};
|
||||
SCtimeIdxKey *pCtimeKey = &ctimeKey;
|
||||
|
||||
int cmp = 0;
|
||||
if (tdbTbcMoveTo(pCursor->pCur, &ctimeKey, sizeof(ctimeKey), &cmp) < 0) {
|
||||
goto END;
|
||||
}
|
||||
bool first = true;
|
||||
int32_t valid = 0;
|
||||
while (1) {
|
||||
void *entryKey = NULL;
|
||||
int32_t nEntryKey = -1;
|
||||
valid = tdbTbcGet(pCursor->pCur, (const void **)&entryKey, &nEntryKey, NULL, NULL);
|
||||
if (valid < 0) break;
|
||||
|
||||
SCtimeIdxKey *p = entryKey;
|
||||
|
||||
int32_t cmp = (*param->filterFunc)((void *)&p->ctime, (void *)&pCtimeKey->ctime, param->type);
|
||||
if (cmp == 0) taosArrayPush(pUids, &p->uid);
|
||||
if (cmp == -1) break;
|
||||
valid = param->reverse ? tdbTbcMoveToPrev(pCursor->pCur) : tdbTbcMoveToNext(pCursor->pCur);
|
||||
if (valid < 0) break;
|
||||
}
|
||||
|
||||
END:
|
||||
if (pCursor->pMeta) metaULock(pCursor->pMeta);
|
||||
if (pCursor->pCur) tdbTbcClose(pCursor->pCur);
|
||||
taosMemoryFree(pCursor);
|
||||
return ret;
|
||||
}
|
||||
|
||||
int32_t metaFilterTableName(SMeta *pMeta, SMetaFltParam *param, SArray *pUids) {
|
||||
int32_t ret = 0;
|
||||
char *buf = NULL;
|
||||
|
||||
STagIdxKey *pKey = NULL;
|
||||
int32_t nKey = 0;
|
||||
|
||||
SIdxCursor *pCursor = NULL;
|
||||
pCursor = (SIdxCursor *)taosMemoryCalloc(1, sizeof(SIdxCursor));
|
||||
pCursor->pMeta = pMeta;
|
||||
pCursor->suid = param->suid;
|
||||
pCursor->cid = param->cid;
|
||||
pCursor->type = param->type;
|
||||
|
||||
char *pName = param->val;
|
||||
|
||||
metaRLock(pMeta);
|
||||
ret = tdbTbcOpen(pMeta->pNameIdx, &pCursor->pCur, NULL);
|
||||
if (ret != 0) {
|
||||
goto END;
|
||||
}
|
||||
|
||||
int cmp = 0;
|
||||
if (tdbTbcMoveTo(pCursor->pCur, pName, strlen(pName) + 1, &cmp) < 0) {
|
||||
goto END;
|
||||
}
|
||||
bool first = true;
|
||||
int32_t valid = 0;
|
||||
while (1) {
|
||||
void *pEntryKey = NULL, *pEntryVal = NULL;
|
||||
int32_t nEntryKey = -1, nEntryVal = 0;
|
||||
valid = tdbTbcGet(pCursor->pCur, (const void **)pEntryKey, &nEntryKey, (const void **)&pEntryVal, &nEntryVal);
|
||||
if (valid < 0) break;
|
||||
|
||||
char *pTableKey = (char *)pEntryKey;
|
||||
int32_t cmp = (*param->filterFunc)(pTableKey, pName, pCursor->type);
|
||||
if (cmp == 0) {
|
||||
tb_uid_t tuid = *(tb_uid_t *)pEntryVal;
|
||||
taosArrayPush(pUids, &tuid);
|
||||
} else if (cmp == 1) {
|
||||
// next
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
valid = param->reverse ? tdbTbcMoveToPrev(pCursor->pCur) : tdbTbcMoveToNext(pCursor->pCur);
|
||||
if (valid < 0) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
END:
|
||||
if (pCursor->pMeta) metaULock(pCursor->pMeta);
|
||||
if (pCursor->pCur) tdbTbcClose(pCursor->pCur);
|
||||
taosMemoryFree(buf);
|
||||
taosMemoryFree(pKey);
|
||||
|
||||
taosMemoryFree(pCursor);
|
||||
|
||||
return ret;
|
||||
}
|
||||
int32_t metaFilterTtl(SMeta *pMeta, SMetaFltParam *param, SArray *pUids) {
|
||||
int32_t ret = 0;
|
||||
char *buf = NULL;
|
||||
|
||||
STtlIdxKey *pKey = NULL;
|
||||
int32_t nKey = 0;
|
||||
|
||||
SIdxCursor *pCursor = NULL;
|
||||
pCursor = (SIdxCursor *)taosMemoryCalloc(1, sizeof(SIdxCursor));
|
||||
pCursor->pMeta = pMeta;
|
||||
pCursor->suid = param->suid;
|
||||
pCursor->cid = param->cid;
|
||||
pCursor->type = param->type;
|
||||
|
||||
metaRLock(pMeta);
|
||||
ret = tdbTbcOpen(pMeta->pTtlIdx, &pCursor->pCur, NULL);
|
||||
|
||||
END:
|
||||
if (pCursor->pMeta) metaULock(pCursor->pMeta);
|
||||
if (pCursor->pCur) tdbTbcClose(pCursor->pCur);
|
||||
taosMemoryFree(buf);
|
||||
taosMemoryFree(pKey);
|
||||
|
||||
taosMemoryFree(pCursor);
|
||||
|
||||
return ret;
|
||||
// impl later
|
||||
return 0;
|
||||
}
|
||||
int32_t metaFilterTableIds(SMeta *pMeta, SMetaFltParam *param, SArray *pUids) {
|
||||
int32_t ret = 0;
|
||||
char *buf = NULL;
|
||||
|
@ -1053,7 +1190,7 @@ int32_t metaFilterTableIds(SMeta *pMeta, SMetaFltParam *param, SArray *pUids) {
|
|||
pCursor->type = param->type;
|
||||
|
||||
metaRLock(pMeta);
|
||||
ret = tdbTbcOpen(pMeta->pTagIdx, &pCursor->pCur, NULL);
|
||||
ret = tdbTbcOpen(pMeta->pCtimeIdx, &pCursor->pCur, NULL);
|
||||
if (ret < 0) {
|
||||
goto END;
|
||||
}
|
||||
|
@ -1064,6 +1201,7 @@ int32_t metaFilterTableIds(SMeta *pMeta, SMetaFltParam *param, SArray *pUids) {
|
|||
|
||||
if (param->val == NULL) {
|
||||
metaError("vgId:%d, failed to filter NULL data", TD_VID(pMeta->pVnode));
|
||||
ret = -1;
|
||||
goto END;
|
||||
} else {
|
||||
if (IS_VAR_DATA_TYPE(param->type)) {
|
||||
|
@ -1104,9 +1242,11 @@ int32_t metaFilterTableIds(SMeta *pMeta, SMetaFltParam *param, SArray *pUids) {
|
|||
|
||||
valid = tdbTbcGet(pCursor->pCur, (const void **)&entryKey, &nEntryKey, (const void **)&entryVal, &nEntryVal);
|
||||
if (valid < 0) {
|
||||
tdbFree(entryVal);
|
||||
break;
|
||||
}
|
||||
STagIdxKey *p = entryKey;
|
||||
if (p == NULL) break;
|
||||
if (p->type != pCursor->type) {
|
||||
if (first) {
|
||||
valid = param->reverse ? tdbTbcMoveToPrev(pCursor->pCur) : tdbTbcMoveToNext(pCursor->pCur);
|
||||
|
@ -1116,7 +1256,7 @@ int32_t metaFilterTableIds(SMeta *pMeta, SMetaFltParam *param, SArray *pUids) {
|
|||
break;
|
||||
}
|
||||
}
|
||||
if (p == NULL || p->suid != pKey->suid) {
|
||||
if (p->suid != pKey->suid) {
|
||||
break;
|
||||
}
|
||||
first = false;
|
||||
|
|
|
@ -27,6 +27,11 @@ static int metaUpdateSuidIdx(SMeta *pMeta, const SMetaEntry *pME);
|
|||
static int metaUpdateTagIdx(SMeta *pMeta, const SMetaEntry *pCtbEntry);
|
||||
static int metaDropTableByUid(SMeta *pMeta, tb_uid_t uid, int *type);
|
||||
static void metaDestroyTagIdxKey(STagIdxKey *pTagIdxKey);
|
||||
// opt ins_tables query
|
||||
static int metaUpdateCtimeIdx(SMeta *pMeta, const SMetaEntry *pME);
|
||||
static int metaDeleteCtimeIdx(SMeta *pMeta, const SMetaEntry *pME);
|
||||
static int metaUpdateNcolIdx(SMeta *pMeta, const SMetaEntry *pME);
|
||||
static int metaDeleteNcolIdx(SMeta *pMeta, const SMetaEntry *pME);
|
||||
|
||||
static void metaGetEntryInfo(const SMetaEntry *pEntry, SMetaInfo *pInfo) {
|
||||
pInfo->uid = pEntry->uid;
|
||||
|
@ -551,6 +556,26 @@ static void metaBuildTtlIdxKey(STtlIdxKey *ttlKey, const SMetaEntry *pME) {
|
|||
ttlKey->dtime = ctime / 1000 + ttlDays * tsTtlUnit;
|
||||
ttlKey->uid = pME->uid;
|
||||
}
|
||||
static int metaBuildCtimeIdxKey(SCtimeIdxKey *ctimeKey, const SMetaEntry *pME) {
|
||||
int64_t ctime;
|
||||
if (pME->type == TSDB_CHILD_TABLE) {
|
||||
ctime = pME->ctbEntry.ctime;
|
||||
} else if (pME->type == TSDB_NORMAL_TABLE) {
|
||||
ctime = pME->ntbEntry.ctime;
|
||||
} else {
|
||||
return -1;
|
||||
}
|
||||
|
||||
ctimeKey->ctime = ctime;
|
||||
ctimeKey->uid = pME->uid;
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int metaBuildNColIdxKey(SNcolIdxKey *ncolKey, const SMetaEntry *pME) {
|
||||
ncolKey->ncol = pME->ntbEntry.schemaRow.nCols;
|
||||
ncolKey->uid = pME->uid;
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int metaDeleteTtlIdx(SMeta *pMeta, const SMetaEntry *pME) {
|
||||
STtlIdxKey ttlKey = {0};
|
||||
|
@ -632,6 +657,9 @@ static int metaDropTableByUid(SMeta *pMeta, tb_uid_t uid, int *type) {
|
|||
tdbTbDelete(pMeta->pNameIdx, e.name, strlen(e.name) + 1, &pMeta->txn);
|
||||
tdbTbDelete(pMeta->pUidIdx, &uid, sizeof(uid), &pMeta->txn);
|
||||
|
||||
if (e.type == TSDB_CHILD_TABLE || e.type == TSDB_NORMAL_TABLE) metaDeleteCtimeIdx(pMeta, &e);
|
||||
if (e.type == TSDB_NORMAL_TABLE) metaDeleteNcolIdx(pMeta, &e);
|
||||
|
||||
if (e.type != TSDB_SUPER_TABLE) metaDeleteTtlIdx(pMeta, &e);
|
||||
|
||||
if (e.type == TSDB_CHILD_TABLE) {
|
||||
|
@ -658,6 +686,37 @@ static int metaDropTableByUid(SMeta *pMeta, tb_uid_t uid, int *type) {
|
|||
|
||||
return 0;
|
||||
}
|
||||
// opt ins_tables
|
||||
int metaUpdateCtimeIdx(SMeta *pMeta, const SMetaEntry *pME) {
|
||||
SCtimeIdxKey ctimeKey = {0};
|
||||
if (metaBuildCtimeIdxKey(&ctimeKey, pME) < 0) {
|
||||
return 0;
|
||||
}
|
||||
return tdbTbInsert(pMeta->pCtimeIdx, &ctimeKey, sizeof(ctimeKey), NULL, 0, &pMeta->txn);
|
||||
}
|
||||
|
||||
int metaDeleteCtimeIdx(SMeta *pMeta, const SMetaEntry *pME) {
|
||||
SCtimeIdxKey ctimeKey = {0};
|
||||
if (metaBuildCtimeIdxKey(&ctimeKey, pME) < 0) {
|
||||
return 0;
|
||||
}
|
||||
return tdbTbDelete(pMeta->pCtimeIdx, &ctimeKey, sizeof(ctimeKey), &pMeta->txn);
|
||||
}
|
||||
int metaUpdateNcolIdx(SMeta *pMeta, const SMetaEntry *pME) {
|
||||
SNcolIdxKey ncolKey = {0};
|
||||
if (metaBuildNColIdxKey(&ncolKey, pME) < 0) {
|
||||
return 0;
|
||||
}
|
||||
return tdbTbInsert(pMeta->pNcolIdx, &ncolKey, sizeof(ncolKey), NULL, 0, &pMeta->txn);
|
||||
}
|
||||
|
||||
int metaDeleteNcolIdx(SMeta *pMeta, const SMetaEntry *pME) {
|
||||
SNcolIdxKey ncolKey = {0};
|
||||
if (metaBuildNColIdxKey(&ncolKey, pME) < 0) {
|
||||
return 0;
|
||||
}
|
||||
return tdbTbDelete(pMeta->pNcolIdx, &ncolKey, sizeof(ncolKey), &pMeta->txn);
|
||||
}
|
||||
|
||||
static int metaAlterTableColumn(SMeta *pMeta, int64_t version, SVAlterTbReq *pAlterTbReq, STableMetaRsp *pMetaRsp) {
|
||||
void *pVal = NULL;
|
||||
|
@ -1278,12 +1337,14 @@ static int metaUpdateTagIdx(SMeta *pMeta, const SMetaEntry *pCtbEntry) {
|
|||
ret = metaSaveJsonVarToIdx(pMeta, pCtbEntry, pTagColumn);
|
||||
goto end;
|
||||
}
|
||||
if (metaCreateTagIdxKey(pCtbEntry->ctbEntry.suid, pTagColumn->colId, pTagData, nTagData, pTagColumn->type,
|
||||
pCtbEntry->uid, &pTagIdxKey, &nTagIdxKey) < 0) {
|
||||
ret = -1;
|
||||
goto end;
|
||||
if (pTagData != NULL) {
|
||||
if (metaCreateTagIdxKey(pCtbEntry->ctbEntry.suid, pTagColumn->colId, pTagData, nTagData, pTagColumn->type,
|
||||
pCtbEntry->uid, &pTagIdxKey, &nTagIdxKey) < 0) {
|
||||
ret = -1;
|
||||
goto end;
|
||||
}
|
||||
tdbTbUpsert(pMeta->pTagIdx, pTagIdxKey, nTagIdxKey, NULL, 0, &pMeta->txn);
|
||||
}
|
||||
tdbTbUpsert(pMeta->pTagIdx, pTagIdxKey, nTagIdxKey, NULL, 0, &pMeta->txn);
|
||||
end:
|
||||
metaDestroyTagIdxKey(pTagIdxKey);
|
||||
tDecoderClear(&dc);
|
||||
|
@ -1370,6 +1431,12 @@ int metaHandleEntry(SMeta *pMeta, const SMetaEntry *pME) {
|
|||
}
|
||||
}
|
||||
|
||||
if (metaUpdateCtimeIdx(pMeta, pME) < 0) goto _err;
|
||||
|
||||
if (pME->type == TSDB_NORMAL_TABLE) {
|
||||
if (metaUpdateNcolIdx(pMeta, pME) < 0) goto _err;
|
||||
}
|
||||
|
||||
if (pME->type != TSDB_SUPER_TABLE) {
|
||||
if (metaUpdateTtlIdx(pMeta, pME) < 0) goto _err;
|
||||
}
|
||||
|
|
|
@ -1492,4 +1492,11 @@ void tsdbCacheSetCapacity(SVnode *pVnode, size_t capacity) {
|
|||
|
||||
size_t tsdbCacheGetCapacity(SVnode *pVnode) { return taosLRUCacheGetCapacity(pVnode->pTsdb->lruCache); }
|
||||
|
||||
size_t tsdbCacheGetUsage(SVnode *pVnode) { return taosLRUCacheGetUsage(pVnode->pTsdb->lruCache); }
|
||||
size_t tsdbCacheGetUsage(SVnode *pVnode) {
|
||||
size_t usage = 0;
|
||||
if (pVnode->pTsdb != NULL) {
|
||||
usage = taosLRUCacheGetUsage(pVnode->pTsdb->lruCache);
|
||||
}
|
||||
|
||||
return usage;
|
||||
}
|
||||
|
|
|
@ -541,6 +541,12 @@ typedef struct {
|
|||
SSnapContext* sContext;
|
||||
} SStreamRawScanInfo;
|
||||
|
||||
typedef struct SSysTableIndex {
|
||||
int8_t init;
|
||||
SArray *uids;
|
||||
int32_t lastIdx;
|
||||
} SSysTableIndex;
|
||||
|
||||
typedef struct SSysTableScanInfo {
|
||||
SRetrieveMetaTableRsp* pRsp;
|
||||
SRetrieveTableReq req;
|
||||
|
@ -553,6 +559,7 @@ typedef struct SSysTableScanInfo {
|
|||
bool showRewrite;
|
||||
SNode* pCondition; // db_name filter condition, to discard data that are not in current database
|
||||
SMTbCursor* pCur; // cursor for iterate the local table meta store.
|
||||
SSysTableIndex* pIdx; // idx for local table meta
|
||||
SArray* scanCols; // SArray<int16_t> scan column id list
|
||||
SName name;
|
||||
SSDataBlock* pRes;
|
||||
|
|
|
@ -422,16 +422,6 @@ static SColumnInfoData* getColInfoResult(void* metaHandle, int64_t suid, SArray*
|
|||
goto end;
|
||||
}
|
||||
}
|
||||
/*else {
|
||||
code = metaGetTableTagsByUids(metaHandle, suid, uidList, tags);
|
||||
if (code != 0) {
|
||||
terrno = code;
|
||||
qError("failed to get table from meta idx, reason: %s, suid:%" PRId64, tstrerror(code), suid);
|
||||
goto end;
|
||||
} else {
|
||||
qInfo("succ to get table from meta idx, suid:%" PRId64, suid);
|
||||
}
|
||||
}*/
|
||||
|
||||
int32_t rows = taosArrayGetSize(uidList);
|
||||
if (rows == 0) {
|
||||
|
@ -1212,11 +1202,10 @@ void createExprFromOneNode(SExprInfo* pExp, SNode* pNode, int16_t slotId) {
|
|||
#if 1
|
||||
// todo refactor: add the parameter for tbname function
|
||||
const char* name = "tbname";
|
||||
int32_t len = strlen(name);
|
||||
int32_t len = strlen(name);
|
||||
|
||||
if (!pFuncNode->pParameterList && (memcmp(pExprNode->_function.functionName, name, len) == 0) &&
|
||||
pExprNode->_function.functionName[len] == 0) {
|
||||
|
||||
pFuncNode->pParameterList = nodesMakeList();
|
||||
ASSERT(LIST_LENGTH(pFuncNode->pParameterList) == 0);
|
||||
SValueNode* res = (SValueNode*)nodesMakeNode(QUERY_NODE_VALUE);
|
||||
|
@ -1261,13 +1250,13 @@ void createExprFromOneNode(SExprInfo* pExp, SNode* pNode, int16_t slotId) {
|
|||
} else if (type == QUERY_NODE_CASE_WHEN) {
|
||||
pExp->pExpr->nodeType = QUERY_NODE_OPERATOR;
|
||||
SCaseWhenNode* pCaseNode = (SCaseWhenNode*)pNode;
|
||||
|
||||
|
||||
pExp->base.pParam = taosMemoryCalloc(1, sizeof(SFunctParam));
|
||||
pExp->base.numOfParams = 1;
|
||||
|
||||
|
||||
SDataType* pType = &pCaseNode->node.resType;
|
||||
pExp->base.resSchema = createResSchema(pType->type, pType->bytes, slotId, pType->scale,
|
||||
pType->precision, pCaseNode->node.aliasName);
|
||||
pExp->base.resSchema =
|
||||
createResSchema(pType->type, pType->bytes, slotId, pType->scale, pType->precision, pCaseNode->node.aliasName);
|
||||
pExp->pExpr->_optrRoot.pRootNode = pNode;
|
||||
} else {
|
||||
ASSERT(0);
|
||||
|
|
File diff suppressed because it is too large
Load Diff
|
@ -290,21 +290,21 @@ bool fstVerify(Fst* fst);
|
|||
// refactor this function
|
||||
bool fstBuilderNodeCompileTo(FstBuilderNode* b, IdxFstFile* wrt, CompiledAddr lastAddr, CompiledAddr startAddr);
|
||||
|
||||
typedef struct StreamState {
|
||||
typedef struct FstStreamState {
|
||||
FstNode* node;
|
||||
uint64_t trans;
|
||||
FstOutput out;
|
||||
void* autState;
|
||||
} StreamState;
|
||||
} FstStreamState;
|
||||
|
||||
void streamStateDestroy(void* s);
|
||||
void fstStreamStateDestroy(void* s);
|
||||
|
||||
typedef struct FStmSt {
|
||||
Fst* fst;
|
||||
FAutoCtx* aut;
|
||||
SArray* inp;
|
||||
FstOutput emptyOutput;
|
||||
SArray* stack; // <StreamState>
|
||||
SArray* stack; // <FstStreamState>
|
||||
FstBoundWithData* endAt;
|
||||
} FStmSt;
|
||||
|
||||
|
@ -317,14 +317,14 @@ typedef struct FStmStRslt {
|
|||
FStmStRslt* swsResultCreate(FstSlice* data, FstOutput fOut, void* state);
|
||||
void swsResultDestroy(FStmStRslt* result);
|
||||
|
||||
typedef void* (*StreamCallback)(void*);
|
||||
typedef void* (*streamCallback__fn)(void*);
|
||||
FStmSt* stmStCreate(Fst* fst, FAutoCtx* automation, FstBoundWithData* min, FstBoundWithData* max);
|
||||
|
||||
void stmStDestroy(FStmSt* sws);
|
||||
|
||||
bool stmStSeekMin(FStmSt* sws, FstBoundWithData* min);
|
||||
|
||||
FStmStRslt* stmStNextWith(FStmSt* sws, StreamCallback callback);
|
||||
FStmStRslt* stmStNextWith(FStmSt* sws, streamCallback__fn callback);
|
||||
|
||||
FStmBuilder* stmBuilderCreate(Fst* fst, FAutoCtx* aut);
|
||||
|
||||
|
|
|
@ -1165,7 +1165,7 @@ FStmSt* stmStCreate(Fst* fst, FAutoCtx* automation, FstBoundWithData* min, FstBo
|
|||
sws->emptyOutput.null = true;
|
||||
sws->emptyOutput.out = 0;
|
||||
|
||||
sws->stack = (SArray*)taosArrayInit(256, sizeof(StreamState));
|
||||
sws->stack = (SArray*)taosArrayInit(256, sizeof(FstStreamState));
|
||||
sws->endAt = max;
|
||||
stmStSeekMin(sws, min);
|
||||
|
||||
|
@ -1177,7 +1177,7 @@ void stmStDestroy(FStmSt* sws) {
|
|||
}
|
||||
|
||||
taosArrayDestroy(sws->inp);
|
||||
taosArrayDestroyEx(sws->stack, streamStateDestroy);
|
||||
taosArrayDestroyEx(sws->stack, fstStreamStateDestroy);
|
||||
|
||||
taosMemoryFree(sws);
|
||||
}
|
||||
|
@ -1188,10 +1188,10 @@ bool stmStSeekMin(FStmSt* sws, FstBoundWithData* min) {
|
|||
if (fstBoundWithDataIsIncluded(min)) {
|
||||
sws->emptyOutput.out = fstEmptyFinalOutput(sws->fst, &(sws->emptyOutput.null));
|
||||
}
|
||||
StreamState s = {.node = fstGetRoot(sws->fst),
|
||||
.trans = 0,
|
||||
.out = {.null = false, .out = 0},
|
||||
.autState = automFuncs[aut->type].start(aut)}; // auto.start callback
|
||||
FstStreamState s = {.node = fstGetRoot(sws->fst),
|
||||
.trans = 0,
|
||||
.out = {.null = false, .out = 0},
|
||||
.autState = automFuncs[aut->type].start(aut)}; // auto.start callback
|
||||
taosArrayPush(sws->stack, &s);
|
||||
return true;
|
||||
}
|
||||
|
@ -1223,7 +1223,7 @@ bool stmStSeekMin(FStmSt* sws, FstBoundWithData* min) {
|
|||
autState = automFuncs[aut->type].accept(aut, preState, b);
|
||||
taosArrayPush(sws->inp, &b);
|
||||
|
||||
StreamState s = {.node = node, .trans = res + 1, .out = {.null = false, .out = out}, .autState = preState};
|
||||
FstStreamState s = {.node = node, .trans = res + 1, .out = {.null = false, .out = out}, .autState = preState};
|
||||
node = NULL;
|
||||
|
||||
taosArrayPush(sws->stack, &s);
|
||||
|
@ -1244,7 +1244,7 @@ bool stmStSeekMin(FStmSt* sws, FstBoundWithData* min) {
|
|||
}
|
||||
}
|
||||
|
||||
StreamState s = {.node = node, .trans = i, .out = {.null = false, .out = out}, .autState = autState};
|
||||
FstStreamState s = {.node = node, .trans = i, .out = {.null = false, .out = out}, .autState = autState};
|
||||
taosArrayPush(sws->stack, &s);
|
||||
taosMemoryFree(trans);
|
||||
return true;
|
||||
|
@ -1255,7 +1255,7 @@ bool stmStSeekMin(FStmSt* sws, FstBoundWithData* min) {
|
|||
|
||||
uint32_t sz = taosArrayGetSize(sws->stack);
|
||||
if (sz != 0) {
|
||||
StreamState* s = taosArrayGet(sws->stack, sz - 1);
|
||||
FstStreamState* s = taosArrayGet(sws->stack, sz - 1);
|
||||
if (inclusize) {
|
||||
s->trans -= 1;
|
||||
taosArrayPop(sws->inp);
|
||||
|
@ -1264,7 +1264,7 @@ bool stmStSeekMin(FStmSt* sws, FstBoundWithData* min) {
|
|||
uint64_t trans = s->trans;
|
||||
FstTransition trn;
|
||||
fstNodeGetTransitionAt(n, trans - 1, &trn);
|
||||
StreamState s = {
|
||||
FstStreamState s = {
|
||||
.node = fstGetNode(sws->fst, trn.addr), .trans = 0, .out = {.null = false, .out = out}, .autState = autState};
|
||||
taosArrayPush(sws->stack, &s);
|
||||
return true;
|
||||
|
@ -1274,14 +1274,14 @@ bool stmStSeekMin(FStmSt* sws, FstBoundWithData* min) {
|
|||
|
||||
return false;
|
||||
}
|
||||
FStmStRslt* stmStNextWith(FStmSt* sws, StreamCallback callback) {
|
||||
FStmStRslt* stmStNextWith(FStmSt* sws, streamCallback__fn callback) {
|
||||
FAutoCtx* aut = sws->aut;
|
||||
FstOutput output = sws->emptyOutput;
|
||||
if (output.null == false) {
|
||||
FstSlice emptySlice = fstSliceCreate(NULL, 0);
|
||||
if (fstBoundWithDataExceededBy(sws->endAt, &emptySlice)) {
|
||||
taosArrayDestroyEx(sws->stack, streamStateDestroy);
|
||||
sws->stack = (SArray*)taosArrayInit(256, sizeof(StreamState));
|
||||
taosArrayDestroyEx(sws->stack, fstStreamStateDestroy);
|
||||
sws->stack = (SArray*)taosArrayInit(256, sizeof(FstStreamState));
|
||||
return NULL;
|
||||
}
|
||||
void* start = automFuncs[aut->type].start(aut);
|
||||
|
@ -1292,12 +1292,12 @@ FStmStRslt* stmStNextWith(FStmSt* sws, StreamCallback callback) {
|
|||
}
|
||||
SArray* nodes = taosArrayInit(8, sizeof(FstNode*));
|
||||
while (taosArrayGetSize(sws->stack) > 0) {
|
||||
StreamState* p = (StreamState*)taosArrayPop(sws->stack);
|
||||
FstStreamState* p = (FstStreamState*)taosArrayPop(sws->stack);
|
||||
if (p->trans >= FST_NODE_LEN(p->node) || !automFuncs[aut->type].canMatch(aut, p->autState)) {
|
||||
if (FST_NODE_ADDR(p->node) != fstGetRootAddr(sws->fst)) {
|
||||
taosArrayPop(sws->inp);
|
||||
}
|
||||
streamStateDestroy(p);
|
||||
fstStreamStateDestroy(p);
|
||||
continue;
|
||||
}
|
||||
FstTransition trn;
|
||||
|
@ -1318,10 +1318,10 @@ FStmStRslt* stmStNextWith(FStmSt* sws, StreamCallback callback) {
|
|||
isMatch = automFuncs[aut->type].isMatch(aut, eofState);
|
||||
}
|
||||
}
|
||||
StreamState s1 = {.node = p->node, .trans = p->trans + 1, .out = p->out, .autState = p->autState};
|
||||
FstStreamState s1 = {.node = p->node, .trans = p->trans + 1, .out = p->out, .autState = p->autState};
|
||||
taosArrayPush(sws->stack, &s1);
|
||||
|
||||
StreamState s2 = {.node = nextNode, .trans = 0, .out = {.null = false, .out = out}, .autState = nextState};
|
||||
FstStreamState s2 = {.node = nextNode, .trans = 0, .out = {.null = false, .out = out}, .autState = nextState};
|
||||
taosArrayPush(sws->stack, &s2);
|
||||
|
||||
int32_t isz = taosArrayGetSize(sws->inp);
|
||||
|
@ -1331,8 +1331,8 @@ FStmStRslt* stmStNextWith(FStmSt* sws, StreamCallback callback) {
|
|||
}
|
||||
FstSlice slice = fstSliceCreate(buf, isz);
|
||||
if (fstBoundWithDataExceededBy(sws->endAt, &slice)) {
|
||||
taosArrayDestroyEx(sws->stack, streamStateDestroy);
|
||||
sws->stack = (SArray*)taosArrayInit(256, sizeof(StreamState));
|
||||
taosArrayDestroyEx(sws->stack, fstStreamStateDestroy);
|
||||
sws->stack = (SArray*)taosArrayInit(256, sizeof(FstStreamState));
|
||||
taosMemoryFreeClear(buf);
|
||||
fstSliceDestroy(&slice);
|
||||
taosArrayDestroy(nodes);
|
||||
|
@ -1375,11 +1375,11 @@ void swsResultDestroy(FStmStRslt* result) {
|
|||
taosMemoryFree(result);
|
||||
}
|
||||
|
||||
void streamStateDestroy(void* s) {
|
||||
void fstStreamStateDestroy(void* s) {
|
||||
if (NULL == s) {
|
||||
return;
|
||||
}
|
||||
StreamState* ss = (StreamState*)s;
|
||||
FstStreamState* ss = (FstStreamState*)s;
|
||||
fstNodeDestroy(ss->node);
|
||||
}
|
||||
|
||||
|
|
|
@ -30,7 +30,7 @@ int indexJsonPut(SIndexJson *index, SIndexJsonMultiTerm *terms, uint64_t uid) {
|
|||
} else {
|
||||
p->colType = TSDB_DATA_TYPE_DOUBLE;
|
||||
}
|
||||
IDX_TYPE_ADD_EXTERN_TYPE(p->colType, TSDB_DATA_TYPE_JSON);
|
||||
IDX_TYPE_ADD_EXTERN_TYPE((p->colType), TSDB_DATA_TYPE_JSON);
|
||||
}
|
||||
// handle put
|
||||
return indexPut(index, terms, uid);
|
||||
|
|
|
@ -69,6 +69,7 @@ void* rpcOpen(const SRpcInit* pInit) {
|
|||
pRpc->idleTime = pInit->idleTime;
|
||||
pRpc->tcphandle =
|
||||
(*taosInitHandle[pRpc->connType])(ip, pInit->localPort, pRpc->label, pRpc->numOfThreads, NULL, pRpc);
|
||||
|
||||
if (pRpc->tcphandle == NULL) {
|
||||
taosMemoryFree(pRpc);
|
||||
return NULL;
|
||||
|
|
|
@ -439,12 +439,14 @@ void cliHandleExceptImpl(SCliConn* pConn, int32_t code) {
|
|||
tDebug("%s conn %p construct ahandle %p by %s", CONN_GET_INST_LABEL(pConn), pConn, transMsg.info.ahandle,
|
||||
TMSG_INFO(transMsg.msgType));
|
||||
if (transMsg.info.ahandle == NULL) {
|
||||
transMsg.info.ahandle = transCtxDumpBrokenlinkVal(&pConn->ctx, (int32_t*)&(transMsg.msgType));
|
||||
int32_t msgType = 0;
|
||||
transMsg.info.ahandle = transCtxDumpBrokenlinkVal(&pConn->ctx, &msgType);
|
||||
transMsg.msgType = msgType;
|
||||
tDebug("%s conn %p construct ahandle %p due to brokenlink", CONN_GET_INST_LABEL(pConn), pConn,
|
||||
transMsg.info.ahandle);
|
||||
}
|
||||
} else {
|
||||
transMsg.info.ahandle = (pMsg->type != Release && pCtx) ? pCtx->ahandle : NULL;
|
||||
transMsg.info.ahandle = (pMsg != NULL && pMsg->type != Release && pCtx) ? pCtx->ahandle : NULL;
|
||||
}
|
||||
|
||||
if (pCtx == NULL || pCtx->pSem == NULL) {
|
||||
|
@ -1078,9 +1080,6 @@ static void cliPrepareCb(uv_prepare_t* handle) {
|
|||
QUEUE_REMOVE(h);
|
||||
|
||||
SCliMsg* pMsg = QUEUE_DATA(h, SCliMsg, q);
|
||||
if (pMsg == NULL) {
|
||||
continue;
|
||||
}
|
||||
(*cliAsyncHandle[pMsg->type])(pMsg, thrd);
|
||||
count++;
|
||||
}
|
||||
|
|
|
@ -73,6 +73,7 @@ typedef struct TdEpoll {
|
|||
EpollFd fd;
|
||||
} * TdEpollPtr, TdEpoll;
|
||||
|
||||
#if 0
|
||||
int32_t taosSendto(TdSocketPtr pSocket, void *buf, int len, unsigned int flags, const struct sockaddr *dest_addr,
|
||||
int addrlen) {
|
||||
if (pSocket == NULL || pSocket->fd < 0) {
|
||||
|
@ -84,6 +85,7 @@ int32_t taosSendto(TdSocketPtr pSocket, void *buf, int len, unsigned int flags,
|
|||
return sendto(pSocket->fd, buf, len, flags, dest_addr, addrlen);
|
||||
#endif
|
||||
}
|
||||
|
||||
int32_t taosWriteSocket(TdSocketPtr pSocket, void *buf, int len) {
|
||||
if (pSocket == NULL || pSocket->fd < 0) {
|
||||
return -1;
|
||||
|
@ -114,6 +116,8 @@ int32_t taosReadFromSocket(TdSocketPtr pSocket, void *buf, int32_t len, int32_t
|
|||
}
|
||||
return recvfrom(pSocket->fd, buf, len, flags, destAddr, addrLen);
|
||||
}
|
||||
#endif // endif 0
|
||||
|
||||
int32_t taosCloseSocketNoCheck1(SocketFd fd) {
|
||||
#ifdef WINDOWS
|
||||
return closesocket(fd);
|
||||
|
@ -121,6 +125,7 @@ int32_t taosCloseSocketNoCheck1(SocketFd fd) {
|
|||
return close(fd);
|
||||
#endif
|
||||
}
|
||||
|
||||
int32_t taosCloseSocket(TdSocketPtr *ppSocket) {
|
||||
int32_t code;
|
||||
if (ppSocket == NULL || *ppSocket == NULL || (*ppSocket)->fd < 0) {
|
||||
|
@ -131,6 +136,8 @@ int32_t taosCloseSocket(TdSocketPtr *ppSocket) {
|
|||
taosMemoryFree(*ppSocket);
|
||||
return code;
|
||||
}
|
||||
|
||||
#if 0
|
||||
int32_t taosCloseSocketServer(TdSocketServerPtr *ppSocketServer) {
|
||||
int32_t code;
|
||||
if (ppSocketServer == NULL || *ppSocketServer == NULL || (*ppSocketServer)->fd < 0) {
|
||||
|
@ -216,20 +223,6 @@ int32_t taosShutDownSocketServerRDWR(TdSocketServerPtr pSocketServer) {
|
|||
#endif
|
||||
}
|
||||
|
||||
void taosWinSocketInit() {
|
||||
#ifdef WINDOWS
|
||||
static char flag = 0;
|
||||
if (flag == 0) {
|
||||
WORD wVersionRequested;
|
||||
WSADATA wsaData;
|
||||
wVersionRequested = MAKEWORD(1, 1);
|
||||
if (WSAStartup(wVersionRequested, &wsaData) == 0) {
|
||||
flag = 1;
|
||||
}
|
||||
}
|
||||
#else
|
||||
#endif
|
||||
}
|
||||
int32_t taosSetNonblocking(TdSocketPtr pSocket, int32_t on) {
|
||||
if (pSocket == NULL || pSocket->fd < 0) {
|
||||
return -1;
|
||||
|
@ -262,6 +255,8 @@ int32_t taosSetNonblocking(TdSocketPtr pSocket, int32_t on) {
|
|||
#endif
|
||||
return 0;
|
||||
}
|
||||
#endif // endif 0
|
||||
|
||||
int32_t taosSetSockOpt(TdSocketPtr pSocket, int32_t level, int32_t optname, void *optval, int32_t optlen) {
|
||||
if (pSocket == NULL || pSocket->fd < 0) {
|
||||
return -1;
|
||||
|
@ -296,6 +291,8 @@ int32_t taosSetSockOpt(TdSocketPtr pSocket, int32_t level, int32_t optname, void
|
|||
return setsockopt(pSocket->fd, level, optname, optval, (int)optlen);
|
||||
#endif
|
||||
}
|
||||
|
||||
#if 0
|
||||
int32_t taosGetSockOpt(TdSocketPtr pSocket, int32_t level, int32_t optname, void *optval, int32_t *optlen) {
|
||||
if (pSocket == NULL || pSocket->fd < 0) {
|
||||
return -1;
|
||||
|
@ -307,6 +304,9 @@ int32_t taosGetSockOpt(TdSocketPtr pSocket, int32_t level, int32_t optname, void
|
|||
return getsockopt(pSocket->fd, level, optname, optval, (int *)optlen);
|
||||
#endif
|
||||
}
|
||||
|
||||
#endif
|
||||
|
||||
uint32_t taosInetAddr(const char *ipAddr) {
|
||||
#ifdef WINDOWS
|
||||
uint32_t value;
|
||||
|
@ -330,6 +330,7 @@ const char *taosInetNtoa(struct in_addr ipInt, char *dstStr, int32_t len) {
|
|||
|
||||
#define TCP_CONN_TIMEOUT 3000 // conn timeout
|
||||
|
||||
#if 0
|
||||
int32_t taosWriteMsg(TdSocketPtr pSocket, void *buf, int32_t nbytes) {
|
||||
if (pSocket == NULL || pSocket->fd < 0) {
|
||||
return -1;
|
||||
|
@ -726,6 +727,7 @@ int taosValidIp(uint32_t ip) {
|
|||
#endif
|
||||
return 0;
|
||||
}
|
||||
#endif // endif 0
|
||||
|
||||
bool taosValidIpAndPort(uint32_t ip, uint16_t port) {
|
||||
struct sockaddr_in serverAdd;
|
||||
|
@ -774,6 +776,8 @@ bool taosValidIpAndPort(uint32_t ip, uint16_t port) {
|
|||
return true;
|
||||
// return 0 == taosValidIp(ip) ? true : false;
|
||||
}
|
||||
|
||||
#if 0
|
||||
TdSocketServerPtr taosOpenTcpServerSocket(uint32_t ip, uint16_t port) {
|
||||
struct sockaddr_in serverAdd;
|
||||
SocketFd fd;
|
||||
|
@ -888,6 +892,36 @@ int64_t taosCopyFds(TdSocketPtr pSrcSocket, TdSocketPtr pDestSocket, int64_t len
|
|||
return len;
|
||||
}
|
||||
|
||||
// Function converting an IP address string to an uint32_t.
|
||||
uint32_t ip2uint(const char *const ip_addr) {
|
||||
char ip_addr_cpy[20];
|
||||
char ip[5];
|
||||
|
||||
tstrncpy(ip_addr_cpy, ip_addr, sizeof(ip_addr_cpy));
|
||||
|
||||
char *s_start, *s_end;
|
||||
s_start = ip_addr_cpy;
|
||||
s_end = ip_addr_cpy;
|
||||
|
||||
int32_t k;
|
||||
|
||||
for (k = 0; *s_start != '\0'; s_start = s_end) {
|
||||
for (s_end = s_start; *s_end != '.' && *s_end != '\0'; s_end++) {
|
||||
}
|
||||
if (*s_end == '.') {
|
||||
*s_end = '\0';
|
||||
s_end++;
|
||||
}
|
||||
ip[k++] = (char)atoi(s_start);
|
||||
}
|
||||
|
||||
ip[k] = '\0';
|
||||
|
||||
return *((uint32_t *)ip);
|
||||
}
|
||||
|
||||
#endif // endif 0
|
||||
|
||||
void taosBlockSIGPIPE() {
|
||||
#ifdef WINDOWS
|
||||
// assert(0);
|
||||
|
@ -991,34 +1025,6 @@ int32_t taosGetFqdn(char *fqdn) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
// Function converting an IP address string to an uint32_t.
|
||||
uint32_t ip2uint(const char *const ip_addr) {
|
||||
char ip_addr_cpy[20];
|
||||
char ip[5];
|
||||
|
||||
tstrncpy(ip_addr_cpy, ip_addr, sizeof(ip_addr_cpy));
|
||||
|
||||
char *s_start, *s_end;
|
||||
s_start = ip_addr_cpy;
|
||||
s_end = ip_addr_cpy;
|
||||
|
||||
int32_t k;
|
||||
|
||||
for (k = 0; *s_start != '\0'; s_start = s_end) {
|
||||
for (s_end = s_start; *s_end != '.' && *s_end != '\0'; s_end++) {
|
||||
}
|
||||
if (*s_end == '.') {
|
||||
*s_end = '\0';
|
||||
s_end++;
|
||||
}
|
||||
ip[k++] = (char)atoi(s_start);
|
||||
}
|
||||
|
||||
ip[k] = '\0';
|
||||
|
||||
return *((uint32_t *)ip);
|
||||
}
|
||||
|
||||
void tinet_ntoa(char *ipstr, uint32_t ip) {
|
||||
sprintf(ipstr, "%d.%d.%d.%d", ip & 0xFF, (ip >> 8) & 0xFF, (ip >> 16) & 0xFF, ip >> 24);
|
||||
}
|
||||
|
@ -1039,12 +1045,14 @@ void taosSetMaskSIGPIPE() {
|
|||
#endif
|
||||
}
|
||||
|
||||
#if 0
|
||||
int32_t taosGetSocketName(TdSocketPtr pSocket, struct sockaddr *destAddr, int *addrLen) {
|
||||
if (pSocket == NULL || pSocket->fd < 0) {
|
||||
return -1;
|
||||
}
|
||||
return getsockname(pSocket->fd, destAddr, addrLen);
|
||||
}
|
||||
#endif // endif 0
|
||||
|
||||
/*
|
||||
* Set TCP connection timeout per-socket level.
|
||||
|
@ -1080,3 +1088,18 @@ int32_t taosCreateSocketWithTimeout(uint32_t timeout) {
|
|||
|
||||
return (int)fd;
|
||||
}
|
||||
|
||||
void taosWinSocketInit() {
|
||||
#ifdef WINDOWS
|
||||
static char flag = 0;
|
||||
if (flag == 0) {
|
||||
WORD wVersionRequested;
|
||||
WSADATA wsaData;
|
||||
wVersionRequested = MAKEWORD(1, 1);
|
||||
if (WSAStartup(wVersionRequested, &wsaData) == 0) {
|
||||
flag = 1;
|
||||
}
|
||||
}
|
||||
#else
|
||||
#endif
|
||||
}
|
||||
|
|
|
@ -87,17 +87,17 @@ if $rows != 2 then
|
|||
return -1
|
||||
endi
|
||||
|
||||
print $tbPrefix
|
||||
$tb = $tbPrefix . 0
|
||||
if $data00 != wh_tb1 then
|
||||
print expect wh_tb1, actual:$data00
|
||||
return -1
|
||||
endi
|
||||
$tb = $tbPrefix . 1
|
||||
if $data10 != wh_tb0 then
|
||||
print expect wh_tb0, actual:$data00
|
||||
return -1
|
||||
endi
|
||||
#print $tbPrefix
|
||||
#$tb = $tbPrefix . 0
|
||||
#if $data00 != wh_tb1 then
|
||||
# print expect wh_tb1, actual:$data00
|
||||
# return -1
|
||||
#endi
|
||||
#$tb = $tbPrefix . 1
|
||||
#if $data10 != wh_tb0 then
|
||||
# print expect wh_tb0, actual:$data00
|
||||
# return -1
|
||||
#endi
|
||||
|
||||
## select specified columns
|
||||
|
||||
|
|
Loading…
Reference in New Issue