|
37 | 37 | import pytest |
38 | 38 | from lxml import etree |
39 | 39 |
|
40 | | -from osgeo import gdal |
| 40 | +from osgeo import gdal, ogr |
41 | 41 |
|
42 | 42 |
|
43 | 43 | ############################################################################### |
@@ -1460,3 +1460,232 @@ def test_vsifile_CopyFileRestartable(tmp_vsimem): |
1460 | 1460 | assert retcode == 0 |
1461 | 1461 | assert output_payload is None |
1462 | 1462 | assert gdal.VSIStatL(dstfilename).size == 3 |
| 1463 | + |
| 1464 | + |
| 1465 | +############################################################################### |
| 1466 | +# Test VSIFile helper class |
| 1467 | + |
| 1468 | + |
| 1469 | +def test_vsifile_class_write_ascii(tmp_path): |
| 1470 | + |
| 1471 | + fname = tmp_path / "test.txt" |
| 1472 | + |
| 1473 | + lines = ["permission is hereby granted", "free of charge", "to any person"] |
| 1474 | + |
| 1475 | + with gdaltest.vsi_open(fname, "w") as f: |
| 1476 | + assert f.tell() == 0 |
| 1477 | + |
| 1478 | + for line in lines: |
| 1479 | + f.write(line) |
| 1480 | + f.write("\n") |
| 1481 | + |
| 1482 | + with open(fname, "r") as f: |
| 1483 | + assert [line.strip() for line in f.readlines()] == lines |
| 1484 | + |
| 1485 | + |
| 1486 | +def test_vsifile_class_read_ascii(tmp_path): |
| 1487 | + |
| 1488 | + fname = str(tmp_path / "test.txt") |
| 1489 | + |
| 1490 | + lines = ["permission is hereby granted", "free of charge", "to any person"] |
| 1491 | + |
| 1492 | + with open(fname, "w", newline="\n") as f: |
| 1493 | + for line in lines: |
| 1494 | + f.write(line) |
| 1495 | + f.write("\n") |
| 1496 | + |
| 1497 | + with pytest.raises(Exception): |
| 1498 | + f.write(b"some bytes") |
| 1499 | + |
| 1500 | + # read entire file |
| 1501 | + with gdaltest.vsi_open(fname, "r") as f: |
| 1502 | + contents = f.read() |
| 1503 | + |
| 1504 | + assert type(contents) is str |
| 1505 | + |
| 1506 | + lines_in = [line.strip() for line in contents.strip().split("\n")] |
| 1507 | + assert lines_in == lines |
| 1508 | + |
| 1509 | + # read some characters |
| 1510 | + f = gdaltest.vsi_open(fname) |
| 1511 | + assert f.read(10) == "permission" |
| 1512 | + |
| 1513 | + # skip a character |
| 1514 | + f.seek(1, os.SEEK_CUR) |
| 1515 | + assert f.read(9) == "is hereby" |
| 1516 | + |
| 1517 | + # seek backwards |
| 1518 | + f.seek(-2, os.SEEK_CUR) |
| 1519 | + assert f.read(2) == "by" |
| 1520 | + |
| 1521 | + # jump to beginning |
| 1522 | + f.seek(0, os.SEEK_SET) |
| 1523 | + assert f.read(10) == "permission" |
| 1524 | + |
| 1525 | + # can't jump before the beginning |
| 1526 | + pos = f.tell() |
| 1527 | + with pytest.raises(OSError, match="negative offset"): |
| 1528 | + f.seek(-2, os.SEEK_SET) == -1 |
| 1529 | + assert pos == f.tell() |
| 1530 | + |
| 1531 | + # jump to end |
| 1532 | + f.seek(0, os.SEEK_END) |
| 1533 | + assert f.read(10) == "" |
| 1534 | + |
| 1535 | + f.seek(-7, os.SEEK_END) |
| 1536 | + assert f.read() == "person\n" |
| 1537 | + |
| 1538 | + f.close() |
| 1539 | + f.close() # no harm in closing an already-closed file |
| 1540 | + |
| 1541 | + |
| 1542 | +def test_vsifile_class_read_binary(tmp_path): |
| 1543 | + |
| 1544 | + fname = tmp_path / "test.wkb" |
| 1545 | + |
| 1546 | + g = ogr.CreateGeometryFromWkt("POINT (15 17)") |
| 1547 | + wkb = g.ExportToWkb() |
| 1548 | + |
| 1549 | + with open(fname, "wb") as f: |
| 1550 | + f.write(wkb) |
| 1551 | + |
| 1552 | + # read entire file |
| 1553 | + with gdaltest.vsi_open(fname, "rb") as f: |
| 1554 | + contents = f.read() |
| 1555 | + |
| 1556 | + assert type(contents) is bytes |
| 1557 | + |
| 1558 | + assert contents == wkb |
| 1559 | + |
| 1560 | + # read some bytes |
| 1561 | + f = gdaltest.vsi_open(fname, "rb") |
| 1562 | + assert f.read(5) == wkb[:5] |
| 1563 | + |
| 1564 | + f.seek(10, os.SEEK_SET) |
| 1565 | + assert f.read(5) == wkb[10:15] |
| 1566 | + |
| 1567 | + |
| 1568 | +def test_vsifile_class_write_binary(tmp_path): |
| 1569 | + |
| 1570 | + fname = tmp_path / "test.wkb" |
| 1571 | + |
| 1572 | + g = ogr.CreateGeometryFromWkt("POINT (15 17)") |
| 1573 | + wkb = g.ExportToWkb() |
| 1574 | + |
| 1575 | + with gdaltest.vsi_open(fname, "wb") as f: |
| 1576 | + f.write(wkb[:8]) |
| 1577 | + f.write(wkb[8:]) |
| 1578 | + |
| 1579 | + with open(fname, "rb") as f: |
| 1580 | + assert f.read() == wkb |
| 1581 | + |
| 1582 | + with gdaltest.vsi_open(fname, "rb") as f: |
| 1583 | + with pytest.raises(OSError, match="Expected to write"): |
| 1584 | + f.write(wkb) |
| 1585 | + |
| 1586 | + |
| 1587 | +def random_lines(): |
| 1588 | + import random |
| 1589 | + import string |
| 1590 | + |
| 1591 | + lines = [] |
| 1592 | + for i in range(50): |
| 1593 | + lines.append( |
| 1594 | + "".join([random.choice(string.ascii_letters) for j in range(20 + 3 * i)]) |
| 1595 | + ) |
| 1596 | + lines.append(" ") |
| 1597 | + lines.append("") |
| 1598 | + lines.append("theend") |
| 1599 | + lines.append("") |
| 1600 | + |
| 1601 | + return lines |
| 1602 | + |
| 1603 | + |
| 1604 | +@pytest.mark.parametrize("terminating_newline", (True, False)) |
| 1605 | +def test_vsifile_class_line_iteration(tmp_path, terminating_newline): |
| 1606 | + |
| 1607 | + fname = str(tmp_path / "test.txt") |
| 1608 | + |
| 1609 | + lines_out = random_lines() |
| 1610 | + |
| 1611 | + with open(fname, "w") as f: |
| 1612 | + for line in lines_out: |
| 1613 | + f.write(line) |
| 1614 | + f.write("\n") |
| 1615 | + |
| 1616 | + if not terminating_newline: |
| 1617 | + f.write("lastline") |
| 1618 | + lines_out.append("lastline") |
| 1619 | + |
| 1620 | + with gdaltest.vsi_open(fname) as f: |
| 1621 | + lines_in = [line for line in f] |
| 1622 | + |
| 1623 | + assert lines_in == lines_out |
| 1624 | + |
| 1625 | + |
| 1626 | +def test_vsifile_class_binary_line_iteration(tmp_path): |
| 1627 | + |
| 1628 | + fname = str(tmp_path / "test.txt") |
| 1629 | + |
| 1630 | + lines_out = [x.encode() for x in random_lines()] |
| 1631 | + |
| 1632 | + with open(fname, "wb") as f: |
| 1633 | + for line in lines_out: |
| 1634 | + f.write(line) |
| 1635 | + f.write(b"\n") |
| 1636 | + |
| 1637 | + with gdaltest.vsi_open(fname, "rb") as f: |
| 1638 | + lines_in = [line for line in f] |
| 1639 | + |
| 1640 | + assert lines_in == lines_out |
| 1641 | + |
| 1642 | + |
| 1643 | +def test_vsifile_class_zipped_csv_reader(tmp_path): |
| 1644 | + |
| 1645 | + test_csv = str(tmp_path / "input.csv") |
| 1646 | + test_zip = str(tmp_path / "input.zip") |
| 1647 | + |
| 1648 | + import csv |
| 1649 | + import shutil |
| 1650 | + import zipfile |
| 1651 | + |
| 1652 | + shutil.copy("../ogr/data/prime_meridian.csv", test_csv) |
| 1653 | + |
| 1654 | + with zipfile.ZipFile(test_zip, "w") as zf: |
| 1655 | + zf.write(test_csv, arcname="input.csv") |
| 1656 | + |
| 1657 | + with gdaltest.vsi_open(f"/vsizip/{test_zip}/input.csv") as f: |
| 1658 | + records = [x for x in csv.DictReader(f)] |
| 1659 | + |
| 1660 | + assert len(records) == 4 |
| 1661 | + assert ( |
| 1662 | + records[2]["INFORMATION_SOURCE"] |
| 1663 | + == "Institut Geographique National (IGN), Paris" |
| 1664 | + ) |
| 1665 | + |
| 1666 | + |
| 1667 | +def test_vsifile_class_file_does_not_exist(tmp_path): |
| 1668 | + |
| 1669 | + with pytest.raises(OSError, match="No such file or directory"): |
| 1670 | + gdaltest.vsi_open(tmp_path / "does_not_exist.txt") |
| 1671 | + |
| 1672 | + |
| 1673 | +def test_vsifile_class_read_from_closed_file(tmp_path): |
| 1674 | + |
| 1675 | + with gdaltest.vsi_open(tmp_path / "out.txt", "w") as f: |
| 1676 | + f.write("abc") |
| 1677 | + |
| 1678 | + with pytest.raises(ValueError, match="closed file"): |
| 1679 | + f.seek(0) |
| 1680 | + |
| 1681 | + |
| 1682 | +def test_vsifile_class_append(tmp_vsimem): |
| 1683 | + |
| 1684 | + fname = tmp_vsimem / "out.txt" |
| 1685 | + |
| 1686 | + with gdaltest.vsi_open(fname, "w") as f: |
| 1687 | + f.write("abc") |
| 1688 | + with gdaltest.vsi_open(fname, "a") as f: |
| 1689 | + f.write("def") |
| 1690 | + with gdaltest.vsi_open(fname) as f: |
| 1691 | + assert f.read() == "abcdef" |
0 commit comments