From c3ce23c98f94840057b08b3a9ea23d4382ff44d1 Mon Sep 17 00:00:00 2001 From: Brendan <2bndy5@gmail.com> Date: Tue, 25 Jun 2024 16:34:09 -0700 Subject: [PATCH] improve scanner promiscuity similar to changes in nRF24/RF24#955 --- examples/nrf24l01_scanner_test.py | 52 +++++++++++++++++++++---------- 1 file changed, 36 insertions(+), 16 deletions(-) diff --git a/examples/nrf24l01_scanner_test.py b/examples/nrf24l01_scanner_test.py index d1fad0d..07bc06f 100644 --- a/examples/nrf24l01_scanner_test.py +++ b/examples/nrf24l01_scanner_test.py @@ -40,16 +40,20 @@ # 21 = bus 2, CE1 # enable SPI bus 1 prior to running this # turn off RX features specific to the nRF24L01 module -nrf.auto_ack = False nrf.dynamic_payloads = False +nrf.auto_ack = False nrf.crc = 0 nrf.arc = 0 nrf.allow_ask_no_ack = False # use reverse engineering tactics for a better "snapshot" nrf.address_length = 2 -nrf.open_rx_pipe(1, b"\0\x55") -nrf.open_rx_pipe(0, b"\0\xaa") +nrf.open_rx_pipe(0, b"\xaa\xaa") +nrf.open_rx_pipe(1, b"\x55\x55") +nrf.open_rx_pipe(2, b"\0\xaa") +nrf.open_rx_pipe(3, b"\0\x55") +nrf.open_rx_pipe(4, b"\xa0\xaa") +nrf.open_rx_pipe(5, b"\x50\x55") def scan(timeout=30): @@ -67,32 +71,49 @@ def scan(timeout=30): print(str(i % 10), sep="", end="") print("\n" + "~" * 126) + sweeps = 0 signals = [0] * 126 # store the signal count for each channel curr_channel = 0 start_timer = time.monotonic() # start the timer while time.monotonic() - start_timer < timeout: nrf.channel = curr_channel - if nrf.available(): - nrf.flush_rx() # flush the RX FIFO because it asserts the RPD flag - nrf.listen = 1 # start a RX session + # nrf.flush_rx() + nrf.listen = True # start a RX session time.sleep(0.00013) # wait 130 microseconds - signals[curr_channel] += nrf.rpd # if interference is present - nrf.listen = 0 # end the RX session - curr_channel = curr_channel + 1 if curr_channel < 125 else 0 + found_signal = nrf.rpd + nrf.listen = False # end the RX session + found_signal = found_signal or nrf.rpd or nrf.available() + + # count signal as interference + signals[curr_channel] += found_signal + # clear the RX FIFO if a signal was detected/captured + if found_signal: + nrf.flush_rx() # flush the RX FIFO because it asserts the RPD flag + endl = False + if curr_channel > 124: + sweeps += 1 + if sweeps >= 15: + endl = True + sweeps = 0 # output the signal counts per channel sig_cnt = signals[curr_channel] print( - ("%X" % min(15, sig_cnt)) if sig_cnt else "-", + ("%X" % sig_cnt) if sig_cnt else "-", sep="", - end="" if curr_channel < 125 else "\r", + end="" if curr_channel < 125 else ("\n" if endl else "\r"), ) + curr_channel = curr_channel + 1 if curr_channel < 125 else 0 + if endl: + signals = [0] * 126 + # finish printing results and end with a new line while curr_channel < len(signals) - 1: curr_channel += 1 sig_cnt = signals[curr_channel] - print(("%X" % min(15, sig_cnt)) if sig_cnt else "-", sep="", end="") + print(("%X" % sig_cnt) if sig_cnt else "-", sep="", end="") print("") + nrf.flush_rx() # flush the RX FIFO for continued operation def noise(timeout=1, channel=None): @@ -107,13 +128,12 @@ def noise(timeout=1, channel=None): nrf.listen = True timeout += time.monotonic() while time.monotonic() < timeout: - signal = nrf.read() - if signal: - print(address_repr(signal, False, " ")) + if nrf.available(): + print(address_repr(nrf.read(32), False, " ")) nrf.listen = False while not nrf.fifo(False, True): # dump the left overs in the RX FIFO - print(address_repr(nrf.read(), False, " ")) + print(address_repr(nrf.read(32), False, " ")) def set_role():