Skip to content

Commit

Permalink
added time encoder
Browse files Browse the repository at this point in the history
  • Loading branch information
07pepa committed Apr 17, 2024
1 parent fc79936 commit 47a6c77
Show file tree
Hide file tree
Showing 5 changed files with 160 additions and 1 deletion.
54 changes: 54 additions & 0 deletions beanie/odm/utils/encoder.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,58 @@
from beanie.odm.fields import Link, LinkTypes
from beanie.odm.utils.pydantic import IS_PYDANTIC_V2, get_model_fields

DT_utc_object = datetime.datetime.now(datetime.timezone.utc)


def _compatct_time_encoder(time: datetime.time) -> str:
# reimpementation of speeddate in python https://github.com/pydantic/speedate/blob/aad1d9117a05618ae883e20ae179c582eb998eeb
# /src/time.rs#L41 and pydantic https://github.com/pydantic/pydantic-core/blob/f636403c2e8169bcbb94aa71a0727d76076f7e6e/src/input/datetime.rs#L177
if time.microsecond != 0:
time_string = "{:02d}:{:02d}:{:02d}.{:06d}".format(
time.hour, time.minute, time.second, time.microsecond
)
elif time.second != 0:
time_string = "{:02d}:{:02d}:{:02d}".format(
time.hour, time.minute, time.second
)
else:
time_string = "{:02d}:{:02d}".format(time.hour, time.minute)

tz_offset = None
if time.tzinfo is not None: # has timezone
# check if the offset is fixed no matter the datetime
offset_delta = time.tzinfo.utcoffset(None)
if offset_delta is None: # is ambiguous resolve base on current time
offset_delta = time.tzinfo.utcoffset(datetime.datetime.now())
if offset_delta is not None: # is not ambiguous
tz_offset = round(offset_delta.total_seconds())
else: # even if stuffing time into may work is wrong see
raise ValueError(
"timezone is not fixed and cannot be resolved with datetime,"
f"contact the maintainer of {type(time.tzinfo).__name__} for support"
)
else: # is known without datetime
tz_offset = round(offset_delta.total_seconds())

if tz_offset is not None:
if tz_offset == 0:
time_string += "Z"
else:
is_negative = tz_offset < 0
hours = abs(tz_offset // 3600)
minutes = abs(tz_offset % 3600) // 60

# since minutes are negative we need to subtract them from 60 and roll one hour back to be correct
if is_negative and minutes != 0:
hours -= 1
minutes = 60 - minutes

sign = "-" if is_negative else "+"
time_string += "{}{:02d}:{:02d}".format(sign, hours, minutes)

return time_string


SingleArgCallable = Callable[[Any], Any]
DEFAULT_CUSTOM_ENCODERS: MutableMapping[type, SingleArgCallable] = {
ipaddress.IPv4Address: str,
Expand All @@ -37,8 +89,10 @@
pathlib.PurePath: str,
pydantic.SecretBytes: pydantic.SecretBytes.get_secret_value,
pydantic.SecretStr: pydantic.SecretStr.get_secret_value,
# datetimes
datetime.date: lambda d: datetime.datetime.combine(d, datetime.time.min),
datetime.timedelta: operator.methodcaller("total_seconds"),
datetime.time: _compatct_time_encoder,
enum.Enum: operator.attrgetter("value"),
Link: operator.attrgetter("ref"),
bytes: bson.Binary,
Expand Down
3 changes: 3 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,9 @@ test = [
"pydantic-settings>=2",
"pydantic-extra-types>=2",
"pydantic[email]",
"pytz",
"tzdata",
"freezegun"
]
doc = [
"Pygments>=2.8.0",
Expand Down
2 changes: 2 additions & 0 deletions tests/odm/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
DocNonRoot,
DocumentForEncodingTest,
DocumentForEncodingTestDate,
DocumentForEncodingTestTime,
DocumentMultiModelOne,
DocumentMultiModelTwo,
DocumentTestModel,
Expand Down Expand Up @@ -289,6 +290,7 @@ async def init(db):
DocumentWithLinkForNesting,
DocumentWithBackLinkForNesting,
LongSelfLink,
DocumentForEncodingTestTime,
]
await init_beanie(
database=db,
Expand Down
4 changes: 4 additions & 0 deletions tests/odm/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -1101,3 +1101,7 @@ class LongSelfLink(Document):

class Settings:
max_nesting_depth = 50


class DocumentForEncodingTestTime(Document):
time_field: datetime.time
98 changes: 97 additions & 1 deletion tests/odm/test_encoder.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,23 @@
import re
from datetime import date, datetime
from datetime import date, datetime, time
from datetime import timezone as dt_timezone
from uuid import uuid4

import pytest
from bson import Binary, Regex
from freezegun import freeze_time
from pydantic import AnyUrl

# support for older python versions (will cause to test some things twice)
from pytz import UTC, all_timezones, timezone

from beanie.odm.utils.encoder import Encoder
from beanie.odm.utils.pydantic import IS_PYDANTIC_V2
from tests.odm.models import (
Child,
DocumentForEncodingTest,
DocumentForEncodingTestDate,
DocumentForEncodingTestTime,
DocumentWithComplexDictKey,
DocumentWithDecimalField,
DocumentWithHttpUrlField,
Expand All @@ -21,6 +27,12 @@
SampleWithMutableObjects,
)

has_zone_info = True
try:
from zoneinfo import ZoneInfo, available_timezones
except ImportError:
has_zone_info = False


async def test_encode_datetime():
assert isinstance(Encoder().encode(datetime.now()), datetime)
Expand Down Expand Up @@ -169,3 +181,87 @@ async def test_dict_with_complex_key():

assert isinstance(new_doc.dict_field, dict)
assert new_doc.dict_field.get(uuid) == dt


def is_same_type_or_subtype(obj1, obj2):
return isinstance(obj1, type(obj2)) or isinstance(obj2, type(obj1))


def assert_time_equal(to_test: time, reference: time):
if to_test.tzinfo is not None: # tz
if not is_same_type_or_subtype(to_test.tzinfo, reference.tzinfo):
if reference.tzinfo.utcoffset(None) is None:
now = (
datetime.now()
) # date dependsent because of daylight saving time
# up to 1-minute difference is allowed
assert (
to_test.tzinfo.utcoffset(now) // 60
== reference.tzinfo.utcoffset(now) // 60
)
# compare without info
assert to_test.replace(tzinfo=None) == reference.replace(
tzinfo=None
)
else:
assert to_test.replace(tzinfo=None) == reference.replace(
tzinfo=None
)
else:
assert to_test == reference
else:
assert to_test == reference


async def inner_test_time(test_time: time):
doc = DocumentForEncodingTestTime(time_field=test_time)
await doc.insert()
new_doc = await DocumentForEncodingTestTime.get(doc.id)
assert_time_equal(new_doc.time_field, doc.time_field)
assert isinstance(new_doc.time_field, time)
assert_time_equal(new_doc.time_field, test_time)


@freeze_time("2021-01-01 12:09:05.123456")
@pytest.mark.parametrize(
"test_time",
[
time(12),
time(12, fold=1),
time(12, 3),
time(12, 3, fold=1),
time(12, 4, 5),
time(12, 4, 5, fold=1),
time(12, 4, 5, 123456),
time(12, 4, 5, 123456, fold=1),
time(12, 4, 5, 123456, tzinfo=UTC),
time(12, 4, 5, 123456, tzinfo=timezone("Europe/Prague")),
time(12, 4, 5, 123456, tzinfo=UTC, fold=1),
time(12, 4, 5, 123456, tzinfo=timezone("Europe/Prague")),
time(12, 4, 5, 123456, tzinfo=timezone("Europe/Prague"), fold=1),
time(12, 4, 5, 123456, tzinfo=dt_timezone.utc),
time(12, 4, 5, 123456, tzinfo=dt_timezone.utc, fold=1),
time(12, 4, 5, 123456, tzinfo=ZoneInfo("Europe/Prague")),
time(12, 4, 5, 123456, tzinfo=ZoneInfo("Europe/Prague"), fold=1),
],
)
async def test_encode_time_with_tz(test_time: time):
await inner_test_time(test_time)


if has_zone_info:
tz = list(available_timezones())
tz.sort()

@freeze_time("2021-01-01 12:09:05.123456")
@pytest.mark.parametrize("tz_string", tz)
async def test_encode_time_exhaustive_timezones_zone_info(tz_string: str):
await inner_test_time(
time(12, 4, 5, 123456, tzinfo=ZoneInfo(tz_string))
)


@freeze_time("2021-01-01 12:09:05.123456")
@pytest.mark.parametrize("tz_string", all_timezones)
async def test_encode_time_exhaustive_timezones_pytz(tz_string: str):
await inner_test_time(time(12, 4, 5, 123456, tzinfo=timezone(tz_string)))

0 comments on commit 47a6c77

Please sign in to comment.