tsdb support compress

This commit is contained in:
yihaoDeng 2024-03-18 12:12:55 +00:00
parent 8e58f2387b
commit d31c7f40c7
6 changed files with 113 additions and 89 deletions

View File

@ -158,12 +158,12 @@ int32_t tsDecompressFloat2(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int
int32_t nBuf);
int32_t tsCompressDouble2(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint32_t cmprAlg, void *pBuf,
int32_t nBuf);
int32_t tsDecompressDouble2(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint32_t cmprAlg, void *pBuf,
int32_t nBuf);
int32_t tsDecompressDouble2(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint32_t cmprAlg,
void *pBuf, int32_t nBuf);
int32_t tsCompressString2(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint32_t cmprAlg, void *pBuf,
int32_t nBuf);
int32_t tsDecompressString2(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint32_t cmprAlg, void *pBuf,
int32_t nBuf);
int32_t tsDecompressString2(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint32_t cmprAlg,
void *pBuf, int32_t nBuf);
int32_t tsCompressBool2(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint32_t cmprAlg, void *pBuf,
int32_t nBuf);
int32_t tsDecompressBool2(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint32_t cmprAlg, void *pBuf,
@ -172,8 +172,8 @@ int32_t tsCompressTinyint2(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int
int32_t nBuf);
int32_t tsDecompressTinyint2(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint32_t cmprAlg,
void *pBuf, int32_t nBuf);
int32_t tsCompressSmallint2(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint32_t cmprAlg, void *pBuf,
int32_t nBuf);
int32_t tsCompressSmallint2(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint32_t cmprAlg,
void *pBuf, int32_t nBuf);
int32_t tsDecompressSmallint2(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint32_t cmprAlg,
void *pBuf, int32_t nBuf);
int32_t tsCompressInt2(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint32_t cmprAlg, void *pBuf,
@ -182,8 +182,8 @@ int32_t tsDecompressInt2(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32
int32_t nBuf);
int32_t tsCompressBigint2(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint32_t cmprAlg, void *pBuf,
int32_t nBuf);
int32_t tsDecompressBigint2(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint32_t cmprAlg, void *pBuf,
int32_t nBuf);
int32_t tsDecompressBigint2(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint32_t cmprAlg,
void *pBuf, int32_t nBuf);
// for internal usage
int32_t getWordLength(char type);
@ -259,6 +259,12 @@ typedef enum L2Compress {
} EL2ComressFuncType;
int32_t tcompressDebug(uint32_t cmprAlg, uint8_t *l1Alg, uint8_t *l2Alg, uint8_t *level);
// Declares three uint8_t locals in the enclosing scope -- l1, l2 and lvl -- initialised from
// the packed 32-bit compression descriptor `cmprAlg` via COMPRESS_L1_TYPE_U32,
// COMPRESS_L2_TYPE_U32 and COMPRESS_L2_TYPE_LEVEL_U32 respectively.
// The expansion already ends with ';', so call sites write `DEFINE_VAR(x)` with no trailing
// semicolon. Because it introduces fixed names, it can be expanded at most once per scope.
#define DEFINE_VAR(cmprAlg) \
uint8_t l1 = COMPRESS_L1_TYPE_U32(cmprAlg); \
uint8_t l2 = COMPRESS_L2_TYPE_U32(cmprAlg); \
uint8_t lvl = COMPRESS_L2_TYPE_LEVEL_U32(cmprAlg);
#ifdef __cplusplus
}
#endif

View File

@ -41,7 +41,7 @@ extern int32_t tsdbFsyncFile(STsdbFD *pFD);
typedef struct SColCompressInfo SColCompressInfo;
/*
 * Per-block compression settings handed to tBlockDataCompress().
 *
 * The struct must contain exactly one `defaultCmprAlg` member; two members with the same
 * name do not compile. It is 32 bits wide because tBlockDataCompress() passes its address
 * to tsdbGetColCmprAlgFromSet(), whose output parameter is `uint32_t *`.
 */
struct SColCompressInfo {
  SHashObj *pColCmpr;        // passed as the `set` argument of tsdbGetColCmprAlgFromSet()
  uint32_t  defaultCmprAlg;  // packed compression descriptor; filled by tsdbGetColCmprAlgFromSet()
};
typedef struct SColCompressInfo2 SColCompressInfo2;
struct SColCompressInfo2 {

View File

@ -13,6 +13,7 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "tcompression.h"
#include "tdataformat.h"
#include "tsdb.h"
#include "tsdbDef.h"
@ -1412,6 +1413,7 @@ int32_t tBlockDataCompress(SBlockData *bData, void *pCompr, SBuffer *buffers, SB
int32_t lino = 0;
SColCompressInfo *pInfo = pCompr;
code = tsdbGetColCmprAlgFromSet(pInfo->pColCmpr, 1, &pInfo->defaultCmprAlg);
SDiskDataHdr hdr = {
.delimiter = TSDB_FILE_DLMT,
@ -1530,7 +1532,13 @@ int32_t tPutDiskDataHdr(SBuffer *buffer, const SDiskDataHdr *pHdr) {
if ((code = tBufferPutI32v(buffer, pHdr->szKey))) return code;
if ((code = tBufferPutI32v(buffer, pHdr->szBlkCol))) return code;
if ((code = tBufferPutI32v(buffer, pHdr->nRow))) return code;
if ((code = tBufferPutI8(buffer, pHdr->cmprAlg))) return code;
if (pHdr->fmtVer < 1) {
if ((code = tBufferPutI8(buffer, pHdr->cmprAlg))) return code;
} else if (pHdr->fmtVer == 1) {
if ((code = tBufferPutU32(buffer, pHdr->cmprAlg))) return code;
} else {
// more data fmt ver
}
if (pHdr->fmtVer == 1) {
if ((code = tBufferPutI8(buffer, pHdr->numOfPKs))) return code;
for (int i = 0; i < pHdr->numOfPKs; i++) {
@ -1553,7 +1561,15 @@ int32_t tGetDiskDataHdr(SBufferReader *br, SDiskDataHdr *pHdr) {
if ((code = tBufferGetI32v(br, &pHdr->szKey))) return code;
if ((code = tBufferGetI32v(br, &pHdr->szBlkCol))) return code;
if ((code = tBufferGetI32v(br, &pHdr->nRow))) return code;
if ((code = tBufferGetI8(br, &pHdr->cmprAlg))) return code;
if (pHdr->fmtVer < 1) {
int8_t cmprAlg = 0;
if ((code = tBufferGetI8(br, &cmprAlg))) return code;
pHdr->cmprAlg = cmprAlg;
} else if (pHdr->fmtVer == 1) {
if ((code = tBufferGetU32(br, &pHdr->cmprAlg))) return code;
} else {
// more data fmt ver
}
if (pHdr->fmtVer == 1) {
if ((code = tBufferGetI8(br, &pHdr->numOfPKs))) return code;
for (int i = 0; i < pHdr->numOfPKs; i++) {
@ -1625,7 +1641,6 @@ static int32_t tBlockDataCompressKeyPart(SBlockData *bData, SDiskDataHdr *hdr, S
.dataType = TSDB_DATA_TYPE_TIMESTAMP,
.originalSize = sizeof(TSKEY) * bData->nRow,
};
// tsdbGetColCmprAlgFromSet(compressInfo->pColCmpr, 1, &cinfo.cmprAlg);
code = tCompressDataToBuffer((uint8_t *)bData->aTSKEY, &cinfo, buffer, assist);
TSDB_CHECK_CODE(code, lino, _exit);
@ -1788,5 +1803,10 @@ int32_t tsdbGetColCmprAlgFromSet(SHashObj *set, int16_t colId, uint32_t *alg) {
if (ret == NULL) return -1;
*alg = *ret;
return 0;
}
// NOTE(review): this looks like a placeholder. DEFINE_VAR expands to local declarations of
// l1, l2 and lvl derived from `alg`, but none of them is used afterwards and the function
// returns 0 for every input. TODO confirm the intended timestamp-specific conversion.
uint32_t tsdbCvtTimestampAlg(uint32_t alg) {
DEFINE_VAR(alg)
return 0;
}

View File

@ -298,16 +298,11 @@ STableMeta* tableMetaDup(const STableMeta* pTableMeta) {
memcpy(p, pTableMeta, cpSize);
if (hasSchemaExt) {
p->schemaExt = (SSchemaExt*)(((char*)p) + size);
memcpy(p->schemaExt, pTableMeta->schemaExt, schemaExtSize);
} else {
p->schemaExt = NULL;
}
memcpy(p->schema, pTableMeta->schema, numOfFields * sizeof(SSchema));
// p->schemaExt = NULL;
// if (hasSchemaExt) {
// SSchemaExt* pSchemaExt = (SSchemaExt*)((char*)p + size);
// p->schemaExt = pSchemaExt;
// memcpy(pSchemaExt, pTableMeta->schemaExt, schemaExtSize);
// }
return p;
}

View File

@ -585,11 +585,11 @@ void* destroyConnPool(SCliThrd* pThrd) {
static SCliConn* getConnFromPool(SCliThrd* pThrd, char* key, bool* exceed) {
void* pool = pThrd->pool;
STrans* pTranInst = pThrd->pTransInst;
SConnList* plist = taosHashGet((SHashObj*)pool, key, strlen(key) + 1);
SConnList* plist = taosHashGet((SHashObj*)pool, key, strlen(key));
if (plist == NULL) {
SConnList list = {0};
taosHashPut((SHashObj*)pool, key, strlen(key) + 1, (void*)&list, sizeof(list));
plist = taosHashGet(pool, key, strlen(key) + 1);
taosHashPut((SHashObj*)pool, key, strlen(key), (void*)&list, sizeof(list));
plist = taosHashGet(pool, key, strlen(key));
SMsgList* nList = taosMemoryCalloc(1, sizeof(SMsgList));
QUEUE_INIT(&nList->msgQ);
@ -624,11 +624,11 @@ static SCliConn* getConnFromPool(SCliThrd* pThrd, char* key, bool* exceed) {
static SCliConn* getConnFromPool2(SCliThrd* pThrd, char* key, SCliMsg** pMsg) {
void* pool = pThrd->pool;
STrans* pTransInst = pThrd->pTransInst;
SConnList* plist = taosHashGet((SHashObj*)pool, key, strlen(key) + 1);
SConnList* plist = taosHashGet((SHashObj*)pool, key, strlen(key));
if (plist == NULL) {
SConnList list = {0};
taosHashPut((SHashObj*)pool, key, strlen(key) + 1, (void*)&list, sizeof(list));
plist = taosHashGet(pool, key, strlen(key) + 1);
taosHashPut((SHashObj*)pool, key, strlen(key), (void*)&list, sizeof(list));
plist = taosHashGet(pool, key, strlen(key));
SMsgList* nList = taosMemoryCalloc(1, sizeof(SMsgList));
QUEUE_INIT(&nList->msgQ);
@ -1537,20 +1537,20 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd) {
return;
}
if (rpcDebugFlag & DEBUG_TRACE) {
if (tmsgIsValid(pMsg->msg.msgType)) {
char buf[128] = {0};
sprintf(buf, "%s", TMSG_INFO(pMsg->msg.msgType));
int* count = taosHashGet(pThrd->msgCount, buf, sizeof(buf));
if (NULL == 0) {
int localCount = 1;
taosHashPut(pThrd->msgCount, buf, sizeof(buf), &localCount, sizeof(localCount));
} else {
int localCount = *count + 1;
taosHashPut(pThrd->msgCount, buf, sizeof(buf), &localCount, sizeof(localCount));
}
}
}
// if (rpcDebugFlag & DEBUG_TRACE) {
// if (tmsgIsValid(pMsg->msg.msgType)) {
// char buf[128] = {0};
// sprintf(buf, "%s", TMSG_INFO(pMsg->msg.msgType));
// int* count = taosHashGet(pThrd->msgCount, buf, sizeof(buf));
// if (NULL == 0) {
// int localCount = 1;
// taosHashPut(pThrd->msgCount, buf, sizeof(buf), &localCount, sizeof(localCount));
// } else {
// int localCount = *count + 1;
// taosHashPut(pThrd->msgCount, buf, sizeof(buf), &localCount, sizeof(localCount));
// }
// }
// }
char* fqdn = EPSET_GET_INUSE_IP(&pMsg->ctx->epSet);
uint16_t port = EPSET_GET_INUSE_PORT(&pMsg->ctx->epSet);
@ -1730,7 +1730,7 @@ static void cliBatchDealReq(queue* wq, SCliThrd* pThrd) {
QUEUE_PUSH(&pBatchList->wq, &pBatch->listq);
taosHashPut(pThrd->batchCache, key, sizeof(key), &pBatchList, sizeof(void*));
taosHashPut(pThrd->batchCache, key, strlen(key), &pBatchList, sizeof(void*));
} else {
if (QUEUE_IS_EMPTY(&(*ppBatchList)->wq)) {
SCliBatch* pBatch = taosMemoryCalloc(1, sizeof(SCliBatch));
@ -1800,20 +1800,20 @@ static void cliAsyncCb(uv_async_t* handle) {
QUEUE_MOVE(&item->qmsg, &wq);
taosThreadMutexUnlock(&item->mtx);
if (rpcDebugFlag & DEBUG_TRACE) {
void* pIter = taosHashIterate(pThrd->msgCount, NULL);
while (pIter != NULL) {
int* count = pIter;
size_t len = 0;
char* key = taosHashGetKey(pIter, &len);
if (*count != 0) {
tDebug("key: %s count: %d", key, *count);
}
// if (rpcDebugFlag & DEBUG_TRACE) {
// void* pIter = taosHashIterate(pThrd->msgCount, NULL);
// while (pIter != NULL) {
// int* count = pIter;
// size_t len = 0;
// char* key = taosHashGetKey(pIter, &len);
// if (*count != 0) {
// tDebug("key: %s count: %d", key, *count);
// }
pIter = taosHashIterate(pThrd->msgCount, pIter);
}
tDebug("all conn count: %d", pThrd->newConnCount);
}
// pIter = taosHashIterate(pThrd->msgCount, pIter);
// }
// tDebug("all conn count: %d", pThrd->newConnCount);
// }
int8_t supportBatch = pTransInst->supportBatch;
if (supportBatch == 0) {
@ -2411,20 +2411,20 @@ int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) {
tGTrace("%s conn %p extract epset from msg", CONN_GET_INST_LABEL(pConn), pConn);
}
}
if (rpcDebugFlag & DEBUG_TRACE) {
if (tmsgIsValid(pResp->msgType - 1)) {
char buf[128] = {0};
sprintf(buf, "%s", TMSG_INFO(pResp->msgType - 1));
int* count = taosHashGet(pThrd->msgCount, buf, sizeof(buf));
if (NULL == 0) {
int localCount = 0;
taosHashPut(pThrd->msgCount, buf, sizeof(buf), &localCount, sizeof(localCount));
} else {
int localCount = *count - 1;
taosHashPut(pThrd->msgCount, buf, sizeof(buf), &localCount, sizeof(localCount));
}
}
}
// if (rpcDebugFlag & DEBUG_TRACE) {
// if (tmsgIsValid(pResp->msgType - 1)) {
// char buf[128] = {0};
// sprintf(buf, "%s", TMSG_INFO(pResp->msgType - 1));
// int* count = taosHashGet(pThrd->msgCount, buf, sizeof(buf));
// if (NULL == 0) {
// int localCount = 0;
// taosHashPut(pThrd->msgCount, buf, sizeof(buf), &localCount, sizeof(localCount));
// } else {
// int localCount = *count - 1;
// taosHashPut(pThrd->msgCount, buf, sizeof(buf), &localCount, sizeof(localCount));
// }
// }
// }
if (pCtx->pSem || pCtx->syncMsgRef != 0) {
tGTrace("%s conn %p(sync) handle resp", CONN_GET_INST_LABEL(pConn), pConn);
if (pCtx->pSem) {

View File

@ -2395,10 +2395,6 @@ int32_t tsDecompressBigint(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int
}
}
// Declares uint8_t locals l1, l2 and lvl in the enclosing scope, initialised from the packed
// descriptor `cmprAlg` through COMPRESS_L1_TYPE_U32, COMPRESS_L2_TYPE_U32 and
// COMPRESS_L2_TYPE_LEVEL_U32. The expansion supplies its own trailing ';'.
#define DEFINE_VAR(cmprAlg) \
uint8_t l1 = COMPRESS_L1_TYPE_U32(cmprAlg); \
uint8_t l2 = COMPRESS_L2_TYPE_U32(cmprAlg); \
uint8_t lvl = COMPRESS_L2_TYPE_LEVEL_U32(cmprAlg);
/*************************************************************************
* REGULAR COMPRESSION 2
*************************************************************************/
@ -2680,30 +2676,37 @@ int32_t tsDecompressInt2(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32
// Bigint =====================================================
int32_t tsCompressBigint2(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint32_t cmprAlg, void *pBuf,
int32_t nBuf) {
DEFINE_VAR(cmprAlg)
if (l1 != L1_DISABLED && l2 == L2_DISABLED) {
return tsCompressINTImp(pIn, nEle, pOut, TSDB_DATA_TYPE_BIGINT);
} else if (l1 != L1_DISABLED && l2 != L2_DISABLED) {
int32_t len = tsCompressINTImp(pIn, nEle, pBuf, TSDB_DATA_TYPE_BIGINT);
return tsCompressStringImp(pBuf, len, pOut, nOut);
} else if (l1 != L1_DISABLED && l2 != L2_DISABLED) {
ASSERTS(0, "compress algo invalid");
return -1;
} else if (l1 != L1_DISABLED && l2 == L2_DISABLED) {
ASSERTS(0, "compress algo invalid");
return -1;
}
return 0;
// if (cmprAlg == ONE_STAGE_COMP) {
// return tsCompressINTImp(pIn, nEle, pOut, TSDB_DATA_TYPE_BIGINT);
// } else if (cmprAlg == TWO_STAGE_COMP) {
// int32_t len = tsCompressINTImp(pIn, nEle, pBuf, TSDB_DATA_TYPE_BIGINT);
// return tsCompressStringImp(pBuf, len, pOut, nOut);
// } else {
// ASSERTS(0, "compress algo invalid");
// return -1;
// }
}
int32_t tsDecompressBigint2(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint32_t cmprAlg,
void *pBuf, int32_t nBuf) {
return 0;
// if (cmprAlg == ONE_STAGE_COMP) {
// return tsDecompressINTImp(pIn, nEle, pOut, TSDB_DATA_TYPE_BIGINT);
// } else if (cmprAlg == TWO_STAGE_COMP) {
// if (tsDecompressStringImp(pIn, nIn, pBuf, nBuf) < 0) return -1;
// return tsDecompressINTImp(pBuf, nEle, pOut, TSDB_DATA_TYPE_BIGINT);
// } else {
// ASSERTS(0, "compress algo invalid");
// return -1;
// }
DEFINE_VAR(cmprAlg)
if (l1 != L1_DISABLED && l2 == L2_DISABLED) {
return tsDecompressINTImp(pIn, nEle, pOut, TSDB_DATA_TYPE_BIGINT);
} else if (l1 != L1_DISABLED && l2 != L2_DISABLED) {
if (tsDecompressStringImp(pIn, nIn, pBuf, nBuf) < 0) return -1;
return tsDecompressINTImp(pBuf, nEle, pOut, TSDB_DATA_TYPE_BIGINT);
} else if (l1 == L1_DISABLED && l2 != L2_DISABLED) {
ASSERTS(0, "compress algo invalid");
return -1;
} else if (l1 == L1_DISABLED && l2 == L2_DISABLED) {
return -1;
}
return -1;
}
int32_t tsFindCompressAlg(int8_t dataType, uint8_t compress, TCompressL1FnSet *l1Fn, TCompressL2FnSet *l2Fn);