11import logging
2- import glob
32import os
43import os .path
54import shutil
98import uuid
109
1110from urlparse import urlparse
12- from service import ExternalService , SpawnedService
13- from testutil import get_open_port
11+ from test . service import ExternalService , SpawnedService
12+ from test . testutil import get_open_port
1413
1514class Fixture (object ):
1615 kafka_version = os .environ .get ('KAFKA_VERSION' , '0.8.0' )
@@ -36,23 +35,23 @@ def download_official_distribution(cls,
3635 output_file = os .path .join (output_dir , distfile + '.tgz' )
3736
3837 if os .path .isfile (output_file ):
39- logging .info ("Found file already on disk: %s" % output_file )
38+ logging .info ("Found file already on disk: %s" , output_file )
4039 return output_file
4140
4241 # New tarballs are .tgz, older ones are sometimes .tar.gz
4342 try :
4443 url = url_base + distfile + '.tgz'
45- logging .info ("Attempting to download %s" % ( url ,) )
44+ logging .info ("Attempting to download %s" , url )
4645 response = urllib2 .urlopen (url )
4746 except urllib2 .HTTPError :
4847 logging .exception ("HTTP Error" )
4948 url = url_base + distfile + '.tar.gz'
50- logging .info ("Attempting to download %s" % ( url ,) )
49+ logging .info ("Attempting to download %s" , url )
5150 response = urllib2 .urlopen (url )
5251
53- logging .info ("Saving distribution file to %s" % ( output_file ,) )
54- with open (os . path . join ( output_dir , distfile + '.tgz' ), ' w' ) as f :
55- f .write (response .read ())
52+ logging .info ("Saving distribution file to %s" , output_file )
53+ with open (output_file , ' w' ) as output_file_fd :
54+ output_file_fd .write (response .read ())
5655
5756 return output_file
5857
@@ -117,11 +116,9 @@ def open(self):
117116 self .render_template (template , properties , vars (self ))
118117
119118 # Configure Zookeeper child process
120- self .child = SpawnedService (args = self .kafka_run_class_args (
121- "org.apache.zookeeper.server.quorum.QuorumPeerMain" ,
122- properties ),
123- env = self .kafka_run_class_env ()
124- )
119+ args = self .kafka_run_class_args ("org.apache.zookeeper.server.quorum.QuorumPeerMain" , properties )
120+ env = self .kafka_run_class_env ()
121+ self .child = SpawnedService (args , env )
125122
126123 # Party!
127124 self .out ("Starting..." )
@@ -162,7 +159,7 @@ def __init__(self, host, port, broker_id, zk_host, zk_port, zk_chroot, replicas=
162159 self .zk_port = zk_port
163160 self .zk_chroot = zk_chroot
164161
165- self .replicas = replicas
162+ self .replicas = replicas
166163 self .partitions = partitions
167164
168165 self .tmp_dir = None
@@ -199,21 +196,19 @@ def open(self):
199196 self .render_template (template , properties , vars (self ))
200197
201198 # Configure Kafka child process
202- self .child = SpawnedService (args = self .kafka_run_class_args (
203- "kafka.Kafka" , properties ),
204- env = self .kafka_run_class_env ()
205- )
199+ args = self .kafka_run_class_args ("kafka.Kafka" , properties )
200+ env = self .kafka_run_class_env ()
201+ self .child = SpawnedService (args , env )
206202
207203 # Party!
208204 self .out ("Creating Zookeeper chroot node..." )
209- proc = subprocess .Popen (self .kafka_run_class_args (
210- "org.apache.zookeeper.ZooKeeperMain" ,
211- "-server" , "%s:%d" % (self .zk_host , self .zk_port ),
212- "create" , "/%s" % self .zk_chroot , "kafka-python"
213- ),
214- env = self .kafka_run_class_env (),
215- stdout = subprocess .PIPE ,
216- stderr = subprocess .PIPE )
205+ args = self .kafka_run_class_args ("org.apache.zookeeper.ZooKeeperMain" ,
206+ "-server" , "%s:%d" % (self .zk_host , self .zk_port ),
207+ "create" ,
208+ "/%s" % self .zk_chroot ,
209+ "kafka-python" )
210+ env = self .kafka_run_class_env ()
211+ proc = subprocess .Popen (args , env = env , stdout = subprocess .PIPE , stderr = subprocess .PIPE )
217212
218213 if proc .wait () != 0 :
219214 self .out ("Failed to create Zookeeper chroot node" )
0 commit comments