Skip to content

Commit

Permalink
added time encoder
Browse files Browse the repository at this point in the history
  • Loading branch information
07pepa authored and pepe-rtmlab committed May 2, 2024
1 parent fc79936 commit 8087d7c
Show file tree
Hide file tree
Showing 6 changed files with 182 additions and 2 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/github-actions-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ jobs:
matrix:
python-version: [ "3.7", "3.8", "3.9", "3.10", "3.11" ]
mongodb-version: [ 4.4, 5.0 ]
pydantic-version: [ 1.10.12, 2.3 ]
pydantic-version: [ 1.10.12, 2.5.3 ]
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
Expand Down
61 changes: 61 additions & 0 deletions beanie/odm/utils/encoder.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,65 @@
from beanie.odm.fields import Link, LinkTypes
from beanie.odm.utils.pydantic import IS_PYDANTIC_V2, get_model_fields

UNIX_TIME_START = datetime.datetime(1970, 1, 1, 0, 0, 0, 0)


def _compatct_time_encoder(time: datetime.time) -> str:
# re-implementation of speed date in python
# https://github.com/pydantic/speedate/blob/aad1d9117a05618ae883e20ae179c582eb998eeb /src/time.rs#L41 and
# pydantic https://github.com/pydantic/pydantic-core/blob/f636403c2e8169bcbb94aa71a0727d76076f7e6e/src/input
# /datetime.rs#L177
if time.microsecond != 0:
time_string = "{:02d}:{:02d}:{:02d}.{:06d}".format(
time.hour, time.minute, time.second, time.microsecond
)
elif time.second != 0:
time_string = "{:02d}:{:02d}:{:02d}".format(
time.hour, time.minute, time.second
)
else:
time_string = "{:02d}:{:02d}".format(time.hour, time.minute)

tz_offset = None
if time.tzinfo is not None: # has timezone
# check if the offset is fixed no matter the datetime
offset_delta = time.tzinfo.utcoffset(None)
if (
offset_delta is None
): # is ambiguous resolve base on 1.1.1970 to be consistent across time
try:
offset_delta = time.tzinfo.utcoffset(UNIX_TIME_START)
except Exception as e:
raise ValueError from e
if offset_delta is not None: # is not ambiguous
tz_offset = round(offset_delta.total_seconds())
else: # even if stuffing time into may work is wrong see
raise ValueError(
"timezone is not fixed and cannot be resolved with datetime,"
f"contact the maintainer of {type(time.tzinfo).__name__} for support"
)
else: # is known without datetime
tz_offset = round(offset_delta.total_seconds())

if tz_offset is not None:
if tz_offset == 0:
time_string += "Z"
else:
is_negative = tz_offset < 0
hours = abs(tz_offset // 3600)
minutes = abs(tz_offset % 3600) // 60

# since minutes are negative we need to subtract them from 60 and roll one hour back to be correct
if is_negative and minutes != 0:
hours -= 1
minutes = 60 - minutes

sign = "-" if is_negative else "+"
time_string += "{}{:02d}:{:02d}".format(sign, hours, minutes)

return time_string


SingleArgCallable = Callable[[Any], Any]
DEFAULT_CUSTOM_ENCODERS: MutableMapping[type, SingleArgCallable] = {
ipaddress.IPv4Address: str,
Expand All @@ -37,8 +96,10 @@
pathlib.PurePath: str,
pydantic.SecretBytes: pydantic.SecretBytes.get_secret_value,
pydantic.SecretStr: pydantic.SecretStr.get_secret_value,
# datetimes
datetime.date: lambda d: datetime.datetime.combine(d, datetime.time.min),
datetime.timedelta: operator.methodcaller("total_seconds"),
datetime.time: _compatct_time_encoder,
enum.Enum: operator.attrgetter("value"),
Link: operator.attrgetter("ref"),
bytes: bson.Binary,
Expand Down
2 changes: 2 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ test = [
"pydantic-settings>=2",
"pydantic-extra-types>=2",
"pydantic[email]",
"pytz",
"tzdata"
]
doc = [
"Pygments>=2.8.0",
Expand Down
2 changes: 2 additions & 0 deletions tests/odm/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
DocNonRoot,
DocumentForEncodingTest,
DocumentForEncodingTestDate,
DocumentForEncodingTestTime,
DocumentMultiModelOne,
DocumentMultiModelTwo,
DocumentTestModel,
Expand Down Expand Up @@ -289,6 +290,7 @@ async def init(db):
DocumentWithLinkForNesting,
DocumentWithBackLinkForNesting,
LongSelfLink,
DocumentForEncodingTestTime,
]
await init_beanie(
database=db,
Expand Down
4 changes: 4 additions & 0 deletions tests/odm/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -1101,3 +1101,7 @@ class LongSelfLink(Document):

class Settings:
max_nesting_depth = 50


class DocumentForEncodingTestTime(Document):
time_field: datetime.time
113 changes: 112 additions & 1 deletion tests/odm/test_encoder.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,23 @@
import re
from datetime import date, datetime
from datetime import date, datetime, time
from datetime import timezone as dt_timezone
from typing import Union
from uuid import uuid4

import pytest
from bson import Binary, Regex
from pydantic import AnyUrl

# support for older python versions (will cause to test some things twice)
from pytz import UTC, all_timezones, timezone

from beanie.odm.utils.encoder import Encoder
from beanie.odm.utils.pydantic import IS_PYDANTIC_V2
from tests.odm.models import (
Child,
DocumentForEncodingTest,
DocumentForEncodingTestDate,
DocumentForEncodingTestTime,
DocumentWithComplexDictKey,
DocumentWithDecimalField,
DocumentWithHttpUrlField,
Expand All @@ -21,6 +27,13 @@
SampleWithMutableObjects,
)

has_zone_info = True
try:
from zoneinfo import ZoneInfo, available_timezones
except ImportError:
has_zone_info = False
ZoneInfo = timezone


async def test_encode_datetime():
assert isinstance(Encoder().encode(datetime.now()), datetime)
Expand Down Expand Up @@ -169,3 +182,101 @@ async def test_dict_with_complex_key():

assert isinstance(new_doc.dict_field, dict)
assert new_doc.dict_field.get(uuid) == dt


def is_same_type_or_subtype(obj1, obj2):
return isinstance(obj1, type(obj2)) or isinstance(obj2, type(obj1))


def assert_time_equal(to_test: time, reference: time):
if to_test.tzinfo is not None: # tz
if reference.tzinfo.utcoffset(None) is None:
now = datetime(
1970, 1, 1, 0, 0, 0, 0
) # date dependsent because of daylight saving time

check_tz_info(to_test, reference, now)
# compare without info
assert to_test.replace(tzinfo=None) == reference.replace(
tzinfo=None
)
else:
check_tz_info(to_test, reference)
assert to_test.replace(tzinfo=None) == reference.replace(
tzinfo=None
)
else:
assert to_test == reference


def check_tz_info(
to_test: time, reference: time, when: Union[datetime, None] = None
):
# up to 1-minute (not included) difference is allowed by serialization standard used by pydantic
assert (
to_test.tzinfo.utcoffset(when).total_seconds() // 60
== reference.tzinfo.utcoffset(when).total_seconds() // 60
)


async def inner_test_time(test_time: time):
doc = DocumentForEncodingTestTime(time_field=test_time)
await doc.insert()
new_doc = await DocumentForEncodingTestTime.get(doc.id)
assert_time_equal(new_doc.time_field, doc.time_field)
assert isinstance(new_doc.time_field, time)
assert_time_equal(new_doc.time_field, test_time)


@pytest.mark.parametrize(
"test_time",
[
time(12),
time(12, fold=1),
time(12, 3),
time(12, 3, fold=1),
time(12, 4, 5),
time(12, 4, 5, fold=1),
time(12, 4, 5, 123456),
time(12, 4, 5, 123456, fold=1),
time(12, 4, 5, 123456, tzinfo=UTC),
time(12, 4, 5, 123456, tzinfo=timezone("Europe/Prague")),
time(12, 4, 5, 123456, tzinfo=UTC, fold=1),
time(12, 4, 5, 123456, tzinfo=timezone("Europe/Prague")),
time(12, 4, 5, 123456, tzinfo=timezone("Europe/Prague"), fold=1),
time(12, 4, 5, 123456, tzinfo=dt_timezone.utc),
time(12, 4, 5, 123456, tzinfo=dt_timezone.utc, fold=1),
time(12, 4, 5, 123456, tzinfo=ZoneInfo("Europe/Prague")),
time(12, 4, 5, 123456, tzinfo=ZoneInfo("Europe/Prague"), fold=1),
],
)
async def test_encode_time_with_tz(test_time: time):
await inner_test_time(test_time)


if has_zone_info:
tz = list(available_timezones())
tz.sort()

@pytest.mark.parametrize("tz_string", tz)
async def test_encode_time_exhaustive_timezones_zone_info(tz_string: str):
await inner_test_time(
time(12, 4, 5, 123456, tzinfo=ZoneInfo(tz_string))
)


# folowing causes pytz.exceptions.NonExistentTimeError
pytz_unsupported = (
"America/Bahia_Banderas",
"America/Hermosillo",
"America/Mazatlan",
"Mexico/BajaSur",
)


@pytest.mark.parametrize(
"tz_string",
list(filter(lambda x: x not in pytz_unsupported, all_timezones)),
)
async def test_encode_time_exhaustive_timezones_pytz(tz_string: str):
await inner_test_time(time(12, 4, 5, 123456, tzinfo=timezone(tz_string)))

0 comments on commit 8087d7c

Please sign in to comment.