From 6e7ba34c9459a4ee21857c7dc9faf4bbb18d7ff9 Mon Sep 17 00:00:00 2001 From: Dinindu Senanayake Date: Tue, 15 Oct 2024 17:05:21 +1300 Subject: [PATCH] fixing bugs related to snakemake profile --- 08-snakemake/profiles/slurm/CookieCutter.py | 39 ++ .../__pycache__/CookieCutter.cpython-311.pyc | Bin 0 -> 2190 bytes .../__pycache__/slurm_utils.cpython-311.pyc | Bin 0 -> 22966 bytes 08-snakemake/profiles/slurm/config.yaml | 29 ++ 08-snakemake/profiles/slurm/settings.json | 5 + .../profiles/slurm/slurm-jobscript.sh | 3 + 08-snakemake/profiles/slurm/slurm-sidecar.py | 330 ++++++++++++++ 08-snakemake/profiles/slurm/slurm-status.py | 106 +++++ 08-snakemake/profiles/slurm/slurm-submit.py | 105 +++++ 08-snakemake/profiles/slurm/slurm_utils.py | 403 ++++++++++++++++++ 08-snakemake/run_snakefile.sh | 4 +- submit.sh | 25 +- 12 files changed, 1036 insertions(+), 13 deletions(-) create mode 100644 08-snakemake/profiles/slurm/CookieCutter.py create mode 100644 08-snakemake/profiles/slurm/__pycache__/CookieCutter.cpython-311.pyc create mode 100644 08-snakemake/profiles/slurm/__pycache__/slurm_utils.cpython-311.pyc create mode 100644 08-snakemake/profiles/slurm/config.yaml create mode 100644 08-snakemake/profiles/slurm/settings.json create mode 100755 08-snakemake/profiles/slurm/slurm-jobscript.sh create mode 100755 08-snakemake/profiles/slurm/slurm-sidecar.py create mode 100755 08-snakemake/profiles/slurm/slurm-status.py create mode 100755 08-snakemake/profiles/slurm/slurm-submit.py create mode 100644 08-snakemake/profiles/slurm/slurm_utils.py diff --git a/08-snakemake/profiles/slurm/CookieCutter.py b/08-snakemake/profiles/slurm/CookieCutter.py new file mode 100644 index 0000000..cfa9bb8 --- /dev/null +++ b/08-snakemake/profiles/slurm/CookieCutter.py @@ -0,0 +1,39 @@ +# +# Based on lsf CookieCutter.py +# +import os +import json + +d = os.path.dirname(__file__) +with open(os.path.join(d, "settings.json")) as fh: + settings = json.load(fh) + + +def from_entry_or_env(values, key): + """Return value from ``values`` and override with environment variables.""" + if key in os.environ: + return os.environ[key] + else: + return values[key] + + +class CookieCutter: + + SBATCH_DEFAULTS = from_entry_or_env(settings, "SBATCH_DEFAULTS") + CLUSTER_NAME = from_entry_or_env(settings, "CLUSTER_NAME") + CLUSTER_CONFIG = from_entry_or_env(settings, "CLUSTER_CONFIG") + + @staticmethod + def get_cluster_option() -> str: + cluster = CookieCutter.CLUSTER_NAME + if cluster != "": + return f"--cluster={cluster}" + return "" + + @staticmethod + def get_cluster_logpath() -> str: + return "logs/slurm/%r/%j" + + @staticmethod + def get_cluster_jobname() -> str: + return "%r_%w" diff --git a/08-snakemake/profiles/slurm/__pycache__/CookieCutter.cpython-311.pyc b/08-snakemake/profiles/slurm/__pycache__/CookieCutter.cpython-311.pyc new file mode 100644 index 0000000000000000000000000000000000000000..59a2d99f3e15a4acef3c2d4385eaca07210c8603 GIT binary patch literal 2190 zcma)6-A@}w5Z^tY?ZcQj*u|u&NKL{=9TKcm?MqdV_)aBiBp^~_RXPl}hBNk^vwMc% zLTgn>rID-h22xdtCrZj65%C`w$q&{^m5{1F^rbQ-l~A5Kd;Y)%8g=*Xx3e?9otvGV z+h1B*TnO4fpZpzrWkcw9KB+cGMK}{MLOV!73QnL%dBhP+P+X`|_h9YSM#Ozsc?fU0 zMeG71j)ry4s;zz`c173+--k8-jeb5x5eYoucvE#1ks28hmx6h3^n#Jrd=EWBOk5S7 z+n%CTjL}nsp)0mc6loUF1X9FJq}UtRMcfLWK|#l#{Bz+<3nPZ1YKyEt&U9^|79<$C z%u{hT4X}gOu>oVX#cPRJLDYM_WgWj;rK1_N4j$^~L7W+zCPtcSft6??O#(5hCj)bH z)`ZQ?1)`b~&{qhhszL&*s<9Lx+KNhbElD&3)=)J%pCGJ1D42rIOlQRkQ%Bi{XoE|Vm@A(lwfWN1X! zmsK*7HVi`hQ)^}`4^xH+=$fok5LPHZIXAql8z75LoUR|{bMd_M7CgUsT63A*d%17& z(w!4e+x9~4tGsmk42WofL$!$&;p|rsb`U`kte}Vhm0=^|<+HbHkLxd0R>V=^+b*5C zlSqn4nqq=t-xT-Z3y0P3qKK$Idsqpw`6Q+Y&70{51THSqQrQ-V9+JJge@Ur!*QuxKz z(NE}SVFq*%lr3zcjO*dUg+!Xcu?++T%Fl=100?5!Sy+9-89r4e~G9c7?y)_xW4*mfS!sPOPRob##>=@f)mEK z3q|Vb%zHZjcS!bbDtE7zqj*~=aeZEkCdpeusWAAj!uW;BL-B8|T9Jf^~0G`_2!~2;Kn*z>H*1gGb3uK3rS)u>56G%2Bq(CMmq<7 z16@jHxi)x#a2k6BkgfF-e{bI3dm!aM969!n7W|{xsiLnd@9Wz8;^6CJ-(bNvn4K)P z_2k=n_MYb-4IQ@)7utri;S+mnNf7PcJzELEZ~><_82OsAq!o=BeRm};dLE2gNY2r%Nh zs+mG;$rKZMRG}TP*h+UfNsUMUM*6gTK!Y?0JYLQ0FMyH_V_cFDc4eD*FQVqG)lWpn z=EQeX-%f2mJ{CI)Vn-Gi?M>O)t-HD2A0NDUu>EW|bnLocaNR$)_Z94Yuut4^K#QJR zTSjhe*ZciOZX^4cyYH|}7Mya@YYrE>>!kDZ^Xt?l$&TMEp&H!uVaViFIOV$6*BF== VwtVDl9qO_jb%{eCIFI@;;J?Xo{xARl literal 0 HcmV?d00001 diff --git a/08-snakemake/profiles/slurm/__pycache__/slurm_utils.cpython-311.pyc b/08-snakemake/profiles/slurm/__pycache__/slurm_utils.cpython-311.pyc new file mode 100644 index 0000000000000000000000000000000000000000..9b4aa8414e1f4abee5b2f56cc712450058ff5a5c GIT binary patch literal 22966 zcmch9dvFwYerNYg&-*EjkOWB3NJ1b45}4N-Fj&07Sbzb0E!(Rd)C~-bX5{V>kMU@C z;&8VkPP-O9Txd7;MY-GE;Mhr&yX4}kTwQscO)7O)mzq}f7E^su9aX8V+WjL}^09lj z<)l*g`TlyQXL=-9=dLO}J>UNI@72H8_xbyF|86Mc=WzUT&;Ppg2U|JrZ|SCTc*}vu zdmJ2hpOZL=PjW-%l^^2Sy<^D1uFfGRy9z@tc6ATAadjj;quwDePi34*->84c&%#17 zFd7&Nval-|8VwJHS=gP7j7Eo|EX*exh8j@LlWZJ~4aHcTH`z4WG}OexzT}G0#-T?J<^KX{7{$FjJx$x3t+djNoqyh25BYkHcG1id!*HXn;e|%yxLnC zLaFVCJjZ>6zj@T9wm&s>%~Mm?J~dT*YU(Sg!PW~l?M-{RnoaZ=}PPU@2Te&|HY z@i&j5ewMZ#Y281>diV(ca>`bgvH>X@>r%E!{5h_t_ve&xu!lE%nZ&3p$;nI{Hy$aW zq~fD8f&#_k>bj6hq*D|aA5Ta-5b&PMD2deN6Q>al9!jULCFDcnnT)L9-jPTHhF(c4 z*DfW~H;yPunxcKLB$CodT#;1VxRdG2ms#!=1IZilTk6^IbVinrrj^lnxthq*sD^X$ zo8xk7M7HvcSW}3u5Mt24lo2CSla}K$*1&L_eK^z=bvXspQ9kTBl}N@Kc#g|)_USqY zM3fwoQxYWC`@#gD@lNm5+{V(@vjX(v97x3+H1}USC|;u$%vV>_~OCK|M21G^DkaJbKvK+>@Kr5KmX42 zERet5H_#&({0j#EqQM_C`14u+#@D6Zo{eu!^mG|~ZZO+1o>Irh#?lI~pz`SQb4TJC zV4t`lyLQ7~QH&>nQ^=!^U%HgIDXV+L*IvlIm^+p`|HjX0iT8vJe?m>9)JzG821m*F;t#_*^K8VHnAbbbN87e>9cpzdUwH?H`jB zHKAtuucSxiekqYkq@?lwK8)g}#N~c96~88r;@__($CXhN4v%LNNwsh6meDj^9_?ZF z$l*+Sc=V#O8IM_ozxp{qp8Kh%VY>T+PR+AM_pDjuc-QJ5Yg4pq!g5GaM3rqAAg<=t0i&fXomH>8Ke z{LzI#_}l9Aw)?x^-95AI?w)&lCcioL&DmxxvOx=M)B_uf!p6twG&e$^auAha6mSZG z`&=zuyJ}HGiT}6`D<**Zvo-XwQXTLd)a+xIoeJ`lyM_){bK7ZBmF5Ab+}ZsvV#Y_~ zN%7FB6NEAfXn8`B`_rl9EipbaB9CQ6tlP^;SsY8N30m{W99J%nkIJb`U$*_+sS7W? zC|c_T7;tekfifdXVk`rm5|CswjFN`$YWkvim)&@-vb01i(!^xkiepL|>oSv&)jrfA zT#=Ks#Jza#SX@zMBRo)U!5O+$j9^@nhAXWy{45jQs|w^a4ps1DDh;)$khEb$%na6p zmEDNlgul8U087>z)xB$rn_nn;Pifv$y7v?ok>L8);M;@KdyDH&=La?6j4qrh3TH}z z;5+;7?3>&_wOpy+;}3K8c|Xj(F+#jUa#ShzEVXROVK29Q9;uGQfCmJs73JzuC86AxRd~MZ zpZ5F|FS#dNIakh+6ZYW|Q&P@>mFvm2Tu6<@N3MzI4i20@bbR>mk)s0_PMts3XB&DN zI%1WNzXEU{3?Rq7(K5k@NOXHxk}t)_lNq&dUot%sPpbP7sAS!V8q^MeJXdV&HP6|B zGJV;8ci+8zRy12y3O&-`wAyx9DB{i5qwm2;~o#SnnP*pgUS)uin_Z$T?k23>4g@VB_?* zsT1#1Ex1t+ZY=N%>$+xN{+-p|Up>dquiI8!x2%8JhfB;B5p-YZ*3P)Bsm5xg! z8iA=3nU@kH8NZ*VyCz{=F=d)^x+p`gk`?iik{%V~7u9rfJOeHehp?4az$lW)eO+di zpadxPB}g!+S?ObOMqs$I_J@aiyoMtUk%dYtYtb%+NRQzi9;QOW!-ikJITlYz*Fnw| zl4p$wgd>T>3$&Q>^f;8$6z`NqMrtw7h?yCa<0=%LaslN$vQqces{na!G01s?i=5zU zSco)C-FPQ=C-=dz9~{@(w&-nJeqZ_c#>4)B`Tl`VyR`medjByka$JubF9-|as2&z) zo3wDZ9_}tU3(iO3Xu(K=2=yw(+q~qO5WXSY;BRm@ov(5?cnA}JU}}J3oK^{D zBPAwMVwKOUnOjNOTpZ`GK!P8Y<0)0lT#08y(4re@D50V}dQp}nNYW^IQHhVpDufLg zt8FZa8%ewfsU&qv#5zgDFDA{^2P!s>e5@kM9KRkONHlt&2lG&ic+HGi4I9KO_}nJasLVzQKBtp$20$X5^chKJiRaj;pdrf-wXh zJd!>8R(f0nwNIqbP8LyVWuG{ZRMTP#%CwnK?inJ)aTU2C^;=zHQkSZVSK`-^J1t(6 zMMzNbF|+d+2pM>n0v2D9N5#P&j}pb&P{II4gPfE}EE_hp2+V*Ip~N+m=zT%H#k79I zH;68p1^~!Z!{xa${KIdOx<3qE2F&3GV-n6wqtH;~n^o0;2~ey?23|zfWoGb8KNo15 z9Vz-ZY5q;Re^dTY$l-rV8RLP;jeQ_TP;R|GCQjx(8q$kj19v*lB@P!axYt(`>C~Av?OKL zX5D9~pXI|&Jtl;lu)H~>whU;BMT-!&>iyVb=VA8#!4y$LT_6s6?YWhWU-H=&FZ|eN ze}esv=X_kIq0&#{w~YD;7WE?`s(gU@?U`_8R#fhxX!)ODlyP3Q#NAAF__>_R{&aia z>}iaGy1wDM!DmQWE+5OZA@I-928)UZvzsj{P68w-4YRLQXqjrt14LG#KJ|@M8cx@W zGW)m*H}^T2BfB?j-oPGMyh&_#e$j5yrr{eEyJJ-@KAFB~LqwWlCcKkU24M|4%^NEx z9JTs$j-g$yMyIhOCeKZ+lDKo+hdfzSJ*u1mV^VsYhM>yHOGX%77_RD?hLa2~w3013 z7!nE;|Nk+rNqiJcJNlC$B(CB#5Oi>be%7>h(81y1+I5C3CMT9Ql zuXY0Dt>2=<;X_N70<(OI%9D-|A0RHB%75(#WQ~NqGeYP3qwl1vlBX z7Dq7`3$sbl`Vilm*n@voJdu**n~diyp92OmclzR0c_gEddC~}%c^J48HOXkG+*_?N z1Ztn*9v_q98JN(n#8sMG)*zEV4N+qWPYoBEl8}@E3j3=~A+^X{;|fii!Un;Kycl$A zy(_~=TB09C>sN#F4P+y1P=60-0Fl$}bYZ-gf^T2?oML}bO#;>u@f#ch7}Cxv6hGUTi<1wV%-2PY}yo1v}ZJ_H`^+@-$ATik>c<3u4dQ{!e@L z?Sn<}v?iX`#nTmz`$+6!!G(qucbo4u=TDY|@Izt6ys+XuX|_}E*zu`TfBr(T`DLy7 zWgSp>MHgNv3a>19!UYwUG|y_?vzlym{UvYXL+{FY@5&FlW)FS;cyZMhZPgYX(A%eb z`-TR!T;UPUdRP+u}9Y8r$fPmVCIs5aD;Gur*&-?vHb~^w3 z*`0^`oImahA^hXLJCAI2{>4@o!Zy<6Mj@gTkH3S-COvW%H%pH?zUr{a0gmN$OnVtS z&j11Z&gWnr*hI&;#?3_z^30$<6Zwruh(2nZ9 z7Nj31GtZobMnS@8`2auLI^mL@w7FrEh3Z=8uHEFwxxrg)PXzzuCK&&$@UE$Mm}y8D zG@P)7XSW@N8bG3Wn&fL3(Zvx{Nv!C>kd(2NfV48D>SMH{$8QKDX&AQ!GSe8YL`EJ} z8R=k#ZH3r9Lw1GH7m}?Rk?HXmexgWZAysZMf@1j02V?g@xq^I#i>WFG4{Ob+dRZj~ z2TntWX_pL@sfwx5D3eJ44g6Ko9l=kctL{hMjcCzzdURd>#6t6`nP-3d!1oSlt2gVb zH*3vX^yV##oZq#Xfk{6!(+f~br`O-vSJ=1E)bf+o%?~bUt=sk1?OM|gy=h0`2vBKS z{@|PzSf>ZpEpi^;=0%4)w0U9Owz>76tkKpT*Vi4N_RVzO4Nr%EwGeBA4jn@E6;NtE z^|1NeeDk?dQ^)Ksy=fCH>Mf11sJApuJ7G?50f~6fpf&gE&Ar8FFGD;Z3KIf)=wVSV zxS}XDsbtUk(e~IukK;3s{}Au`tfBYdKG$daTnO6`kA{pP-bpNmZ+X7uz0Z-&Gsowd z0pv-DC_&}5FP}SJ7y*I2OAk8&0_8Rk%H8q6din&EEqDsvOAg6#+naON+gnQ%SX;_Z-?Y7lQW0Jr93F< ztCl3ob);LPq3C=XLH6-II<;fjkz$PC%aVJhQJn&xaz*%Z{{Gf8Z;GHb)5@v$+eD~2aMMnqUuPEt8P>&h_V zRKp2qIIdl4oW0K1;eo`p#g>5BoA+d)+&0&vckU~ArX!kHeAKY&e*3%aGdCVM zw1!^2p*Me$sQc9)bZUWaJmBWtzDIz0kL zAh>oi0Apt;`p&C&UY%BF&lXV&}J!Y#&0rqTeJgeX6-Io)krZ*^({H-jW%j?t&mn@Hif7HV~ve| zzzvF&T35FJ2>b+K2277<#>O)cDUuQy&LLTxBeW9XS0j=U8}`kc5GwiKEhvp0Ra0n zzUXw*WcyTm{$R=DE4=jf&HT-hH(YQp1VU50-`Rg>|4fG#Xx9Vnv^+N~c>UPMIXK%? z^mc3BZr$6BKuPfDkD3#OI<12k2kBUkZv)T{8wlfjx zY{)4I+sondj$I}Hcadz83Pwqskjm=ppvH=_z*blC0ZRS}{wnE?c@Dbh386*E2rbr8>TlD4wXs6^u){n{_6C^^n~VJuY1=Q*;!g1 zcW9%DP02o{oYdM&BsXmhk@0K7;e%Q8GJK;h^H~(gIT+;3y9uWa3$W_Vqkd<1LP)KI zz;C6)r^>=Ii2fdu7a*uR?SeYw4Z`mwhh6fT5TJBWKg!2dOGd#WsKdnNy|JV^bKHbG z)r8iP!MuDtrHyW2wkO+h&h%#hGd3L;u>BxmJ0dfV-3?|{=z|S`?eu2-Tei@yDeXKA zW?jsch;czv3h*hkDh!7@W&}sB$RpQ=O=@8@9)iVPmd+??h#snGqIXO3NE$Sxjf%Kc zIj+D1Xg%Hdj@%@t3@|vD{>iyPp;1$4ts2geQOR(EAu8XZxLW{Abqv4hp)ricEB^sG zKBZj$1ps6s+*xE!H?t}8nm@Ma5M0qxq@{3m=G4r7E!^|qm=@kXcWdsN79J=#u|qbM zo*B`?9eTKfHprlyhxC9*WF%SzrEsL+VbVX;?kGwBZrD8D*>q>qAC7sA=+zqawfnSBy`9`}LjyYo1!5EBg#f|ED#gla={3Ep1$qsmsQDEgkE7 z!0T_-aoyr^m<0|Ao3({j2@8AA1+(=8eTpi^LVQ5vFDV$yXk-(o4_yizZSpnEZhhqTSM86EZ7dzs#0iP1L_5`aMr}8$6 zz)3cdM7j~6hnZrX%2ZF*d#0osKDMzzy1KH8$_Qx76_^XXUO#5DdSg2v&?h9Y~q5=)6xfrw18MHc{9?c;^B@4R;B zwaGW8-q3{Ay0E$^tp2GtP#7p&nmREVo(j)2!s&MABJ|M(KWdrwOkJJqpX#4EsQK4? z?&N)If6AhLQ%TLgTKBL1+~x4K!BQBAO!d5X2o{T;_k&u~dcA2q)Yv9?67=f+-p@Tw z*b1GTD+ouPU~KyAoxKJ8imiJl_f8>DBL`y?tpkI}j>lw=uHB9V?Qj@PCsGjT6ViE1 zH{PVkFu`lE$EjRJZsi(*Bmri}A%TKTp7LV~5%?AUDw(tL)~_UZ^M}p-G|Dl8zjahh zb&uUlD6j5~($1*tBvL&jAjF*C@m`QacCk3O8LS9Lb8HpeANiLZY?jgVI zmOZjp_Q`(9{l04`fVXFYS3{NO*BkwY!je~xz-Z~aEeu5^KMMyCZjgf5t!dbGDt`v> za~dz`WW1^n9vS=0-y+CIjsU7Sh-p63=?+kQLzz3}W!t#zy3y0suog$nqYc09MK zf-E8StW(dU{Fmw!SGA7Vxo};FV#7qT*t)YZae1K zUr5T;zLHe8!$!6e2z?G>cMy3i^aOVhO`Y~9mh-X=1(b1787g}dzbZE4Y1AWjNQ}m0 zK`%zI(@FUqc$=_B<@Qw!2N0rdGf}(SXK#h#$N@8Iy>BG%@`7{yeA*Y|+kB6IFcHN> znQWdB35(Y=a*^$Pi-|)Md)}(ZVOyV(a90+J_Rw8+9j!=m?1Nt*gn){BpULdS4R1|s zsHo~RND580avi{MONr}n!eFw6LXwB+U@$%C%(J66buzE7fu#VPMw@t|Nf>~<-7e_ zu!kZhPnKc_im|Wg1P|lL&5dLoOPSzvqGhvwbwed#m-hO)2Jd6+^;)px5_2g^0p~#XxHsX z)-U>;qZW1QUCWk#dalM9kVFMGMvkfP)i?_eZS~=uIc8yYtxhX=ud0PpI*|3o|?KzJBlZ z9~}Pv$p;y|YlpUChZcluK;g)vK+_C=XMbUTDYm+B`cX^U2N`|sPOW8UA-E7}olO=a z+qKAcJ+i&%-OliTu&1?7dN!hPV@s81&>Z$h7!WocN%&VVm6gRC!D0KINyp7J|1eQ^ zGwd6N`df*TWHVVgDimQ6HSa1GR zDYjxJsKvSvYKToIwT2FOU_~3JhwetOL)g$b{p#KJ{K-e5l_fe`N}*=J#eh5HhSPJ% zoj5He*m)YBy%;P|{=}I&igd&g61CSdo zbRVmOwFciOOSyK|hVoP&JF2>qVESM(oF&=8aN?^571il~B4VlP6fqlaDNhHmTGX}3 zioN+Y8wvtm{#}SQzc+sO#=RT#G96SXZvj7Z)%`UxE-&7nK?<45$P!>4%PWNv|CBC6 zT>l7v^B{}EAak6BsCXH1$~^)kTq-xxy5}R^vv2*$ z;XgY1$49luUOhtAprwMCrL`z{8oo@z&*HlDfcp}48ILiKFXNr~bZ4hAnDNnZVekun z>06oG@s!8M0bbyEHrQ{pv)y14^tf5G33U)F&tcavUgqxe*!Nw-zMSF6y55?2?yU)U zdNc7HBfSqbRCjP~LircIeV64F-a1qEXE)RjHfgXm#>+~^lQaiAQ0LF^gUe0Ri#R`& z9>nJwreC@LwRgWZJ23mEwxUO0(PO67LN+mQS577HwG6b4)+w87(+Q?-6&N9F=BY8+ zrsbM=jN0=Y>LGVy4vref?;JNDIVb&b>fx@F^Se%JyH4r5P9aundQk@qzNiOZ1o;oO zVpng`<@EWNt{4GT)?$1OBB-l>{TiVUZrJDP9IY%1=dmW7gUr{`2$3s0{36t0=1wjc z(S$n8jI6XjN$<5*vBmJP1o&~+kCl%KGk$({)=~S}7%{x*Lf?Idq0zMHvF?Yl zP4lr$4|dHR&|-)5*r8(Z5X7jd99~_0%BXw=wS77Auyh=#=YNm#Ol1AgQSDuQr&z9R zxIFFUo}Z`AkUR^Yf?6VQYi@%U*`r7H6uo<%((vd1pBpYY&Al;(&I;ATTv%8ylz*h# zx;at4KxDS1-2D1U{VMfCM2@SmS1nm8(#WaL1W>Skwwcg$tC~uXcRt2?-B=*s;-=St= zz3ha5eP?QY`}8 zC(3=7`&iB9r;x+}=H+qFd=@+%v9X5ly~rv2B@sJH8wQ?Z7U;f%=5<50<_oAfE-Kj4 z!&}oJ_V^Z!3jdB<6b9iJ6B69c>AN*bwahtcE3GatBuS(aaPJV)lDPgtHPd-yx{ylT zWSe1|#rUWru>&@L-;|9HP1EuvXhCD6saRev?;A_6J>tSopd~1)^ z+N-zX69l}Eaf7bO1V)j9!5In zBb~FSwa5-VvZLtTv6MVp!scJHfdN6m)jX6PxWl8&WBDOiDmu*4P%k;^t$UJCHEYA< zY}vDDtKio^(@q5KcVH2&Y>b?uZPWx($St(Wfw&8=VGQM<6Rd+wkX+pWiTKa4#$AA9bT5iNF5j~)DUn;tt- z3?5+{8CB6{w-(#3$6%*<;*9+>EoH@r-YvV1;T#L3zhYm<<;dmNaXHai$L0HffhQ;- z0$VVMHhd-CNgaLM3}EN75_5mVG&=^fLLYqcu&0F&B`%pTyq`Fp!IQF~@|>Bs-$vHQ z<~A@eWGXjvNsDw9y}jgA!I@ennZXQi z`CZprrs6SxIFr&d!vFV_MYxzwC(TvNh&)jj1s|U*^943CE9kbBz$n!~k2S5Je@7t# z`;n;rBS7Bz;RRHeI17S{OFi&%?(m|6ckP>jZ3vetAhS{U_E%XVbR5bixcoCJxzB>K zSAdm`B7!WN)73V!ot)nB(ozLj*(kzae&}A)PR71OW&$f4@7h~_DDT=&ZO5ewYO?LO zvUSr#d2;QofSPRGi|!^@1DWh_d0<{xxCU%y&2qVRQnPTW0;?j54VTAa=d{W^^{c?j zMo#?ZSbWLac-Q(FCpDbjTP_2uHi|GO;MZoe3Y$xV7DNw!)Z8L}MtO!7$)^Mc3A6#= zEsgPuSexVktC00f>7-b1Wgmf=Xz^kPJ~~H~LZBSPX%r`xnBj(=2=6Xt%*Dn6By@$O zScNnM_Awd7OCiQVeHIlrnyg@-T=A(@&G&_a)ucq)d?C4^8U+-*OwS9URwGi4XP?Dm zU*a%7yAh?Q(#P=F=f&8!lh}u8%+14ZP{Jeuda;Cg>@yEx=9Ft1vHvqAd`4i7z>f(K zUu7s(hkIXm_nEJW?^o7iP;heUKY;AvMOU7G#6|M#SK7wzYA74{W5+qqmb zx{93bEOFb4_OrzG7VT$=+fuZji|!ELv&dOM6nKyV3xvHqJd!QY9O5ICxePiQdC)04 z*u09T*OqJGz7UUhnk^u<@IZb$*x}&=5M?W%$xB%*&>ZIRtrZJ&;S(tNgSSr>BQI&f zIbAr1*S*dA0Zlll3kR+Ht(ve+7q(gVhc)4dE*!D$cWS~eUD##acWJ_UU04svI^xI& z7MoU4Gc2%&=Y60ZOTf&tpHu8Fs!84hJZSw=@Hxf(qMC5nae?PSkCum@v-n>;A(fGz F{|j)FC%6Cr literal 0 HcmV?d00001 diff --git a/08-snakemake/profiles/slurm/config.yaml b/08-snakemake/profiles/slurm/config.yaml new file mode 100644 index 0000000..a21ef0e --- /dev/null +++ b/08-snakemake/profiles/slurm/config.yaml @@ -0,0 +1,29 @@ + +cluster-sidecar: "slurm-sidecar.py" +cluster-cancel: "scancel" +restart-times: "2" +jobscript: "slurm-jobscript.sh" +cluster: "slurm-submit.py" +cluster-status: "slurm-status.py" +max-jobs-per-second: "10" +max-status-checks-per-second: "10" +local-cores: 1 +latency-wait: "5" +use-conda: "False" +use-singularity: "False" +jobs: "500" +printshellcmds: "True" + +# Example resource configuration +# default-resources: +# - runtime=100 +# - mem_mb=6000 +# - disk_mb=1000000 +# # set-threads: map rule names to threads +# set-threads: +# - single_core_rule=1 +# - multi_core_rule=10 +# # set-resources: map rule names to resources in general +# set-resources: +# - high_memory_rule:mem_mb=12000 +# - long_running_rule:runtime=1200 diff --git a/08-snakemake/profiles/slurm/settings.json b/08-snakemake/profiles/slurm/settings.json new file mode 100644 index 0000000..4402649 --- /dev/null +++ b/08-snakemake/profiles/slurm/settings.json @@ -0,0 +1,5 @@ +{ + "SBATCH_DEFAULTS": "", + "CLUSTER_NAME": "eri", + "CLUSTER_CONFIG": "" +} diff --git a/08-snakemake/profiles/slurm/slurm-jobscript.sh b/08-snakemake/profiles/slurm/slurm-jobscript.sh new file mode 100755 index 0000000..391741e --- /dev/null +++ b/08-snakemake/profiles/slurm/slurm-jobscript.sh @@ -0,0 +1,3 @@ +#!/bin/bash +# properties = {properties} +{exec_job} diff --git a/08-snakemake/profiles/slurm/slurm-sidecar.py b/08-snakemake/profiles/slurm/slurm-sidecar.py new file mode 100755 index 0000000..e79f5da --- /dev/null +++ b/08-snakemake/profiles/slurm/slurm-sidecar.py @@ -0,0 +1,330 @@ +#!/usr/bin/env python3 +"""Run a Snakemake v7+ sidecar process for Slurm + +This sidecar process will poll ``squeue --user [user] --format='%i,%T'`` +every 60 seconds by default (use environment variable +``SNAKEMAKE_SLURM_SQUEUE_WAIT`` for adjusting this). + +Note that you have to adjust the value to fit to your ``MinJobAge`` Slurm +configuration. Jobs remain at least ``MinJobAge`` seconds known to the +Slurm controller (default of 300 seconds). If you query ``squeue`` every +60 seconds then this is plenty and you will observe all relevant job status +states as they are relevant for Snakemake. + +If the environment variable ``SNAKEMAKE_CLUSTER_SIDECAR_VARS`` is set then +the ``slurm-status.py`` of the slurm profile will attempt to query this +sidecar process via HTTP. As the sidecar process does not update its +cache in real-time, setting ``SNAKEMAKE_SLURM_SQUEUE_WAIT`` too large might +lead to Snakemake missing the "done" job state. The defaults of +``SNAKEMAKE_SLURM_SQUEUE_WAIT=60`` and Slurm's ``MinJobAge=600`` work well +together and you will see all relevant job statuses. + +If the sidecar is queried for a job ID that it has not seen yet then it will +perform a query to ``sacct`` such that it works well if Snakemake "resume +external job" feature. The ``slurm-submit.py`` script of the Snakemake profile +will register all jobs via POST with this sidecar. +""" + +import http.server +import json +import logging +import os +import subprocess +import sys +import signal +import time +import threading +import uuid + +from CookieCutter import CookieCutter + + +#: Enables debug messages for slurm sidecar. +DEBUG = bool(int(os.environ.get("SNAKEMAKE_SLURM_DEBUG", "0"))) +#: Enables HTTP request logging in sidecar. +LOG_REQUESTS = bool(int(os.environ.get("SNAKEMAKE_SLURM_LOG_REQUESTS", "0"))) +#: Command to call when calling squeue +SQUEUE_CMD = os.environ.get("SNAKEMAKE_SLURM_SQUEUE_CMD", "squeue") +#: Number of seconds to wait between ``squeue`` calls. +SQUEUE_WAIT = int(os.environ.get("SNAKEMAKE_SLURM_SQUEUE_WAIT", "60")) + +logger = logging.getLogger(__name__) +if DEBUG: + logging.basicConfig(level=logging.DEBUG) + logger.setLevel(logging.DEBUG) + + +class PollSqueueThread(threading.Thread): + """Thread that polls ``squeue`` until stopped by ``stop()``""" + + def __init__( + self, + squeue_wait, + squeue_cmd, + squeue_timeout=2, + sleep_time=0.01, + max_tries=3, + *args, + **kwargs + ): + super().__init__(target=self._work, *args, **kwargs) + #: Time to wait between squeue calls. + self.squeue_wait = squeue_wait + #: Command to call squeue with. + self.squeue_cmd = squeue_cmd + #: Whether or not the thread should stop. + self.stopped = threading.Event() + #: Previous call to ``squeue`` + self.prev_call = 0.0 + #: Time to sleep between iterations in seconds. Thread can only be + #: terminated after this interval when waiting. + self.sleep_time = sleep_time + #: Maximal running time to accept for call to ``squeue``. + self.squeue_timeout = squeue_timeout + #: Maximal number of tries if call to ``squeue`` fails. + self.max_tries = max_tries + #: Dict mapping the job id to the job state string. + self.states = {} + #: Make at least one call to squeue, must not fail. + logger.debug("initializing trhead") + self._call_squeue(allow_failure=False) + self.prev_call = time.time() + + def _work(self): + """Execute the thread's action""" + while not self.stopped.is_set(): + now = time.time() + if now - self.prev_call > self.squeue_wait: + self._call_squeue() + self.prev_call = now + time.sleep(self.sleep_time) + + def get_state(self, jobid): + """Return the job state for the given jobid.""" + jobid = str(jobid) + if jobid not in self.states: + try: + self.states[jobid] = self._get_state_sacct(jobid) + except: + return "__not_seen_yet__" + return self.states.get(jobid, "__not_seen_yet__") + + def register_job(self, jobid): + """Register job with the given ID.""" + self.states.setdefault(jobid, None) + + def _get_state_sacct(self, jobid): + """Implement retrieving state via sacct for resuming jobs.""" + cluster = CookieCutter.get_cluster_option() + cmd = ["sacct", "-P", "-b", "-j", jobid, "-n"] + if cluster: + cmd.append(cluster) + try_num = 0 + while try_num < self.max_tries: + try_num += 1 + try: + logger.debug("Calling %s (try %d)", cmd, try_num) + output = subprocess.check_output(cmd, timeout=self.squeue_timeout, text=True) + except subprocess.TimeoutExpired as e: + logger.warning("Call to %s timed out (try %d of %d)", cmd, try_num, self.max_tries) + continue + except subprocess.CalledProcessError as e: + logger.warning("Call to %s failed (try %d of %d)", cmd, try_num, self.max_tries) + continue + try: + parsed = {x.split("|")[0]: x.split("|")[1] for x in output.strip().split("\n")} + logger.debug("Returning state of %s as %s", jobid, parsed[jobid]) + return parsed[jobid] + except IndexError: + logger.warning("Could not parse %s (try %d of %d)", repr(output), try_num, self.max_tries) + secs = try_num / 2.0 + loger.info("Sleeping %f seconds", secs) + time.sleep(secs) + raise Exception("Problem with call to %s" % cmd) + + def stop(self): + """Flag thread to stop execution""" + logger.debug("stopping thread") + self.stopped.set() + + def _call_squeue(self, allow_failure=True): + """Run the call to ``squeue``""" + cluster = CookieCutter.get_cluster_option() + try_num = 0 + cmd = [SQUEUE_CMD, "--user={}".format(os.environ.get("USER")), "--format=%i,%T", "--state=all"] + if cluster: + cmd.append(cluster) + while try_num < self.max_tries: + try_num += 1 + try: + logger.debug("Calling %s (try %d)", cmd, try_num) + output = subprocess.check_output(cmd, timeout=self.squeue_timeout, text=True) + logger.debug("Output is:\n---\n%s\n---", output) + break + except subprocess.TimeoutExpired as e: + if not allow_failure: + raise + logger.debug("Call to %s timed out (try %d of %d)", cmd, try_num, self.max_tries) + except subprocess.CalledProcessError as e: + if not allow_failure: + raise + logger.debug("Call to %s failed (try %d of %d)", cmd, try_num, self.max_tries) + if try_num >= self.max_tries: + logger.debug("Giving up for this round") + else: + logger.debug("parsing output") + self._parse_output(output) + + def _parse_output(self, output): + """Parse output of ``squeue`` call.""" + header = None + for line in output.splitlines(): + line = line.strip() + arr = line.split(",") + if not header: + if not line.startswith("JOBID"): + continue # skip leader + header = arr + else: + logger.debug("Updating state of %s to %s", arr[0], arr[1]) + self.states[arr[0]] = arr[1] + + +class JobStateHttpHandler(http.server.BaseHTTPRequestHandler): + """HTTP handler class that responds to ```/job/status/${jobid}/`` GET requests""" + + def do_GET(self): + """Only to ``/job/status/${job_id}/?``""" + logger.debug("--- BEGIN GET") + # Remove trailing slashes from path. + path = self.path + while path.endswith("/"): + path = path[:-1] + # Ensure that /job/status was requested + if not self.path.startswith("/job/status/"): + self.send_response(400) + self.end_headers() + return + # Ensure authentication bearer is correct + auth_required = "Bearer %s" % self.server.http_secret + auth_header = self.headers.get("Authorization") + logger.debug( + "Authorization header is %s, required: %s" % (repr(auth_header), repr(auth_required)) + ) + if auth_header != auth_required: + self.send_response(403) + self.end_headers() + return + # Otherwise, query job ID status + job_id = self.path[len("/job/status/") :] + try: + job_id=job_id.split("%20")[3] + except IndexError: + pass + logger.debug("Querying for job ID %s" % repr(job_id)) + status = self.server.poll_thread.get_state(job_id) + logger.debug("Status: %s" % status) + if not status: + self.send_response(404) + self.end_headers() + else: + self.send_response(200) + self.send_header("Content-type", "application/json") + self.end_headers() + output = json.dumps({"status": status}) + logger.debug("Sending %s" % repr(output)) + self.wfile.write(output.encode("utf-8")) + logger.debug("--- END GET") + + def do_POST(self): + """Handle POSTs (only to ``/job/register/${job_id}/?``)""" + logger.debug("--- BEGIN POST") + # Remove trailing slashes from path. + path = self.path + while path.endswith("/"): + path = path[:-1] + # Ensure that /job/register was requested + if not self.path.startswith("/job/register/"): + self.send_response(400) + self.end_headers() + return + # Ensure authentication bearer is correct + auth_required = "Bearer %s" % self.server.http_secret + auth_header = self.headers.get("Authorization") + logger.debug( + "Authorization header is %s, required: %s", repr(auth_header), repr(auth_required) + ) + # Otherwise, register job ID + job_id = self.path[len("/job/status/") :] + self.server.poll_thread.register_job(job_id) + self.send_response(200) + self.end_headers() + logger.debug("--- END POST") + + def log_request(self, *args, **kwargs): + if LOG_REQUESTS: + super().log_request(*args, **kwargs) + + +class JobStateHttpServer(http.server.HTTPServer): + """The HTTP server class""" + + allow_reuse_address = False + + def __init__(self, poll_thread): + """Initialize thread and print the ``SNAKEMAKE_CLUSTER_SIDECAR_VARS`` to stdout, then flush.""" + super().__init__(("0.0.0.0", 0), JobStateHttpHandler) + #: The ``PollSqueueThread`` with the state dictionary. + self.poll_thread = poll_thread + #: The secret to use. + self.http_secret = str(uuid.uuid4()) + sidecar_vars = { + "server_port": self.server_port, + "server_secret": self.http_secret, + "pid": os.getpid(), + } + logger.debug(json.dumps(sidecar_vars)) + sys.stdout.write(json.dumps(sidecar_vars) + "\n") + sys.stdout.flush() + + def log_message(self, *args, **kwargs): + """Log messages are printed if ``DEBUG`` is ``True``.""" + if DEBUG: + super().log_message(*args, **kwargs) + + +def main(): + # Start thread to poll ``squeue`` in a controlled fashion. + poll_thread = PollSqueueThread(SQUEUE_WAIT, SQUEUE_CMD, name="poll-squeue") + poll_thread.start() + + # Initialize HTTP server that makes available the output of ``squeue --user [user]`` + # in a controlled fashion. + http_server = JobStateHttpServer(poll_thread) + http_thread = threading.Thread(name="http-server", target=http_server.serve_forever) + http_thread.start() + + # Allow for graceful shutdown of poll thread and HTTP server. + def signal_handler(signum, frame): + """Handler for Unix signals. Shuts down http_server and poll_thread.""" + logger.info("Shutting down squeue poll thread and HTTP server...") + # from remote_pdb import set_trace + # set_trace() + poll_thread.stop() + http_server.shutdown() + logger.info("... HTTP server and poll thread shutdown complete.") + for thread in threading.enumerate(): + logger.info("ACTIVE %s", thread.name) + + signal.signal(signal.SIGINT, signal_handler) + signal.signal(signal.SIGTERM, signal_handler) + + # Actually run the server. + poll_thread.join() + logger.debug("poll_thread done") + http_thread.join() + logger.debug("http_thread done") + + +if __name__ == "__main__": + sys.exit(int(main() or 0)) diff --git a/08-snakemake/profiles/slurm/slurm-status.py b/08-snakemake/profiles/slurm/slurm-status.py new file mode 100755 index 0000000..7cc28d1 --- /dev/null +++ b/08-snakemake/profiles/slurm/slurm-status.py @@ -0,0 +1,106 @@ +#!/usr/bin/env python3 +import json +import os +import re +import requests +import subprocess as sp +import shlex +import sys +import time +import logging +from CookieCutter import CookieCutter + +logger = logging.getLogger(__name__) + +STATUS_ATTEMPTS = 20 +SIDECAR_VARS = os.environ.get("SNAKEMAKE_CLUSTER_SIDECAR_VARS", None) +DEBUG = bool(int(os.environ.get("SNAKEMAKE_SLURM_DEBUG", "0"))) + +if DEBUG: + logging.basicConfig(level=logging.DEBUG) + logger.setLevel(logging.DEBUG) + + +def get_status_direct(jobid): + """Get status directly from sacct/scontrol""" + cluster = CookieCutter.get_cluster_option() + for i in range(STATUS_ATTEMPTS): + try: + sacct_res = sp.check_output(shlex.split(f"sacct {cluster} -P -b -j {jobid} -n")) + res = {x.split("|")[0]: x.split("|")[1] for x in sacct_res.decode().strip().split("\n")} + break + except sp.CalledProcessError as e: + logger.error("sacct process error") + logger.error(e) + except IndexError as e: + logger.error(e) + pass + # Try getting job with scontrol instead in case sacct is misconfigured + try: + sctrl_res = sp.check_output(shlex.split(f"scontrol {cluster} -o show job {jobid}")) + m = re.search(r"JobState=(\w+)", sctrl_res.decode()) + res = {jobid: m.group(1)} + break + except sp.CalledProcessError as e: + logger.error("scontrol process error") + logger.error(e) + if i >= STATUS_ATTEMPTS - 1: + print("failed") + exit(0) + else: + time.sleep(1) + + return res[jobid] or "" + + +def get_status_sidecar(jobid): + """Get status from cluster sidecar""" + sidecar_vars = json.loads(SIDECAR_VARS) + url = "http://localhost:%d/job/status/%s" % (sidecar_vars["server_port"], jobid) + headers = {"Authorization": "Bearer %s" % sidecar_vars["server_secret"]} + try: + resp = requests.get(url, headers=headers) + if resp.status_code == 404: + return "" # not found yet + logger.debug("sidecar returned: %s" % resp.json()) + resp.raise_for_status() + return resp.json().get("status") or "" + except requests.exceptions.ConnectionError as e: + logger.warning("slurm-status.py: could not query side car: %s", e) + logger.info("slurm-status.py: falling back to direct query") + return get_status_direct(jobid) + + +jobid = sys.argv[1] + +if SIDECAR_VARS: + logger.debug("slurm-status.py: querying sidecar") + status = get_status_sidecar(jobid) +else: + logger.debug("slurm-status.py: direct query") + status = get_status_direct(jobid) + +logger.debug("job status: %s", repr(status)) + +if status == "BOOT_FAIL": + print("failed") +elif status == "OUT_OF_MEMORY": + print("failed") +elif status.startswith("CANCELLED"): + print("failed") +elif status == "COMPLETED": + print("success") +elif status == "DEADLINE": + print("failed") +elif status == "FAILED": + print("failed") +elif status == "NODE_FAIL": + print("failed") +elif status == "PREEMPTED": + print("failed") +elif status == "TIMEOUT": + print("failed") +elif status == "SUSPENDED": + print("running") +else: + print("running") diff --git a/08-snakemake/profiles/slurm/slurm-submit.py b/08-snakemake/profiles/slurm/slurm-submit.py new file mode 100755 index 0000000..c5544b4 --- /dev/null +++ b/08-snakemake/profiles/slurm/slurm-submit.py @@ -0,0 +1,105 @@ +#!/usr/bin/env python3 +""" +Snakemake SLURM submit script. +""" +import json +import logging +import os + +import requests +from snakemake.utils import read_job_properties + +import slurm_utils +from CookieCutter import CookieCutter + +logger = logging.getLogger(__name__) + +SIDECAR_VARS = os.environ.get("SNAKEMAKE_CLUSTER_SIDECAR_VARS", None) +DEBUG = bool(int(os.environ.get("SNAKEMAKE_SLURM_DEBUG", "0"))) + +if DEBUG: + logging.basicConfig(level=logging.DEBUG) + logger.setLevel(logging.DEBUG) + + +def register_with_sidecar(jobid): + if SIDECAR_VARS is None: + return + sidecar_vars = json.loads(SIDECAR_VARS) + url = "http://localhost:%d/job/register/%s" % (sidecar_vars["server_port"], jobid) + logger.debug("POST to %s", url) + headers = {"Authorization": "Bearer %s" % sidecar_vars["server_secret"]} + requests.post(url, headers=headers) + + +# cookiecutter arguments +SBATCH_DEFAULTS = CookieCutter.SBATCH_DEFAULTS +CLUSTER = CookieCutter.get_cluster_option() +CLUSTER_CONFIG = CookieCutter.CLUSTER_CONFIG + +RESOURCE_MAPPING = { + "time": ("time", "runtime", "walltime"), + "mem": ("mem", "mem_mb", "ram", "memory"), + "mem-per-cpu": ("mem-per-cpu", "mem_per_cpu", "mem_per_thread"), + "nodes": ("nodes", "nnodes"), + "partition": ("partition", "queue"), +} + +# parse job +jobscript = slurm_utils.parse_jobscript() +job_properties = read_job_properties(jobscript) + +sbatch_options = {} +cluster_config = slurm_utils.load_cluster_config(CLUSTER_CONFIG) + +# 1) sbatch default arguments and cluster +sbatch_options.update(slurm_utils.parse_sbatch_defaults(SBATCH_DEFAULTS)) +sbatch_options.update(slurm_utils.parse_sbatch_defaults(CLUSTER)) + +# 2) cluster_config defaults +sbatch_options.update(cluster_config["__default__"]) + +# 3) Convert resources (no unit conversion!) and threads +sbatch_options.update(slurm_utils.convert_job_properties(job_properties, RESOURCE_MAPPING)) + +# 4) cluster_config for particular rule +sbatch_options.update(cluster_config.get(job_properties.get("rule"), {})) + +# 5) cluster_config options +sbatch_options.update(job_properties.get("cluster", {})) + +# convert human-friendly time - leaves slurm format time as is +if "time" in sbatch_options: + duration = str(sbatch_options["time"]) + sbatch_options["time"] = str(slurm_utils.Time(duration)) + +# 6) Format pattern in snakemake style +sbatch_options = slurm_utils.format_values(sbatch_options, job_properties) + +# 7) create output and error filenames and paths +joblog = slurm_utils.JobLog(job_properties) +log = "" +if "output" not in sbatch_options and CookieCutter.get_cluster_logpath(): + outlog = joblog.outlog + log = outlog + sbatch_options["output"] = outlog + +if "error" not in sbatch_options and CookieCutter.get_cluster_logpath(): + errlog = joblog.errlog + log = errlog + sbatch_options["error"] = errlog + +# ensure sbatch output dirs exist +for o in ("output", "error"): + slurm_utils.ensure_dirs_exist(sbatch_options[o]) if o in sbatch_options else None + +# 9) Set slurm job name +if "job-name" not in sbatch_options and "job_name" not in sbatch_options: + sbatch_options["job-name"] = joblog.jobname + +# submit job and echo id back to Snakemake (must be the only stdout) +jobid = slurm_utils.submit_job(jobscript, **sbatch_options) +logger.debug("Registering %s with sidecar...", jobid) +register_with_sidecar(jobid) +logger.debug("... done registering with sidecar") +print(jobid) diff --git a/08-snakemake/profiles/slurm/slurm_utils.py b/08-snakemake/profiles/slurm/slurm_utils.py new file mode 100644 index 0000000..c420154 --- /dev/null +++ b/08-snakemake/profiles/slurm/slurm_utils.py @@ -0,0 +1,403 @@ +#!/usr/bin/env python3 +import argparse +import math +import os +import re +import subprocess as sp +import sys +from datetime import timedelta +from os.path import dirname +from time import time as unix_time +from typing import Union +from uuid import uuid4 +import shlex +from io import StringIO + +from CookieCutter import CookieCutter +from snakemake import io +from snakemake.exceptions import WorkflowError +from snakemake.io import Wildcards +from snakemake.logging import logger +from snakemake.utils import AlwaysQuotedFormatter +from snakemake.utils import QuotedFormatter +from snakemake.utils import SequenceFormatter + + +def _convert_units_to_mb(memory): + """If memory is specified with SI unit, convert to MB""" + if isinstance(memory, int) or isinstance(memory, float): + return int(memory) + siunits = {"K": 1e-3, "M": 1, "G": 1e3, "T": 1e6} + regex = re.compile(r"(\d+)({})$".format("|".join(siunits.keys()))) + m = regex.match(memory) + if m is None: + logger.error( + (f"unsupported memory specification '{memory}';" " allowed suffixes: [K|M|G|T]") + ) + sys.exit(1) + factor = siunits[m.group(2)] + return int(int(m.group(1)) * factor) + + +def parse_jobscript(): + """Minimal CLI to require/only accept single positional argument.""" + p = argparse.ArgumentParser(description="SLURM snakemake submit script") + p.add_argument("jobscript", help="Snakemake jobscript with job properties.") + return p.parse_args().jobscript + + +def parse_sbatch_defaults(parsed): + """Unpack SBATCH_DEFAULTS.""" + d = shlex.split(parsed) if type(parsed) == str else parsed + args = {} + for keyval in [a.split("=") for a in d]: + k = keyval[0].strip().strip("-") + v = keyval[1].strip() if len(keyval) == 2 else None + args[k] = v + return args + + +def load_cluster_config(path): + """Load config to dict + + Load configuration to dict either from absolute path or relative + to profile dir. + """ + if path: + path = os.path.join(dirname(__file__), os.path.expandvars(path)) + dcc = io.load_configfile(path) + else: + dcc = {} + if "__default__" not in dcc: + dcc["__default__"] = {} + return dcc + + +# adapted from format function in snakemake.utils +def format(_pattern, _quote_all=False, **kwargs): # noqa: A001 + """Format a pattern in Snakemake style. + This means that keywords embedded in braces are replaced by any variable + values that are available in the current namespace. + """ + fmt = SequenceFormatter(separator=" ") + if _quote_all: + fmt.element_formatter = AlwaysQuotedFormatter() + else: + fmt.element_formatter = QuotedFormatter() + try: + return fmt.format(_pattern, **kwargs) + except KeyError as ex: + raise NameError( + f"The name {ex} is unknown in this context. Please " + "make sure that you defined that variable. " + "Also note that braces not used for variable access " + "have to be escaped by repeating them " + ) + + +# adapted from Job.format_wildcards in snakemake.jobs +def format_wildcards(string, job_properties): + """Format a string with variables from the job.""" + + class Job(object): + def __init__(self, job_properties): + for key in job_properties: + setattr(self, key, job_properties[key]) + + job = Job(job_properties) + if "params" in job_properties: + job._format_params = Wildcards(fromdict=job_properties["params"]) + else: + job._format_params = None + if "wildcards" in job_properties: + job._format_wildcards = Wildcards(fromdict=job_properties["wildcards"]) + else: + job._format_wildcards = None + _variables = dict() + _variables.update(dict(params=job._format_params, wildcards=job._format_wildcards)) + if hasattr(job, "rule"): + _variables.update(dict(rule=job.rule)) + try: + return format(string, **_variables) + except NameError as ex: + raise WorkflowError("NameError with group job {}: {}".format(job.jobid, str(ex))) + except IndexError as ex: + raise WorkflowError("IndexError with group job {}: {}".format(job.jobid, str(ex))) + + +# adapted from ClusterExecutor.cluster_params function in snakemake.executor +def format_values(dictionary, job_properties): + formatted = dictionary.copy() + for key, value in list(formatted.items()): + if key == "mem": + value = str(_convert_units_to_mb(value)) + if isinstance(value, str): + try: + formatted[key] = format_wildcards(value, job_properties) + except NameError as e: + msg = "Failed to format cluster config " "entry for job {}.".format( + job_properties["rule"] + ) + raise WorkflowError(msg, e) + return formatted + + +def convert_job_properties(job_properties, resource_mapping=None): + options = {} + if resource_mapping is None: + resource_mapping = {} + resources = job_properties.get("resources", {}) + for k, v in resource_mapping.items(): + options.update({k: resources[i] for i in v if i in resources}) + + if "threads" in job_properties: + options["cpus-per-task"] = job_properties["threads"] + + slurm_opts = resources.get("slurm", "") + if not isinstance(slurm_opts, str): + raise ValueError( + "The `slurm` argument to resources must be a space-separated string" + ) + + for opt in slurm_opts.split(): + kv = opt.split("=", maxsplit=1) + k = kv[0] + v = None if len(kv) == 1 else kv[1] + options[k.lstrip("-").replace("_", "-")] = v + + return options + + +def ensure_dirs_exist(path): + """Ensure output folder for Slurm log files exist.""" + di = dirname(path) + if di == "": + return + if not os.path.exists(di): + os.makedirs(di, exist_ok=True) + return + + +def format_sbatch_options(**sbatch_options): + """Format sbatch options""" + options = [] + for k, v in sbatch_options.items(): + val = "" + if v is not None: + val = f"={v}" + options.append(f"--{k}{val}") + return options + + +def submit_job(jobscript, **sbatch_options): + """Submit jobscript and return jobid.""" + options = format_sbatch_options(**sbatch_options) + try: + cmd = ["sbatch"] + ["--parsable"] + options + [jobscript] + res = sp.check_output(cmd) + except sp.CalledProcessError as e: + raise e + # Get jobid + res = res.decode() + try: + jobid = re.search(r"(\d+)", res).group(1) + except Exception as e: + raise e + return jobid + + +timeformats = [ + re.compile(r"^(?P\d+)-(?P\d+):(?P\d+):(?P\d+)$"), + re.compile(r"^(?P\d+)-(?P\d+):(?P\d+)$"), + re.compile(r"^(?P\d+)-(?P\d+)$"), + re.compile(r"^(?P\d+):(?P\d+):(?P\d+)$"), + re.compile(r"^(?P\d+):(?P\d+)$"), + re.compile(r"^(?P\d+)$"), +] + + +def time_to_minutes(time): + """Convert time string to minutes. + + According to slurm: + + Acceptable time formats include "minutes", "minutes:seconds", + "hours:minutes:seconds", "days-hours", "days-hours:minutes" + and "days-hours:minutes:seconds". + + """ + if not isinstance(time, str): + time = str(time) + d = {"days": 0, "hours": 0, "minutes": 0, "seconds": 0} + regex = list(filter(lambda regex: regex.match(time) is not None, timeformats)) + if len(regex) == 0: + return + assert len(regex) == 1, "multiple time formats match" + m = regex[0].match(time) + d.update(m.groupdict()) + minutes = ( + int(d["days"]) * 24 * 60 + + int(d["hours"]) * 60 + + int(d["minutes"]) + + math.ceil(int(d["seconds"]) / 60) + ) + assert minutes > 0, "minutes has to be greater than 0" + return minutes + + +class InvalidTimeUnitError(Exception): + pass + + +class Time: + _nanosecond_size = 1 + _microsecond_size = 1000 * _nanosecond_size + _millisecond_size = 1000 * _microsecond_size + _second_size = 1000 * _millisecond_size + _minute_size = 60 * _second_size + _hour_size = 60 * _minute_size + _day_size = 24 * _hour_size + _week_size = 7 * _day_size + units = { + "s": _second_size, + "m": _minute_size, + "h": _hour_size, + "d": _day_size, + "w": _week_size, + } + pattern = re.compile(rf"(?P\d+(\.\d*)?|\.\d+)(?P[a-zA-Z])") + + def __init__(self, duration: str): + self.duration = Time._from_str(duration) + + def __str__(self) -> str: + return Time._timedelta_to_slurm(self.duration) + + def __repr__(self): + return str(self) + + @staticmethod + def _timedelta_to_slurm(delta: Union[timedelta, str]) -> str: + if isinstance(delta, timedelta): + d = dict() + d["hours"], rem = divmod(delta.seconds, 3600) + d["minutes"], d["seconds"] = divmod(rem, 60) + d["hours"] += delta.days * 24 + return "{hours}:{minutes:02d}:{seconds:02d}".format(**d) + elif isinstance(delta, str): + return delta + else: + raise ValueError("Time is in an unknown format '{}'".format(delta)) + + @staticmethod + def _from_str(duration: str) -> Union[timedelta, str]: + """Parse a duration string to a datetime.timedelta""" + + matches = Time.pattern.finditer(duration) + + total = 0 + n_matches = 0 + for m in matches: + n_matches += 1 + value = m.group("val") + unit = m.group("unit").lower() + if unit not in Time.units: + raise InvalidTimeUnitError( + "Unknown unit '{}' in time {}".format(unit, duration) + ) + + total += float(value) * Time.units[unit] + + if n_matches == 0: + return duration + + microseconds = total / Time._microsecond_size + return timedelta(microseconds=microseconds) + + +class JobLog: + def __init__(self, job_props: dict): + self.job_properties = job_props + self.uid = str(uuid4()) + + @property + def wildcards(self) -> dict: + return self.job_properties.get("wildcards", dict()) + + @property + def wildcards_str(self) -> str: + return ( + ".".join("{}={}".format(k, v) for k, v in self.wildcards.items()) + or "unique" + ) + + @property + def rule_name(self) -> str: + if not self.is_group_jobtype: + return self.job_properties.get("rule", "nameless_rule") + return self.groupid + + @property + def groupid(self) -> str: + return self.job_properties.get("groupid", "group") + + @property + def is_group_jobtype(self) -> bool: + return self.job_properties.get("type", "") == "group" + + @property + def short_uid(self) -> str: + return self.uid.split("-")[0] + + def pattern_replace(self, s: str) -> str: + """ + %r - rule name. If group job, will use the group ID instead + %i - snakemake job ID + %w - wildcards. e.g., wildcards A and B will be concatenated as 'A=.B=' + %U - a random universally unique identifier + %S - shortened version od %U + %T - Unix time, aka seconds since epoch (rounded to an integer) + """ + replacement = { + "%r": self.rule_name, + "%i": self.jobid, + "%w": self.wildcards_str, + "%U": self.uid, + "%T": str(int(unix_time())), + "%S": self.short_uid, + } + for old, new in replacement.items(): + s = s.replace(old, new) + + return s + + @property + def jobname(self) -> str: + jobname_pattern = CookieCutter.get_cluster_jobname() + if not jobname_pattern: + return "" + + return self.pattern_replace(jobname_pattern) + + @property + def jobid(self) -> str: + """The snakemake jobid""" + if self.is_group_jobtype: + return self.job_properties.get("jobid", "").split("-")[0] + return str(self.job_properties.get("jobid")) + + @property + def logpath(self) -> str: + logpath_pattern = CookieCutter.get_cluster_logpath() + if not logpath_pattern: + return "" + + return self.pattern_replace(logpath_pattern) + + @property + def outlog(self) -> str: + return self.logpath + ".out" + + @property + def errlog(self) -> str: + return self.logpath + ".err" diff --git a/08-snakemake/run_snakefile.sh b/08-snakemake/run_snakefile.sh index babe13a..efbba31 100755 --- a/08-snakemake/run_snakefile.sh +++ b/08-snakemake/run_snakefile.sh @@ -5,7 +5,7 @@ APSIM_JOBS=100 # Run Snakefile_txt echo "Processing text files..." -snakemake -s Snakefile_1 --profile nesi --jobs 1 +snakemake -s Snakefile_1 --profile slurm --jobs 1 # Check if the previous command was successful if [ $? -eq 0 ]; then @@ -13,7 +13,7 @@ if [ $? -eq 0 ]; then # Run Snakefile_apsimx echo "Processing APSIM files..." - snakemake -s Snakefile_2 --profile nesi --jobs $APSIM_JOBS + snakemake -s Snakefile_2 --profile slurm --jobs $APSIM_JOBS if [ $? -eq 0 ]; then echo "APSIM file processing completed successfully." diff --git a/submit.sh b/submit.sh index 1944cb9..e0e1105 100755 --- a/submit.sh +++ b/submit.sh @@ -28,6 +28,8 @@ echo "" echo -e "${YELLOW}Provide the path to working directory for apsim_simulations:${NC} \c" read -r working_dir +echo "" + # Check if the directory exists, if not create it if [ ! -d "$working_dir" ]; then mkdir -p "$working_dir" @@ -48,17 +50,26 @@ cp 08-snakemake/run_snakefile.sh "$working_dir" #Copy snakemake profile 08-snakemake/profiles to ~/.config/snakemake mkdir -p ~/.config/snakemake -cp -r 08-snakemake/profiles/nesi ~/.config/snakemake/ +cp -r 08-snakemake/profiles/slurm ~/.config/snakemake/ # Print completion message echo -e "\nSwitching to working directory now and running generate_apsim_configs.R to create config files." +echo "" # Change to the working directory cd "$working_dir" || exit # Print current directory to confirm echo -e "${GREEN}${BOLD}Current working directory: $(pwd)${NC}" +echo "" +#Load modules +echo -e "${YELLOW}Loading required modules and copying nesi Snakemake profile...${NC}" +if [[ $(hostname) == *eri* ]]; then + module purge && module load snakemake/7.32.3-foss-2023a-Python-3.11.6 R/4.4.1-foss-2023a Graphviz/12.1.2 +elif [[ $(hostname) == *mahuika* ]]; then + module purge >/dev/null 2>&1 && module load snakemake/7.32.3-gimkl-2022a-Python-3.11.3 R/4.3.1-gimkl-2022a +fi # Execute the R script echo -e "${YELLOW}Generating config files and splitting into multiple sets...${NC}" @@ -80,21 +91,13 @@ echo -e "${GREEN}${BOLD}Config files generation complete.${NC}" echo "" -#Load modules -echo -e "${YELLOW}Loading required modules and copying nesi Snakemake profile...${NC}" -if [[ $(hostname) == *eri* ]]; then - module purge && module load snakemake/7.32.3-foss-2023a-Python-3.11.6 R/4.4.1-foss-2023a Graphviz/12.1.2 -elif [[ $(hostname) == *mahuika* ]]; then - module purge >/dev/null 2>&1 && module load snakemake/7.32.3-gimkl-2022a-Python-3.11.3 R/4.3.1-gimkl-2022a -fi - - - # Ask if the user wants to submit the APSIM-HPC workflow echo -n -e "${YELLOW}Would you like to submit the APSIM-HPC workflow to generate .db files? (yes/no) : ${NC}" read -r submit_answer +echo "" + if [ "${submit_answer,,}" = "yes" ]; then # Verify the user is in the correct directory if [ "$(pwd)" != "$working_dir" ]; then