diff --git a/include/util/tcompression.h b/include/util/tcompression.h index 04205b74d3..c45c500536 100644 --- a/include/util/tcompression.h +++ b/include/util/tcompression.h @@ -158,12 +158,12 @@ int32_t tsDecompressFloat2(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int int32_t nBuf); int32_t tsCompressDouble2(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint32_t cmprAlg, void *pBuf, int32_t nBuf); -int32_t tsDecompressDouble2(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint32_t cmprAlg, void *pBuf, - int32_t nBuf); +int32_t tsDecompressDouble2(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint32_t cmprAlg, + void *pBuf, int32_t nBuf); int32_t tsCompressString2(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint32_t cmprAlg, void *pBuf, int32_t nBuf); -int32_t tsDecompressString2(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint32_t cmprAlg, void *pBuf, - int32_t nBuf); +int32_t tsDecompressString2(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint32_t cmprAlg, + void *pBuf, int32_t nBuf); int32_t tsCompressBool2(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint32_t cmprAlg, void *pBuf, int32_t nBuf); int32_t tsDecompressBool2(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint32_t cmprAlg, void *pBuf, @@ -172,8 +172,8 @@ int32_t tsCompressTinyint2(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int int32_t nBuf); int32_t tsDecompressTinyint2(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint32_t cmprAlg, void *pBuf, int32_t nBuf); -int32_t tsCompressSmallint2(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint32_t cmprAlg, void *pBuf, - int32_t nBuf); +int32_t tsCompressSmallint2(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint32_t cmprAlg, + void *pBuf, int32_t nBuf); int32_t tsDecompressSmallint2(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint32_t cmprAlg, void *pBuf, int32_t nBuf); int32_t tsCompressInt2(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint32_t cmprAlg, void *pBuf, @@ -182,8 +182,8 @@ int32_t tsDecompressInt2(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32 int32_t nBuf); int32_t tsCompressBigint2(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint32_t cmprAlg, void *pBuf, int32_t nBuf); -int32_t tsDecompressBigint2(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint32_t cmprAlg, void *pBuf, - int32_t nBuf); +int32_t tsDecompressBigint2(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint32_t cmprAlg, + void *pBuf, int32_t nBuf); // for internal usage int32_t getWordLength(char type); @@ -259,6 +259,12 @@ typedef enum L2Compress { } EL2ComressFuncType; int32_t tcompressDebug(uint32_t cmprAlg, uint8_t *l1Alg, uint8_t *l2Alg, uint8_t *level); + +#define DEFINE_VAR(cmprAlg) \ + uint8_t l1 = COMPRESS_L1_TYPE_U32(cmprAlg); \ + uint8_t l2 = COMPRESS_L2_TYPE_U32(cmprAlg); \ + uint8_t lvl = COMPRESS_L2_TYPE_LEVEL_U32(cmprAlg); + #ifdef __cplusplus } #endif diff --git a/source/dnode/vnode/src/tsdb/tsdbDef.h b/source/dnode/vnode/src/tsdb/tsdbDef.h index 33fd4990d3..fb25b81af2 100644 --- a/source/dnode/vnode/src/tsdb/tsdbDef.h +++ b/source/dnode/vnode/src/tsdb/tsdbDef.h @@ -41,7 +41,7 @@ extern int32_t tsdbFsyncFile(STsdbFD *pFD); typedef struct SColCompressInfo SColCompressInfo; struct SColCompressInfo { SHashObj *pColCmpr; - int8_t defaultCmprAlg; + uint32_t defaultCmprAlg; }; typedef struct SColCompressInfo2 SColCompressInfo2; struct SColCompressInfo2 { diff --git a/source/dnode/vnode/src/tsdb/tsdbUtil.c b/source/dnode/vnode/src/tsdb/tsdbUtil.c index 7f1995129e..88cc22dc5c 100644 --- a/source/dnode/vnode/src/tsdb/tsdbUtil.c +++ b/source/dnode/vnode/src/tsdb/tsdbUtil.c @@ -13,6 +13,7 @@ * along with this program. If not, see . */ +#include "tcompression.h" #include "tdataformat.h" #include "tsdb.h" #include "tsdbDef.h" @@ -1412,6 +1413,7 @@ int32_t tBlockDataCompress(SBlockData *bData, void *pCompr, SBuffer *buffers, SB int32_t lino = 0; SColCompressInfo *pInfo = pCompr; + code = tsdbGetColCmprAlgFromSet(pInfo->pColCmpr, 1, &pInfo->defaultCmprAlg); SDiskDataHdr hdr = { .delimiter = TSDB_FILE_DLMT, @@ -1530,7 +1532,13 @@ int32_t tPutDiskDataHdr(SBuffer *buffer, const SDiskDataHdr *pHdr) { if ((code = tBufferPutI32v(buffer, pHdr->szKey))) return code; if ((code = tBufferPutI32v(buffer, pHdr->szBlkCol))) return code; if ((code = tBufferPutI32v(buffer, pHdr->nRow))) return code; - if ((code = tBufferPutI8(buffer, pHdr->cmprAlg))) return code; + if (pHdr->fmtVer < 1) { + if ((code = tBufferPutI8(buffer, pHdr->cmprAlg))) return code; + } else if (pHdr->fmtVer == 1) { + if ((code = tBufferPutU32(buffer, pHdr->cmprAlg))) return code; + } else { + // more data fmt ver + } if (pHdr->fmtVer == 1) { if ((code = tBufferPutI8(buffer, pHdr->numOfPKs))) return code; for (int i = 0; i < pHdr->numOfPKs; i++) { @@ -1553,7 +1561,15 @@ int32_t tGetDiskDataHdr(SBufferReader *br, SDiskDataHdr *pHdr) { if ((code = tBufferGetI32v(br, &pHdr->szKey))) return code; if ((code = tBufferGetI32v(br, &pHdr->szBlkCol))) return code; if ((code = tBufferGetI32v(br, &pHdr->nRow))) return code; - if ((code = tBufferGetI8(br, &pHdr->cmprAlg))) return code; + if (pHdr->fmtVer < 1) { + int8_t cmprAlg = 0; + if ((code = tBufferGetI8(br, &cmprAlg))) return code; + pHdr->cmprAlg = cmprAlg; + } else if (pHdr->fmtVer == 1) { + if ((code = tBufferGetU32(br, &pHdr->cmprAlg))) return code; + } else { + // more data fmt ver + } if (pHdr->fmtVer == 1) { if ((code = tBufferGetI8(br, &pHdr->numOfPKs))) return code; for (int i = 0; i < pHdr->numOfPKs; i++) { @@ -1625,7 +1641,6 @@ static int32_t tBlockDataCompressKeyPart(SBlockData *bData, SDiskDataHdr *hdr, S .dataType = TSDB_DATA_TYPE_TIMESTAMP, .originalSize = sizeof(TSKEY) * bData->nRow, }; - // tsdbGetColCmprAlgFromSet(compressInfo->pColCmpr, 1, &cinfo.cmprAlg); code = tCompressDataToBuffer((uint8_t *)bData->aTSKEY, &cinfo, buffer, assist); TSDB_CHECK_CODE(code, lino, _exit); @@ -1788,5 +1803,10 @@ int32_t tsdbGetColCmprAlgFromSet(SHashObj *set, int16_t colId, uint32_t *alg) { if (ret == NULL) return -1; *alg = *ret; + return 0; +} +uint32_t tsdbCvtTimestampAlg(uint32_t alg) { + DEFINE_VAR(alg) + return 0; } \ No newline at end of file diff --git a/source/libs/parser/src/parUtil.c b/source/libs/parser/src/parUtil.c index 4f004157f2..bcd7eea572 100644 --- a/source/libs/parser/src/parUtil.c +++ b/source/libs/parser/src/parUtil.c @@ -298,16 +298,11 @@ STableMeta* tableMetaDup(const STableMeta* pTableMeta) { memcpy(p, pTableMeta, cpSize); if (hasSchemaExt) { p->schemaExt = (SSchemaExt*)(((char*)p) + size); + memcpy(p->schemaExt, pTableMeta->schemaExt, schemaExtSize); } else { p->schemaExt = NULL; } memcpy(p->schema, pTableMeta->schema, numOfFields * sizeof(SSchema)); - // p->schemaExt = NULL; - // if (hasSchemaExt) { - // SSchemaExt* pSchemaExt = (SSchemaExt*)((char*)p + size); - // p->schemaExt = pSchemaExt; - // memcpy(pSchemaExt, pTableMeta->schemaExt, schemaExtSize); - // } return p; } diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index c3de9a7882..cd5b7188ad 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -585,11 +585,11 @@ void* destroyConnPool(SCliThrd* pThrd) { static SCliConn* getConnFromPool(SCliThrd* pThrd, char* key, bool* exceed) { void* pool = pThrd->pool; STrans* pTranInst = pThrd->pTransInst; - SConnList* plist = taosHashGet((SHashObj*)pool, key, strlen(key) + 1); + SConnList* plist = taosHashGet((SHashObj*)pool, key, strlen(key)); if (plist == NULL) { SConnList list = {0}; - taosHashPut((SHashObj*)pool, key, strlen(key) + 1, (void*)&list, sizeof(list)); - plist = taosHashGet(pool, key, strlen(key) + 1); + taosHashPut((SHashObj*)pool, key, strlen(key), (void*)&list, sizeof(list)); + plist = taosHashGet(pool, key, strlen(key)); SMsgList* nList = taosMemoryCalloc(1, sizeof(SMsgList)); QUEUE_INIT(&nList->msgQ); @@ -624,11 +624,11 @@ static SCliConn* getConnFromPool(SCliThrd* pThrd, char* key, bool* exceed) { static SCliConn* getConnFromPool2(SCliThrd* pThrd, char* key, SCliMsg** pMsg) { void* pool = pThrd->pool; STrans* pTransInst = pThrd->pTransInst; - SConnList* plist = taosHashGet((SHashObj*)pool, key, strlen(key) + 1); + SConnList* plist = taosHashGet((SHashObj*)pool, key, strlen(key)); if (plist == NULL) { SConnList list = {0}; - taosHashPut((SHashObj*)pool, key, strlen(key) + 1, (void*)&list, sizeof(list)); - plist = taosHashGet(pool, key, strlen(key) + 1); + taosHashPut((SHashObj*)pool, key, strlen(key), (void*)&list, sizeof(list)); + plist = taosHashGet(pool, key, strlen(key)); SMsgList* nList = taosMemoryCalloc(1, sizeof(SMsgList)); QUEUE_INIT(&nList->msgQ); @@ -1537,20 +1537,20 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd) { return; } - if (rpcDebugFlag & DEBUG_TRACE) { - if (tmsgIsValid(pMsg->msg.msgType)) { - char buf[128] = {0}; - sprintf(buf, "%s", TMSG_INFO(pMsg->msg.msgType)); - int* count = taosHashGet(pThrd->msgCount, buf, sizeof(buf)); - if (NULL == 0) { - int localCount = 1; - taosHashPut(pThrd->msgCount, buf, sizeof(buf), &localCount, sizeof(localCount)); - } else { - int localCount = *count + 1; - taosHashPut(pThrd->msgCount, buf, sizeof(buf), &localCount, sizeof(localCount)); - } - } - } + // if (rpcDebugFlag & DEBUG_TRACE) { + // if (tmsgIsValid(pMsg->msg.msgType)) { + // char buf[128] = {0}; + // sprintf(buf, "%s", TMSG_INFO(pMsg->msg.msgType)); + // int* count = taosHashGet(pThrd->msgCount, buf, sizeof(buf)); + // if (NULL == 0) { + // int localCount = 1; + // taosHashPut(pThrd->msgCount, buf, sizeof(buf), &localCount, sizeof(localCount)); + // } else { + // int localCount = *count + 1; + // taosHashPut(pThrd->msgCount, buf, sizeof(buf), &localCount, sizeof(localCount)); + // } + // } + // } char* fqdn = EPSET_GET_INUSE_IP(&pMsg->ctx->epSet); uint16_t port = EPSET_GET_INUSE_PORT(&pMsg->ctx->epSet); @@ -1730,7 +1730,7 @@ static void cliBatchDealReq(queue* wq, SCliThrd* pThrd) { QUEUE_PUSH(&pBatchList->wq, &pBatch->listq); - taosHashPut(pThrd->batchCache, key, sizeof(key), &pBatchList, sizeof(void*)); + taosHashPut(pThrd->batchCache, key, strlen(key), &pBatchList, sizeof(void*)); } else { if (QUEUE_IS_EMPTY(&(*ppBatchList)->wq)) { SCliBatch* pBatch = taosMemoryCalloc(1, sizeof(SCliBatch)); @@ -1800,20 +1800,20 @@ static void cliAsyncCb(uv_async_t* handle) { QUEUE_MOVE(&item->qmsg, &wq); taosThreadMutexUnlock(&item->mtx); - if (rpcDebugFlag & DEBUG_TRACE) { - void* pIter = taosHashIterate(pThrd->msgCount, NULL); - while (pIter != NULL) { - int* count = pIter; - size_t len = 0; - char* key = taosHashGetKey(pIter, &len); - if (*count != 0) { - tDebug("key: %s count: %d", key, *count); - } + // if (rpcDebugFlag & DEBUG_TRACE) { + // void* pIter = taosHashIterate(pThrd->msgCount, NULL); + // while (pIter != NULL) { + // int* count = pIter; + // size_t len = 0; + // char* key = taosHashGetKey(pIter, &len); + // if (*count != 0) { + // tDebug("key: %s count: %d", key, *count); + // } - pIter = taosHashIterate(pThrd->msgCount, pIter); - } - tDebug("all conn count: %d", pThrd->newConnCount); - } + // pIter = taosHashIterate(pThrd->msgCount, pIter); + // } + // tDebug("all conn count: %d", pThrd->newConnCount); + // } int8_t supportBatch = pTransInst->supportBatch; if (supportBatch == 0) { @@ -2411,20 +2411,20 @@ int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) { tGTrace("%s conn %p extract epset from msg", CONN_GET_INST_LABEL(pConn), pConn); } } - if (rpcDebugFlag & DEBUG_TRACE) { - if (tmsgIsValid(pResp->msgType - 1)) { - char buf[128] = {0}; - sprintf(buf, "%s", TMSG_INFO(pResp->msgType - 1)); - int* count = taosHashGet(pThrd->msgCount, buf, sizeof(buf)); - if (NULL == 0) { - int localCount = 0; - taosHashPut(pThrd->msgCount, buf, sizeof(buf), &localCount, sizeof(localCount)); - } else { - int localCount = *count - 1; - taosHashPut(pThrd->msgCount, buf, sizeof(buf), &localCount, sizeof(localCount)); - } - } - } + // if (rpcDebugFlag & DEBUG_TRACE) { + // if (tmsgIsValid(pResp->msgType - 1)) { + // char buf[128] = {0}; + // sprintf(buf, "%s", TMSG_INFO(pResp->msgType - 1)); + // int* count = taosHashGet(pThrd->msgCount, buf, sizeof(buf)); + // if (NULL == 0) { + // int localCount = 0; + // taosHashPut(pThrd->msgCount, buf, sizeof(buf), &localCount, sizeof(localCount)); + // } else { + // int localCount = *count - 1; + // taosHashPut(pThrd->msgCount, buf, sizeof(buf), &localCount, sizeof(localCount)); + // } + // } + // } if (pCtx->pSem || pCtx->syncMsgRef != 0) { tGTrace("%s conn %p(sync) handle resp", CONN_GET_INST_LABEL(pConn), pConn); if (pCtx->pSem) { diff --git a/source/util/src/tcompression.c b/source/util/src/tcompression.c index 06f1192fa0..ae4c3f4be1 100644 --- a/source/util/src/tcompression.c +++ b/source/util/src/tcompression.c @@ -2395,10 +2395,6 @@ int32_t tsDecompressBigint(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int } } -#define DEFINE_VAR(cmprAlg) \ - uint8_t l1 = COMPRESS_L1_TYPE_U32(cmprAlg); \ - uint8_t l2 = COMPRESS_L2_TYPE_U32(cmprAlg); \ - uint8_t lvl = COMPRESS_L2_TYPE_LEVEL_U32(cmprAlg); /************************************************************************* * REGULAR COMPRESSION 2 *************************************************************************/ @@ -2680,30 +2676,37 @@ int32_t tsDecompressInt2(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32 // Bigint ===================================================== int32_t tsCompressBigint2(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint32_t cmprAlg, void *pBuf, int32_t nBuf) { + DEFINE_VAR(cmprAlg) + if (l1 != L1_DISABLED && l2 == L2_DISABLED) { + return tsCompressINTImp(pIn, nEle, pOut, TSDB_DATA_TYPE_BIGINT); + } else if (l1 != L1_DISABLED && l2 != L2_DISABLED) { + int32_t len = tsCompressINTImp(pIn, nEle, pBuf, TSDB_DATA_TYPE_BIGINT); + return tsCompressStringImp(pBuf, len, pOut, nOut); + } else if (l1 != L1_DISABLED && l2 != L2_DISABLED) { + ASSERTS(0, "compress algo invalid"); + return -1; + } else if (l1 != L1_DISABLED && l2 == L2_DISABLED) { + ASSERTS(0, "compress algo invalid"); + return -1; + } return 0; - // if (cmprAlg == ONE_STAGE_COMP) { - // return tsCompressINTImp(pIn, nEle, pOut, TSDB_DATA_TYPE_BIGINT); - // } else if (cmprAlg == TWO_STAGE_COMP) { - // int32_t len = tsCompressINTImp(pIn, nEle, pBuf, TSDB_DATA_TYPE_BIGINT); - // return tsCompressStringImp(pBuf, len, pOut, nOut); - // } else { - // ASSERTS(0, "compress algo invalid"); - // return -1; - // } } int32_t tsDecompressBigint2(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint32_t cmprAlg, void *pBuf, int32_t nBuf) { - return 0; - // if (cmprAlg == ONE_STAGE_COMP) { - // return tsDecompressINTImp(pIn, nEle, pOut, TSDB_DATA_TYPE_BIGINT); - // } else if (cmprAlg == TWO_STAGE_COMP) { - // if (tsDecompressStringImp(pIn, nIn, pBuf, nBuf) < 0) return -1; - // return tsDecompressINTImp(pBuf, nEle, pOut, TSDB_DATA_TYPE_BIGINT); - // } else { - // ASSERTS(0, "compress algo invalid"); - // return -1; - // } + DEFINE_VAR(cmprAlg) + if (l1 != L1_DISABLED && l2 == L2_DISABLED) { + return tsDecompressINTImp(pIn, nEle, pOut, TSDB_DATA_TYPE_BIGINT); + } else if (l1 != L1_DISABLED && l2 != L2_DISABLED) { + if (tsDecompressStringImp(pIn, nIn, pBuf, nBuf) < 0) return -1; + return tsDecompressINTImp(pBuf, nEle, pOut, TSDB_DATA_TYPE_BIGINT); + } else if (l1 == L1_DISABLED && l2 != L2_DISABLED) { + ASSERTS(0, "compress algo invalid"); + return -1; + } else if (l1 == L1_DISABLED && l2 == L2_DISABLED) { + return -1; + } + return -1; } int32_t tsFindCompressAlg(int8_t dataType, uint8_t compress, TCompressL1FnSet *l1Fn, TCompressL2FnSet *l2Fn);