|
37 | 37 | import pytest |
38 | 38 | from lxml import etree |
39 | 39 |
|
40 | | -from osgeo import gdal |
| 40 | +from osgeo import gdal, ogr |
41 | 41 |
|
42 | 42 |
|
43 | 43 | ############################################################################### |
@@ -1341,3 +1341,217 @@ def test_vsifile_use_closed_file(tmp_path): |
1341 | 1341 |
|
1342 | 1342 | with pytest.raises(ValueError, match="closed file"): |
1343 | 1343 | gdal.VSIFWriteL("0123456789", 1, 10, f) |
| 1344 | + |
| 1345 | + |
| 1346 | +############################################################################### |
| 1347 | +# Test VSIFile helper class |
| 1348 | + |
| 1349 | + |
| 1350 | +def test_vsifile_class_write_ascii(tmp_path): |
| 1351 | + |
| 1352 | + fname = tmp_path / "test.txt" |
| 1353 | + |
| 1354 | + lines = ["permission is hereby granted", "free of charge", "to any person"] |
| 1355 | + |
| 1356 | + with gdaltest.vsi_open(fname, "w") as f: |
| 1357 | + assert f.tell() == 0 |
| 1358 | + |
| 1359 | + for line in lines: |
| 1360 | + f.write(line) |
| 1361 | + f.write("\n") |
| 1362 | + |
| 1363 | + with open(fname, "r") as f: |
| 1364 | + assert [line.strip() for line in f.readlines()] == lines |
| 1365 | + |
| 1366 | + |
| 1367 | +def test_vsifile_class_read_ascii(tmp_path): |
| 1368 | + |
| 1369 | + fname = str(tmp_path / "test.txt") |
| 1370 | + |
| 1371 | + lines = ["permission is hereby granted", "free of charge", "to any person"] |
| 1372 | + |
| 1373 | + with open(fname, "w", newline="\n") as f: |
| 1374 | + for line in lines: |
| 1375 | + f.write(line) |
| 1376 | + f.write("\n") |
| 1377 | + |
| 1378 | + with pytest.raises(Exception): |
| 1379 | + f.write(b"some bytes") |
| 1380 | + |
| 1381 | + # read entire file |
| 1382 | + with gdaltest.vsi_open(fname, "r") as f: |
| 1383 | + contents = f.read() |
| 1384 | + |
| 1385 | + assert type(contents) is str |
| 1386 | + |
| 1387 | + lines_in = [line.strip() for line in contents.strip().split("\n")] |
| 1388 | + assert lines_in == lines |
| 1389 | + |
| 1390 | + # read some characters |
| 1391 | + f = gdaltest.vsi_open(fname) |
| 1392 | + assert f.read(10) == "permission" |
| 1393 | + |
| 1394 | + # skip a character |
| 1395 | + f.seek(1, os.SEEK_CUR) |
| 1396 | + assert f.read(9) == "is hereby" |
| 1397 | + |
| 1398 | + f.seek(0, os.SEEK_SET) |
| 1399 | + assert f.read(10) == "permission" |
| 1400 | + |
| 1401 | + # jump to end |
| 1402 | + f.seek(0, os.SEEK_END) |
| 1403 | + assert f.read(10) == "" |
| 1404 | + |
| 1405 | + f.seek(-7, os.SEEK_END) |
| 1406 | + assert f.read() == "person\n" |
| 1407 | + |
| 1408 | + f.close() |
| 1409 | + f.close() # no harm in closing an already-closed file |
| 1410 | + |
| 1411 | + |
| 1412 | +def test_vsifile_class_read_binary(tmp_path): |
| 1413 | + |
| 1414 | + fname = tmp_path / "test.wkb" |
| 1415 | + |
| 1416 | + g = ogr.CreateGeometryFromWkt("POINT (15 17)") |
| 1417 | + wkb = g.ExportToWkb() |
| 1418 | + |
| 1419 | + with open(fname, "wb") as f: |
| 1420 | + f.write(wkb) |
| 1421 | + |
| 1422 | + # read entire file |
| 1423 | + with gdaltest.vsi_open(fname, "rb") as f: |
| 1424 | + contents = f.read() |
| 1425 | + |
| 1426 | + assert type(contents) is bytes |
| 1427 | + |
| 1428 | + assert contents == wkb |
| 1429 | + |
| 1430 | + # read some bytes |
| 1431 | + f = gdaltest.vsi_open(fname, "rb") |
| 1432 | + assert f.read(5) == wkb[:5] |
| 1433 | + |
| 1434 | + f.seek(10, os.SEEK_SET) |
| 1435 | + assert f.read(5) == wkb[10:15] |
| 1436 | + |
| 1437 | + |
| 1438 | +def test_vsifile_class_write_binary(tmp_path): |
| 1439 | + |
| 1440 | + fname = tmp_path / "test.wkb" |
| 1441 | + |
| 1442 | + g = ogr.CreateGeometryFromWkt("POINT (15 17)") |
| 1443 | + wkb = g.ExportToWkb() |
| 1444 | + |
| 1445 | + with gdaltest.vsi_open(fname, "wb") as f: |
| 1446 | + f.write(wkb[:8]) |
| 1447 | + f.write(wkb[8:]) |
| 1448 | + |
| 1449 | + with open(fname, "rb") as f: |
| 1450 | + assert f.read() == wkb |
| 1451 | + |
| 1452 | + |
| 1453 | +def random_lines(): |
| 1454 | + import random |
| 1455 | + import string |
| 1456 | + |
| 1457 | + lines = [] |
| 1458 | + for i in range(50): |
| 1459 | + lines.append( |
| 1460 | + "".join([random.choice(string.ascii_letters) for j in range(20 + 3 * i)]) |
| 1461 | + ) |
| 1462 | + lines.append(" ") |
| 1463 | + lines.append("") |
| 1464 | + lines.append("theend") |
| 1465 | + lines.append("") |
| 1466 | + |
| 1467 | + return lines |
| 1468 | + |
| 1469 | + |
| 1470 | +@pytest.mark.parametrize("terminating_newline", (True, False)) |
| 1471 | +def test_vsifile_class_line_iteration(tmp_path, terminating_newline): |
| 1472 | + |
| 1473 | + fname = str(tmp_path / "test.txt") |
| 1474 | + |
| 1475 | + lines_out = random_lines() |
| 1476 | + |
| 1477 | + with open(fname, "w") as f: |
| 1478 | + for line in lines_out: |
| 1479 | + f.write(line) |
| 1480 | + f.write("\n") |
| 1481 | + |
| 1482 | + if not terminating_newline: |
| 1483 | + f.write("lastline") |
| 1484 | + lines_out.append("lastline") |
| 1485 | + |
| 1486 | + with gdaltest.vsi_open(fname) as f: |
| 1487 | + lines_in = [line for line in f] |
| 1488 | + |
| 1489 | + assert lines_in == lines_out |
| 1490 | + |
| 1491 | + |
| 1492 | +def test_vsifile_class_binary_line_iteration(tmp_path): |
| 1493 | + |
| 1494 | + fname = str(tmp_path / "test.txt") |
| 1495 | + |
| 1496 | + lines_out = [x.encode() for x in random_lines()] |
| 1497 | + |
| 1498 | + with open(fname, "wb") as f: |
| 1499 | + for line in lines_out: |
| 1500 | + f.write(line) |
| 1501 | + f.write(b"\n") |
| 1502 | + |
| 1503 | + with gdaltest.vsi_open(fname, "rb") as f: |
| 1504 | + lines_in = [line for line in f] |
| 1505 | + |
| 1506 | + assert lines_in == lines_out |
| 1507 | + |
| 1508 | + |
| 1509 | +def test_vsifile_class_zipped_csv_reader(tmp_path): |
| 1510 | + |
| 1511 | + test_csv = str(tmp_path / "input.csv") |
| 1512 | + test_zip = str(tmp_path / "input.zip") |
| 1513 | + |
| 1514 | + import csv |
| 1515 | + import shutil |
| 1516 | + import zipfile |
| 1517 | + |
| 1518 | + shutil.copy("../ogr/data/prime_meridian.csv", test_csv) |
| 1519 | + |
| 1520 | + with zipfile.ZipFile(test_zip, "w") as zf: |
| 1521 | + zf.write(test_csv, arcname="input.csv") |
| 1522 | + |
| 1523 | + with gdaltest.vsi_open(f"/vsizip/{test_zip}/input.csv") as f: |
| 1524 | + records = [x for x in csv.DictReader(f)] |
| 1525 | + |
| 1526 | + assert len(records) == 4 |
| 1527 | + assert ( |
| 1528 | + records[2]["INFORMATION_SOURCE"] |
| 1529 | + == "Institut Geographique National (IGN), Paris" |
| 1530 | + ) |
| 1531 | + |
| 1532 | + |
| 1533 | +def test_vsifile_class_file_does_not_exist(tmp_path): |
| 1534 | + |
| 1535 | + with pytest.raises(OSError, match="No such file or directory"): |
| 1536 | + gdaltest.vsi_open(tmp_path / "does_not_exist.txt") |
| 1537 | + |
| 1538 | + |
| 1539 | +def test_vsifile_class_read_from_closed_file(tmp_path): |
| 1540 | + |
| 1541 | + with gdaltest.vsi_open(tmp_path / "out.txt", "w") as f: |
| 1542 | + f.write("abc") |
| 1543 | + |
| 1544 | + with pytest.raises(ValueError, match="closed file"): |
| 1545 | + f.seek(0) |
| 1546 | + |
| 1547 | + |
| 1548 | +def test_vsifile_class_append(tmp_vsimem): |
| 1549 | + |
| 1550 | + fname = tmp_vsimem / "out.txt" |
| 1551 | + |
| 1552 | + with gdaltest.vsi_open(fname, "w") as f: |
| 1553 | + f.write("abc") |
| 1554 | + with gdaltest.vsi_open(fname, "a") as f: |
| 1555 | + f.write("def") |
| 1556 | + with gdaltest.vsi_open(fname) as f: |
| 1557 | + assert f.read() == "abcdef" |
0 commit comments