-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathtf_worker
executable file
·129 lines (110 loc) · 2.59 KB
/
tf_worker
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
#!/bin/sh
#
# This is the standard run script that is executed on the compute node.
# It fetches sequences from a server using netcat, runs some analysis, and pushes back results
#
#
# Positional arguments: the server host and port handed to every worker thread.
SERVER=$1
PORT=$2

# tf_pick_skew MAX
#   Print a pseudo-random integer in [0, MAX) on stdout.
#   Prints 0 when MAX is empty, non-numeric or not greater than zero.
#   $RANDOM is a bash/ksh extension and is unset under a strict POSIX /bin/sh,
#   so fall back to two bytes from /dev/urandom and, failing that, to the PID
#   of this shell.
tf_pick_skew() {
  _tf_max=$1
  if ! [ "${_tf_max:-0}" -gt 0 ] 2>/dev/null ; then
    echo 0
    return 0
  fi
  _tf_rand=${RANDOM:-}
  if [ -z "$_tf_rand" ] ; then
    _tf_rand=$(od -An -N2 -tu2 /dev/urandom 2>/dev/null | tr -dc '0-9')
  fi
  [ -n "$_tf_rand" ] || _tf_rand=$$
  echo $(( _tf_rand % _tf_max ))
}

# Optional start-up skew: when SKEW is set, sleep a random number of seconds
# below SKEW before doing anything else.
if [ -n "$SKEW" ] ; then
  skew=$(tf_pick_skew "$SKEW")
  sleep "$skew"
fi
# Per-worker scratch directory: $TF_TMPBASE/$USER-<PID of this shell>.
# TF_TMPBASE defaults to /tmp when the caller did not set it.
# TODO: Potential conflict where two workers could use the same directory in a
# shared file system.
[ -n "$TF_TMPBASE" ] || TF_TMPBASE=/tmp
export TF_TMPBASE
export TF_TMPDIR="$TF_TMPBASE/$USER-$$"

# Source any setup files that are needed for the system.
# The file is looked up by $NERSC_HOST and must carry the executable bit even
# though it is sourced into this shell rather than executed.
TF_CONFDIR=$TF_HOME/share/taskfarmer/
if [ -x "$TF_CONFDIR/$NERSC_HOST.stage" ] ; then
  . "$TF_CONFDIR/$NERSC_HOST.stage"
fi

# A TF_SERVERS value that is not an absolute path is replaced by the file
# "servers" in the directory the worker was started from; this must happen
# before the cd below.
# NOTE(review): the file name given in a relative TF_SERVERS is discarded --
# confirm that this is intended.
if [ -n "$TF_SERVERS" ] ; then
  case $TF_SERVERS in
    /*) ;;
    *) TF_SERVERS=$(pwd)/servers ;;
  esac
fi

# Everything below runs inside the scratch directory, so stop here rather than
# carry on in the launch directory when it cannot be created or entered.
if ! mkdir -p "$TF_TMPDIR" || ! cd "$TF_TMPDIR" ; then
  echo "Cannot use temporary directory $TF_TMPDIR" >&2
  exit 1
fi
# Source the staging file specified by the user if there is one.
#
# STAGE is optional. When set it must name an executable file; it is sourced
# into this shell (not run as a child), so it can change variables and the
# working directory of the worker.
if [ -n "$STAGE" ] ; then
  if [ ! -x "$STAGE" ] ; then
    echo "Error sourcing $STAGE" >&2
    # Bare exit on purpose: the exit status stays whatever the echo returned.
    exit
  fi
  . "$STAGE"
fi
# Change back into the temp dir just in case the
# stager moved us around.
#
cd "$TF_TMPDIR" || exit 1
# This will be called if an interupt is received.
#
cleanup() {
# echo "Called cleanup"
for pid in $PIDS ; do
if [ -d /proc/$pid ] ;then
kill $pid
fi
done
sleep 1
cd $TF_TMPBASE
rm -rf $TF_TMPDIR
CLEANUP=1
if [ ! -z $STAGE ] && [ -x "$STAGE" ] ; then
. $STAGE
fi
exit
}
# Set the trap
trap cleanup 2 15
# launch_threads
#   Start $THREADS background copies of tf_worker_thread, each given
#   $SERVER, $PORT and its thread number, then block until all have exited.
#   Globals read:    THREADS, BID, SERVER, PORT, TF_HOME
#   Globals written: THREAD, ID (exported as "<BID>-<thread number>"), PIDS
launch_threads() {
  THREAD=0
  while [ "$THREAD" -lt "${THREADS:-0}" ] ; do
    THREAD=$((THREAD + 1))
    # ID is exported before the fork so each worker sees its own value.
    export ID="$BID-$THREAD"
    perl "$TF_HOME/libexec/taskfarmer/tf_worker_thread" "$SERVER" "$PORT" "$THREAD" &
    PIDS="$PIDS $!"
  done
  # Wait for all threads to exit before cleanup
  wait
  # Every recorded worker has exited now; forget the PIDs so that a later
  # cleanup cannot signal an unrelated process that reused one of them.
  PIDS=
}
# Check if we can at least write 256MB of output
#
# check_node
#   Globals read:    TF_TMPDIR, TF_TMPBASE, BID
#   Globals written: succ (exit status of dd)
#   Writes 256 blocks of 1024k zeros to $TF_TMPDIR/testfile and removes the
#   file again. When dd reports failure, a black-listing message is printed
#   on stderr and the whole shell exits.
check_node() {
  # bs=1024k count=256 is the same amount of data as the former
  # bs=1k count=256k, without relying on a multiplier suffix for count.
  dd if=/dev/zero of="$TF_TMPDIR/testfile" bs=1024k count=256 > /dev/null 2>&1
  succ=$?
  # -f: dd may have failed before the file was ever created.
  rm -f -- "$TF_TMPDIR/testfile"
  if [ "$succ" -ne 0 ] ; then
    echo "Black listing $BID. $TF_TMPBASE full." >&2
    # Bare exit on purpose: the exit status stays whatever the echo returned.
    exit
  fi
}
# Determine the number of threads
#
# A caller-supplied THREADS wins. Otherwise count the lines of /proc/cpuinfo
# that start with "processor", and fall back to a single thread when that
# count is missing or zero.
if [ -z "$THREADS" ] ; then
  THREADS=$(grep -c '^processor' /proc/cpuinfo 2>/dev/null)
  [ "${THREADS:-0}" -gt 0 ] 2>/dev/null || THREADS=1
fi
#
# Run the IDCOMMAND
# IDCOMMAND is shell code taken from the environment and evaluated in this
# shell; the value it leaves in ID becomes BID.
# NOTE(review): eval runs arbitrary code -- IDCOMMAND must only ever come from
# a trusted source. It is quoted so that it is evaluated exactly as given.
eval "$IDCOMMAND"
export BID="$ID"
# The disk check runs unless SKIP_CHECK is set to a non-empty value.
[ -z "$SKIP_CHECK" ] && check_node
# tf_is_blacklisted ID FILE
#   Succeeds (status 0) when FILE contains a line that is exactly ID.
#   ID is matched as a fixed string (-F) against whole lines (-x), so
#   characters such as "." in a host name are not treated as regex wildcards.
tf_is_blacklisted() {
  grep -qxF -- "$1" "$2"
}

# BLACKLIST optionally names a file of ids, one per line, that must not run.
if [ -n "$BLACKLIST" ] && tf_is_blacklisted "$BID" "$BLACKLIST" ; then
  echo "$BID found in blacklist. Exiting." >&2
  # Bare exit on purpose: the exit status stays whatever the echo returned.
  exit
fi
# With TF_SERVERS set, work through the listed servers one after another;
# otherwise use the SERVER and PORT already in place.
if [ -n "$TF_SERVERS" ] ; then
  # Each line of the file is "host:port"; anything after a second colon is
  # dropped by awk. The command substitution reads the whole list before the
  # first server is started.
  for entry in $(awk -F: '{print $1":"$2}' < "$TF_SERVERS") ; do
    # Split on the single colon awk emitted; a word without one has no port.
    case $entry in
      *:*) SERVER=${entry%%:*} ; PORT=${entry#*:} ;;
      *)   SERVER=$entry ; PORT= ;;
    esac
    echo "Starting with server: $SERVER and port: $PORT"
    launch_threads
  done
else
  launch_threads
fi
# Cleanup Cleanup everybody do their share
#
# End of a normal run: the same routine the signal trap uses.
cleanup