From de77ce6480ec6cbd4ffea080a92dcee1639e2e5f Mon Sep 17 00:00:00 2001 From: Yihao Deng Date: Thu, 27 Jun 2024 09:51:49 +0000 Subject: [PATCH] add self check --- out | Bin 0 -> 21360 bytes source/libs/stream/src/streamBackendRocksdb.c | 199 ++++++++++++------ source/libs/stream/src/streamCheckpoint.c | 56 +++-- t.c | 12 ++ 4 files changed, 182 insertions(+), 85 deletions(-) create mode 100755 out create mode 100644 t.c diff --git a/out b/out new file mode 100755 index 0000000000000000000000000000000000000000..21f5cbee379517922a226c62b551376a5b0ad2f4 GIT binary patch literal 21360 zcmeHPe{@vUoxg7;Ap;3XApFM4L;*pXA%Q>vK@vh>QWKH|Qh(ORWacG#W-^n$c>{@c z6&KwjC|hdludZ6y(~76nF0O4?yS7DLslU359?vdy&u(eER@hn>Q*==)v!Cz1_d84; z8T7dO$DTcNleypf{oL>Od%yR4?|bjQpEotPuJaj&;N%yV3gWg*P)LKWIMJdDAPr)f z7>|6em?Or3FOrz5Z%_oaDjo5SqydflK}l~K6-J@wDOfON4~ddqz7!m*YM6>j#gkq! z6*cdrZQ2i}T%T<(NXke-nP+K{?jO@!J&H|jjgqfjHIfeK_69X&Ju*jnS7^N}v>wx- zZjUL)lVU=@W!j#X)}oFQQ?FjH*7MS}N=-1OHYja+!;O8K_ByoQ;h;1lyp-Ev%I*CL z^e8T$H1RNR)a~UPhr>FaOliC+EsWb;OY0ZLV>NL*kshoWT(-1kX?-Y_3@wuNNZ@c2z9@S0K^(24KuBzuw{@baq9eVEzZ_RrzHc;|N>^mewev=N# zP$GNUMafgS96yp#@wi#ygi(6M`iFj18a>|#c;YBk>+;~#PH+_Yt~~PO|7h}8=8^ww z9{C&d$Um6}xAWjv=fRKW!SBt3zXJRM{LV_I+0&m)m?<~nxTY!0md$Nu%yO(AJLOu= z=C-DIGGT3wbj2-IH+qd3NktN-ov>XIjl|>0s7NIS96RB5iT(NM}w zI#wNn=>%Ifd*aEiNIbhASN?1TVM&eEo^l$fY0t#55v(_}6REv^?_TohV@a2cci`GL}0raS}6+w1qg zdN-!EUyQ}XBP>0`#va`#CecJhTBi4hy#Jb7rSJw1-k>LW*n_{X@eU8p8%3Z&*<<_6 z2R-E5!PWjv)f~vd zd2FcWi5y%X`lZ6b9DJNIA@k>R@X{Roa1K5`2QL&05hz5U5P?Dj3K97K7=b@joc}?1 z$6JB$?Zq!F6e4`0e|@TzAR4*lHucD&GK~MA3lC_mZwF<@bNEYd0GGr zACE8(@8(GF)(Yt#_#cGG+cW&59{$@N{s9kvmxtf&;cxZuf9m1WxqRL4TXw#3ZFuLe z!#n={M91dlx`TBugm6hbsK zV>3r9W^E_AXElZVFQ{_&l2?h$KONqABK-8*tHV#9@`sH>;a5((lVIQ@TuvYZfy~iv z85h>4@!P(Nx+~HPI>S3wy-pV%!aLt^%fh#>`Yo7a_nppUj>Qn{L&g1&GrkDjZ2P2t zd>bmM&T1i_vA;hwvx5soj-d5_2Sr;SurOT92@LPcdwX}LIL-*faG49|5UA99BOT< z6K*Ifs8lPAnf^;~+ArvmLC1e)GJ~KeK=*=*6PXO1bnXG&0y^|gCUY2c{%|I90(3oS zrM`JV_i8B_*KZWYV5Kp$tR%1pb%fJ13G-lME(xqHk+-g9qgapM;kR{L<(2Eor+vPn zbX#D%SUqFK#fxWOK;oo-4Sr$xBozrnBHYEV0r(yQbfUKIz;7SgJ3v@-dF7w`n#Pau z?T1Iomq!5Cp)cp>)_2Nk>FdkD7ovV%ZoR4M2`Z#Q1PT!-M4%9XLIesCC`6zTfkFfd z5jb}Qvd^9A=gIiFGJcLspGJsbZHD=b=F8Z;J_*20E}0ti^K3!Q^K)L+nx|Wzl%^|^ zeI9P6=J}jrhUWSCH#(1?#P#o<&LoLHpdS+BXTlnEfuFrPqU-rYEvP%f&rVGvBS@1K z$>v&34C?2q_}Q-`T7u4ZDDg9AQxz-1x`T`#(2j7w@73+|vtHc3JokYOmisWH`Ui0d z;SQec?{;kucPu5Z&C#i$`Eux zbg-^2R3ECXsY^???#4xnLbdhtH7rkc)imQtFw&bl;6t%6WMx$bjO^25Ei~x{+_W}! zl*VCV@&~9jc1Pv1BPs)r!JMx~akvr?=qG+*6&Zm$V#;&GFD~tt zyv!=6J__o)+9=(I21QZj1IUl~NTc#)^x+q>IPElf^Gko}I^Z`|+yl-(W#SjW2xIyi z*itL~^8;7H_=LYec5%fCvNeGo?_VH$7ntEjdW8S8k|`TI4TeK+K}5DW_8G7X*TD+0 z#@H5^s7+|rH}=N>>pqV@Q*CS!Gq<2Sy1@^GW2w8=?B!t#Mi)HQgI(#=mI|-6~$B#GnWJM zH&xt)p3rDdq^h-3S~R3*_l~Vc$j2GQ0hwTI3pAqvl$$aE*sRLCHMg`wBJsP09S7?>LF=mCTF2xm2@HGB5f5i5&fwWL^%?&b#O# z8H&eu{h%?>Ne1^zzI^G?W>M4x)qTbT6qdh|VG%{c@cLT@9gh6p))1_S{l>EdJ*=VM zLh(BWHRk`WhHAk)YEYp3-;uAeZ7fC!d>c=@f$=ok#?!`aT#M}d@kQvaaR>4DQT7mJk5hJ#vY%7- zI%RKCc9JrKoGYhnCS~&|TSD0y%GxQrhBBM7A!M`0R7^-gGf1znjF~#F7)_4#XICV?02Md>A_%S&6TR!%>!6!BakR#Y386fduyFbxt#%MC~s*!OaWA${hOI4+b@|DwL zRb;%BjMrm*RTdD^t?EhS#P|wPS{XyVB+oiW_@QO|(u3pI*a4h-a zPG4JOup{F3wpxiEx0fvEHQZF++;Lg|b)v5=8B52l&d!##WF35Da8tU!KjI8&E9)(H zgEi>3B=8zgI_lzGo{p&KYlOS&<(obmt=RQ=!6@6AyiL~CZEjp|c5b+AL;K|$nh}o( zUIl8k6P9Y2EH=`MN0E(oPp_-($qu5oF$&?iI#gFn)|)yvZEkOaak3`E*lKmVBV3eK zEeY3&w0B)+MY$4fxUvni-|8Q5lHEBrq(dW&TyDqXohfTwB;|G{w%Lgo)cZfqMIH0& zAUo(ph6Ezp;aFSkWI8o6x?Ra+976PxlX@dd&H_OlT-)5(+S=YE`Zijw?N~AIZ0c)v zoTSrcrBaa|YfZYl+j3-h@eWu|kPZgucND{%jSqU!*p2tZHbs*IdJt(2Z0>ceh_bGR zr+E-UcqOZ&3G+{+6DhkVVa0;c-iV{DH9F{1Q!?3Svl(imDVcEXo;12hCA__5IqmIh z)Zj{=WR^8Y9-3zFHtIUXM{hHjapxNQcgVi(KfW^{v301oG|WbC!qqE-6@C^ZM&&nyqFjC z#1es2yA{M0(Y2hYH9#A&=sV*w##=Rm%|Z@x*oq9OCeCbB=Zq|-Huh08;A#`5txo&6 z(TZDLZ`#{}v!dlg1VVkg{U?Wbnca?h0H(1jXtZ82qtVy}CRXvHc)UWC8XlV-8 zbSIKEU1>Y++6l#2gL+I;ig#SsMk(2EyEWZ(ps5*1%Ap6CmV2ujmtF@Y@vU~s?y}>y zJG83Tj>Rl((jJJ^*wJnXM7wLSH*~S1#H3l(osP!=Mp8qGXs?q@V7spInoQ9Fj+K%Q zx&T3Rr(0D-JjGD7_sGH8r&t z((XcLralygq5}hBAw?;+uqPU=sjI8G!rHt%ekF72L|#iE=H}9R{hl?VeYWwBZ~oj8 zSy#n@e0uOEdw=oU_DJKD08d~K}VCPb4#D76npe{8??C8BmAHTr?nK}e7%z8TTn2> zOtH5?A>{KyW!Z_D^?DBzNX;P-MF-;cs*{y&iie+oF+=j*F{ zo$n=$^L5yH;3?&c&+Di&=qU?Y6nfHV{v4AsRoU+ou*!!h7`Xb%*EJcRfIt~yn($uN zqm#s-z}46WmQ-{GaDC}`M8)4oyfXX!jk&tEI*)t}@ZbmwilGY+6MYi|Ur%O{HNYqM zCJR0r!+Kq&%SoPva_F_dgE>4Gug@dD1Na28q?K@fE7P68HKY+m9syoCN_?IKKFUxD zx`s2sHzPYg+1OEy2O1brlr9%l=FDfVeP81bd*n|8A1w~ke52#Fz{&r;9y@fBJDQyv z^4M?9BX8%ycS*cbwCG0Y8Udw?$i5GE7qDrMNSmULTNg^w!nVsc>4F}Bl#4Ygo}D`L z!2;9mH>32)0y@cwCH40N%ox5mkTN6bL4n=kK-_YzSg3Y%3Ho+`ZANH4YT|^;84~g+ zXvT0}GX#|!PA;!Uh{~O6?oouE>@rj7E_1C}TU)z)xrtVZZP6@!17Y39#0hBv0u>D295HV5yL@q9Wwt zY+Quoj0ibN8KjWat7mm@4Az)~lDaaWX1FrR=)pFrSv%2cVL(bNE_~#bpGPiv(jrI3sompN5$UCG28wOjc={UY{`K z`;59;%5s_Lh4|qDRaQUWcZx`s5+9!wCj?#7%+>dPuV^+4YRbnShG~71tIz8zra>0e z)T@6v@b_?Uko!*;tSK?&_HpG=Ca?bypt!J>?LXgNW!g)@MIs-m$ohPr(*{OOa{CRq zv`2|)Q0psH+2AtKFX2b`K3SjdcQWPm75mR}OmBof-AiSj@24`YrACmv{wq|7-6&8@ zS)cFkGUfZb+`hN}_h|h!+92OAW_m>9q)%5Bz54rrQHa=n_Bg4Jmo&d~lXv{@M`f;l zQ0p<>o2!g6ABLIk_vjzg6--%<%`ngOfJgtZu3)-{6*Xl&rcZkG`8#h+f9aV&tjA{Q z4p?sgd40*0pJQNoZ~UIq`aJ$3s3kQ$tXU=+W>)+hC2F7b4{HIYYqOH7;BEh30O#sY zt5$MM>Aq%e;`zha8z@jr^k$!3NAvf^yzRrS;eG#L-4rpQ)Wk|Dpz^u!;rq=u@?)i&C<;Q7LTK8~kjrbRUZ6 m10O%>x(d%7rJ?d;&5C}-W8C8o&TU86-xgN-LmmYWEB+hAz@k(D literal 0 HcmV?d00001 diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index a710f2531a..7ff651d190 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -19,6 +19,8 @@ #include "tcommon.h" #include "tref.h" +#define META_ON_S3_FORMATE "%s_%" PRId64 "\n%s_%" PRId64 "\n%s_%" PRId64 "" + typedef struct SCompactFilteFactory { void* status; } SCompactFilteFactory; @@ -233,15 +235,28 @@ int32_t rebuildDirFromCheckpoint(const char* path, int64_t chkpId, char** dst) { return 0; } -int32_t remoteChkp_readMetaData(char* path, SArray* list) { - int32_t cap = strlen(path); - char* metaPath = taosMemoryCalloc(1, cap + 32); +typedef struct { + char pCurrName[24]; + int64_t currChkptId; + + char pManifestName[24]; + int64_t manifestChkptId; + + char processName[24]; + int64_t processId; +} SSChkpMetaOnS3; + +int32_t remoteChkp_readMetaData(char* path, SSChkpMetaOnS3** pMeta) { + int32_t cap = strlen(path) + 32; + + char* metaPath = taosMemoryCalloc(1, cap); if (metaPath == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; } - if (sprintf(metaPath, "%s%s%s", path, TD_DIRSEP, "META") >= (cap + 32)) { + int32_t n = sprintf(metaPath, "%s%s%s", path, TD_DIRSEP, "META"); + if (n <= 0 || n >= (cap - 1)) { terrno = TSDB_CODE_OUT_OF_MEMORY; taosMemoryFree(metaPath); return -1; @@ -254,23 +269,23 @@ int32_t remoteChkp_readMetaData(char* path, SArray* list) { return -1; } - char buf[128] = {0}; + char buf[256] = {0}; if (taosReadFile(pFile, buf, sizeof(buf)) <= 0) { + terrno = TAOS_SYSTEM_ERROR(errno); taosMemoryFree(metaPath); taosCloseFile(&pFile); return -1; } - int32_t len = strlen(buf); - for (int i = 0; i < len; i++) { - if (buf[i] == '\n') { - char* item = taosMemoryCalloc(1, i + 1); - memcpy(item, buf, i); - taosArrayPush(list, &item); - item = taosMemoryCalloc(1, len - i); - memcpy(item, buf + i + 1, len - i - 1); - taosArrayPush(list, &item); - } + SSChkpMetaOnS3* p = taosMemoryCalloc(1, sizeof(SSChkpMetaOnS3)); + n = sscanf(buf, META_ON_S3_FORMATE, p->pCurrName, &p->currChkptId, p->pManifestName, &p->manifestChkptId, + p->processName, &p->processId); + if (n != 6) { + terrno = TSDB_CODE_INVALID_MSG; + taosMemoryFree(p); + taosMemoryFree(metaPath); + taosCloseFile(&pFile); + return -1; } taosCloseFile(&pFile); @@ -291,7 +306,7 @@ int32_t remoteChkp_validMetaFile(char* name, char* prename, int64_t chkpId) { } return valid; } -int32_t remoteChkp_validAndCvtMeta(char* path, SArray* list, int64_t chkpId) { +int32_t remoteChkp_validAndCvtMeta(char* path, SSChkpMetaOnS3* pMeta, int64_t chkpId) { int32_t complete = 1; int32_t len = strlen(path) + 32; char* src = taosMemoryCalloc(1, len); @@ -301,33 +316,38 @@ int32_t remoteChkp_validAndCvtMeta(char* path, SArray* list, int64_t chkpId) { return -1; } + if (pMeta->currChkptId != chkpId || pMeta->manifestChkptId != chkpId) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } + int8_t count = 0; - for (int i = 0; i < taosArrayGetSize(list); i++) { - char* p = taosArrayGetP(list, i); - sprintf(src, "%s%s%s", path, TD_DIRSEP, p); + // for (int i = 0; i < taosArrayGetSize(list); i++) { + // char* p = taosArrayGetP(list, i); + // sprintf(src, "%s%s%s", path, TD_DIRSEP, p); - // check file exist - if (taosStatFile(src, NULL, NULL, NULL) != 0) { - complete = 0; - break; - } + // // check file exist + // if (taosStatFile(src, NULL, NULL, NULL) != 0) { + // complete = 0; + // break; + // } - // check file name - char temp[64] = {0}; - if (remoteChkp_validMetaFile(p, temp, chkpId)) { - count++; - } + // // check file name + // char temp[64] = {0}; + // if (remoteChkp_validMetaFile(p, temp, chkpId)) { + // count++; + // } - // rename file - sprintf(dst, "%s%s%s", path, TD_DIRSEP, temp); - taosRenameFile(src, dst); + // // rename file + // sprintf(dst, "%s%s%s", path, TD_DIRSEP, temp); + // taosRenameFile(src, dst); - memset(src, 0, len); - memset(dst, 0, len); - } - if (count != taosArrayGetSize(list)) { - complete = 0; - } + // memset(src, 0, len); + // memset(dst, 0, len); + // } + // if (count != taosArrayGetSize(list)) { + // complete = 0; + // } taosMemoryFree(src); taosMemoryFree(dst); @@ -385,12 +405,14 @@ int32_t rebuildFromRemoteChkp_s3(const char* key, char* chkpPath, int64_t chkpId if (taosIsDir(tmp)) taosRemoveDir(tmp); if (taosIsDir(defaultPath)) taosRenameFile(defaultPath, tmp); - SArray* list = taosArrayInit(2, sizeof(void*)); - code = remoteChkp_readMetaData(chkpPath, list); + // SArray* list = taosArrayInit(2, sizeof(void*)); + SSChkpMetaOnS3* pMeta; + code = remoteChkp_readMetaData(chkpPath, &pMeta); if (code == 0) { - code = remoteChkp_validAndCvtMeta(chkpPath, list, chkpId); + code = remoteChkp_validAndCvtMeta(chkpPath, pMeta, chkpId); } - taosArrayDestroyP(list, taosMemoryFree); + taosMemoryFree(pMeta); + // taosArrayDestroyP(list, taosMemoryFree); if (code == 0) { taosMkDir(defaultPath); @@ -1322,6 +1344,9 @@ int32_t chkpLoadExtraInfo(char* pChkpIdDir, int64_t* chkpId, int64_t* processId) TdFilePtr pFile = NULL; int32_t code = -1; + char buf[256] = {0}; + int32_t nBytes = 0; + int32_t len = strlen(pChkpIdDir); if (len == 0) { terrno = TSDB_CODE_INVALID_PARA; @@ -1336,7 +1361,8 @@ int32_t chkpLoadExtraInfo(char* pChkpIdDir, int64_t* chkpId, int64_t* processId) goto _EXIT; } - if (sprintf(pDst, "%s%sinfo", pChkpIdDir, TD_DIRSEP) <= 0) { + nBytes = snprintf(pDst, len + 64, "%s%sinfo", pChkpIdDir, TD_DIRSEP); + if (nBytes != strlen(pDst)) { code = -1; stError("failed to build dst to load extra info, dir:%s", pChkpIdDir); goto _EXIT; @@ -1349,7 +1375,6 @@ int32_t chkpLoadExtraInfo(char* pChkpIdDir, int64_t* chkpId, int64_t* processId) goto _EXIT; } - char buf[256] = {0}; if (taosReadFile(pFile, buf, sizeof(buf)) <= 0) { terrno = TAOS_SYSTEM_ERROR(errno); stError("failed to read file to load extra info, file:%s, reason:%s", pDst, tstrerror(terrno)); @@ -1368,8 +1393,12 @@ _EXIT: return code; } int32_t chkpAddExtraInfo(char* pChkpIdDir, int64_t chkpId, int64_t processId) { + int32_t code = -1; + TdFilePtr pFile = NULL; - int32_t code = -1; + + char buf[256] = {0}; + int32_t nBytes = 0; int32_t len = strlen(pChkpIdDir); if (len == 0) { @@ -1385,7 +1414,8 @@ int32_t chkpAddExtraInfo(char* pChkpIdDir, int64_t chkpId, int64_t processId) { goto _EXIT; } - if (sprintf(pDst, "%s%sinfo", pChkpIdDir, TD_DIRSEP) < 0) { + nBytes = snprintf(pDst, len + 64, "%s%sinfo", pChkpIdDir, TD_DIRSEP); + if (nBytes != strlen(pDst)) { stError("failed to build dst to add extra info, dir:%s", pChkpIdDir); goto _EXIT; } @@ -1397,15 +1427,14 @@ int32_t chkpAddExtraInfo(char* pChkpIdDir, int64_t chkpId, int64_t processId) { goto _EXIT; } - char buf[256] = {0}; - int n = snprintf(buf, sizeof(buf), "%" PRId64 " %" PRId64 "", chkpId, processId); - if (n <= 0 || n >= sizeof(buf)) { + nBytes = snprintf(buf, sizeof(buf), "%" PRId64 " %" PRId64 "", chkpId, processId); + if (nBytes != strlen(buf)) { code = -1; stError("failed to build content to add extra info, dir:%s", pChkpIdDir); goto _EXIT; } - if (taosWriteFile(pFile, buf, strlen(buf)) <= 0) { + if (nBytes != taosWriteFile(pFile, buf, nBytes)) { terrno = TAOS_SYSTEM_ERROR(errno); stError("failed to write file to add extra info, file:%s, reason:%s", pDst, tstrerror(terrno)); goto _EXIT; @@ -2430,18 +2459,27 @@ void taskDbDestroy2(void* pDb) { taskDbDestroy(pDb, true); } int32_t taskDbGenChkpUploadData__rsync(STaskDbWrapper* pDb, int64_t chkpId, char** path) { int32_t code = -1; int64_t refId = pDb->refId; + int32_t nBytes = 0; if (taosAcquireRef(taskDbWrapperId, refId) == NULL) { return -1; } - char* buf = taosMemoryCalloc(1, strlen(pDb->path) + 128); + int32_t cap = strlen(pDb->path) + 128; + + char* buf = taosMemoryCalloc(1, cap); if (buf == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; } - sprintf(buf, "%s%s%s%s%s%" PRId64 "", pDb->path, TD_DIRSEP, "checkpoints", TD_DIRSEP, "checkpoint", chkpId); + nBytes = + snprintf(buf, cap, "%s%s%s%s%s%" PRId64 "", pDb->path, TD_DIRSEP, "checkpoints", TD_DIRSEP, "checkpoint", chkpId); + if (nBytes != strlen(buf)) { + terrno = TSDB_CODE_OUT_OF_RANGE; + return -1; + } + if (taosIsDir(buf)) { code = 0; *path = buf; @@ -4473,8 +4511,18 @@ int32_t dbChkpDumpTo(SDbChkp* p, char* dname, SArray* list) { goto _ERROR; } - sprintf(srcDir, "%s%s%s%s%s%" PRId64 "", p->path, TD_DIRSEP, "checkpoints", TD_DIRSEP, "checkpoint", p->curChkpId); - sprintf(dstDir, "%s", dname); + int nBytes = snprintf(srcDir, len, "%s%s%s%s%s%" PRId64 "", p->path, TD_DIRSEP, "checkpoints", TD_DIRSEP, + "checkpoint", p->curChkpId); + if (nBytes != strlen(srcBuf)) { + terrno = TSDB_CODE_OUT_OF_RANGE; + goto _ERROR; + } + + nBytes = snprintf(dstDir, len, "%s", dname); + if (nBytes != strlen(dstBuf)) { + terrno = TSDB_CODE_OUT_OF_RANGE; + goto _ERROR; + } if (!taosDirExist(srcDir)) { stError("failed to dump srcDir %s, reason: not exist such dir", srcDir); @@ -4540,14 +4588,20 @@ int32_t dbChkpDumpTo(SDbChkp* p, char* dname, SArray* list) { stError("chkp failed to create meta file: %s, reason:%s", dstDir, tstrerror(terrno)); goto _ERROR; } - // META_ON_S3 - // current_checkpointID - // manifest_checkpointID - // processVer_processID - char content[128] = {0}; - snprintf(content, sizeof(content), "%s_%" PRId64 "\n%s_%" PRId64 "\n%s_%" PRId64 "", p->pCurrent, p->curChkpId, - p->pManifest, p->curChkpId, "processVer", processId); - if (taosWriteFile(pFile, content, strlen(content)) <= 0) { + + char content[256] = {0}; + nBytes = snprintf(content, sizeof(content), META_ON_S3_FORMATE, p->pCurrent, p->curChkpId, p->pManifest, p->curChkpId, + "processVer", processId); + if (nBytes != strlen(content)) { + terrno = TSDB_CODE_INVALID_MSG; + stError("chkp failed to format meta file: %s, reason: invalid msg", dstDir); + taosCloseFile(&pFile); + code = -1; + goto _ERROR; + } + + nBytes = taosWriteFile(pFile, content, strlen(content)); + if (nBytes != strlen(content)) { terrno = errno; stError("chkp failed to write meta file: %s,reason:%s", dstDir, tstrerror(terrno)); taosCloseFile(&pFile); @@ -4612,17 +4666,28 @@ int32_t bkdMgtGetDelta(SBkdMgt* bm, char* taskId, int64_t chkpId, SArray* list, sprintf(path, "%s%s%s", bm->path, TD_DIRSEP, taskId); SDbChkp* p = dbChkpCreate(path, chkpId); - taosHashPut(bm->pDbChkpTbl, taskId, strlen(taskId), &p, sizeof(void*)); + if (p == NULL) { + taosMemoryFree(path); + taosThreadRwlockUnlock(&bm->rwLock); + return -1; + } + + if (taosHashPut(bm->pDbChkpTbl, taskId, strlen(taskId), &p, sizeof(void*)) != 0) { + dbChkpDestroy(p); + taosMemoryFree(path); + taosThreadRwlockUnlock(&bm->rwLock); + return -1; + } pChkp = p; - code = dbChkpDumpTo(pChkp, dname, list); taosThreadRwlockUnlock(&bm->rwLock); return code; - } + } else { + code = dbChkpGetDelta(pChkp, chkpId, NULL); - code = dbChkpGetDelta(pChkp, chkpId, NULL); - code = dbChkpDumpTo(pChkp, dname, list); + if (code == 0) code = dbChkpDumpTo(pChkp, dname, list); + } taosThreadRwlockUnlock(&bm->rwLock); return code; diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index 26df7b1627..bc3762a6d5 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -527,27 +527,41 @@ void streamTaskSetFailedCheckpointId(SStreamTask* pTask) { } static int32_t getCheckpointDataMeta(const char* id, const char* path, SArray* list) { - char buf[128] = {0}; + TdFilePtr pFile = NULL; + int32_t cap = strlen(path) + 32; + char buf[128] = {0}; + int32_t code = 0; - char* file = taosMemoryCalloc(1, strlen(path) + 32); - sprintf(file, "%s%s%s", path, TD_DIRSEP, "META_TMP"); + char* filePath = taosMemoryCalloc(1, cap); + if (filePath == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } - int32_t code = downloadCheckpointDataByName(id, "META", file); + int32_t nBytes = snprintf(filePath, cap, "%s%s%s", path, TD_DIRSEP, "META_TMP"); + if (nBytes != strlen(filePath)) { + taosMemoryFree(filePath); + terrno = TSDB_CODE_OUT_OF_RANGE; + return -1; + } + + code = downloadCheckpointDataByName(id, "META", filePath); if (code != 0) { - stDebug("%s chkp failed to download meta file:%s", id, file); - taosMemoryFree(file); + stDebug("%s chkp failed to download meta file:%s", id, filePath); + taosMemoryFree(filePath); return code; } - TdFilePtr pFile = taosOpenFile(file, TD_FILE_READ); + pFile = taosOpenFile(filePath, TD_FILE_READ); if (pFile == NULL) { - stError("%s failed to open meta file:%s for checkpoint", id, file); - code = -1; - return code; + terrno = TAOS_SYSTEM_ERROR(errno); + stError("%s failed to open meta file:%s for checkpoint", id, filePath); + taosMemoryFree(filePath); + return -1; } if (taosReadFile(pFile, buf, sizeof(buf)) <= 0) { - stError("%s failed to read meta file:%s for checkpoint", id, file); + stError("%s failed to read meta file:%s for checkpoint", id, filePath); code = -1; } else { int32_t len = strnlen(buf, tListLen(buf)); @@ -565,27 +579,33 @@ static int32_t getCheckpointDataMeta(const char* id, const char* path, SArray* l } taosCloseFile(&pFile); - taosRemoveFile(file); - taosMemoryFree(file); + taosRemoveFile(filePath); + taosMemoryFree(filePath); return code; } int32_t uploadCheckpointData(SStreamTask* pTask, int64_t checkpointId, int64_t dbRefId, ECHECKPOINT_BACKUP_TYPE type) { char* path = NULL; int32_t code = 0; - SArray* toDelFiles = taosArrayInit(4, POINTER_BYTES); - int64_t now = taosGetTimestampMs(); SStreamMeta* pMeta = pTask->pMeta; const char* idStr = pTask->id.idStr; + int64_t now = taosGetTimestampMs(); + + SArray* toDelFiles = taosArrayInit(4, POINTER_BYTES); + if (toDelFiles == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } if ((code = taskDbGenChkpUploadData(pTask->pBackend, pMeta->bkdChkptMgt, checkpointId, type, &path, toDelFiles, pTask->id.idStr)) != 0) { - stError("s-task:%s failed to gen upload checkpoint:%" PRId64, idStr, checkpointId); + stError("s-task:%s failed to gen upload checkpoint:%" PRId64 ", reason:%s", idStr, checkpointId, tstrerror(terrno)); } if (type == DATA_UPLOAD_S3) { if (code == TSDB_CODE_SUCCESS && (code = getCheckpointDataMeta(idStr, path, toDelFiles)) != 0) { - stError("s-task:%s failed to get checkpointData for checkpointId:%" PRId64 " meta", idStr, checkpointId); + stError("s-task:%s failed to get checkpointData for checkpointId:%" PRId64 ", reason:%s", idStr, checkpointId, + tstrerror(terrno)); } } @@ -594,7 +614,7 @@ int32_t uploadCheckpointData(SStreamTask* pTask, int64_t checkpointId, int64_t d if (code == TSDB_CODE_SUCCESS) { stDebug("s-task:%s upload checkpointId:%" PRId64 " to remote succ", idStr, checkpointId); } else { - stError("s-task:%s failed to upload checkpointId:%" PRId64 " data:%s", idStr, checkpointId, path); + stError("s-task:%s failed to upload checkpointId:%" PRId64 " path:%s,reason:%s", idStr, checkpointId, path); } } diff --git a/t.c b/t.c new file mode 100644 index 0000000000..a79ed4c134 --- /dev/null +++ b/t.c @@ -0,0 +1,12 @@ +#include +#include +#include + +int main() { + char *buf = calloc(1, 4); + int n = snprintf(buf, 4, "size"); + + printf("write size:%d \t buf:%s \t len:%d\n", n, buf, (int)(strlen(buf))); + buf[4] = 10; + return 1; +}