Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

mock flink sql gateway client #59

Closed
wants to merge 17 commits into from
Closed

Conversation

zqWu
Copy link
Contributor

@zqWu zqWu commented Mar 15, 2023

Description

a mock sql flink client
Resolves #34

  • receive any statement, save to list
  • return {"status":"finished"} for any operation_handle
PR Checklist

@gliter gliter marked this pull request as draft March 15, 2023 09:04
@gliter
Copy link
Collaborator

gliter commented Mar 15, 2023

I hope you don't mind. I have converted this into draft. I see that you are still working on it. Please use Ready for review button once this is ready.

@zqWu
Copy link
Contributor Author

zqWu commented Mar 15, 2023

test_seeds.py passed
but other tests, fail becase they need get result from "client", which hasn't implemented

  • test_table_materialization.py
  • test_source_schema_generation.py

@zqWu zqWu marked this pull request as ready for review March 15, 2023 10:35
for i in range(0, len(stats)):
assert sql_equivalent(stats[i], expect_sql_sequential[i])

# run a second time, need clean
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need to test the same thing twice?

Copy link
Contributor Author

@zqWu zqWu Mar 15, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just an example in case someone need run operation (like dbt-run) multiple time in one function, i'll remove it

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, with this mock it won't test anything new but with actual Flink instance it will be a valid test.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

}

# we need setup MockFlinkSqlGatewayClient
MockFlinkSqlGatewayClient.create_session(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we make it to create and use instant instead of using it like singleton?
This will lead to problems in case of running tests in parallel or just not cleaning after in some single test.
Tests by nature should be isolated from each other as much as possible.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you are absolutely right, i'll handle it

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

"""
]
stats = MockFlinkSqlGatewayClient.all_statements()
assert len(stats) == len(expect_sql_sequential)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This code could be extracted to separate class like AssertionUtils which we could add as extension to test cases and in tests just call
self.assertSqlEquals(mock.all_statements(), expected_sqls)
very similar how you would use regular asserts in unittests https://docs.python.org/3/library/unittest.html

def test_upper(self):
        self.assertEqual('foo'.upper(), 'FOO')

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

@@ -46,6 +48,7 @@
select /** fetch_timeout_ms(10000) */ /** fetch_mode('streaming') */ * from base where id = 10
"""


class TestSeeds():
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could I ask you to move this class to new directory named component so we we will have unit tests in regular directories that correspond to modules its testing, component which will have all the tests with the mock you have created and then e2e for tests with actual Flink intance. What do you think?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

@staticmethod
def start():
httpretty.enable()
# DO NOT change register order, httpretty regex match = first match
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please remove copied comment.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

return MockFlinkSqlGatewayClient.router.all_statements()


def sql_equivalent(s1: str, s2: str) -> bool:
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This would land in the assert utils

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

self.assertEqual(200, r.status_code)
print(r.text)

# session 测试 =========
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please keep comments in English

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

-_-

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I hope that's not a problem.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

@zqWu
Copy link
Contributor Author

zqWu commented Mar 15, 2023

done

@@ -0,0 +1,106 @@
import os
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We have got small missunderstanding. This would be just a component test as it is using mock instead of real Flink, so should be in test/component in the future we should create an end-to-end test that will start actual Flink instance during test and it would be kept in test/e2e

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok

return self.router.all_statements()

@staticmethod
def setup(host: str = None, port: str | int = None, session_name: str = None):
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the union | operator is as far as I am aware available from 3.10. It has unfortunately still used by very few people https://w3techs.com/technologies/history_details/pl-python/3
in setup.py we are stating this library is using Python 3.7 or newer. Can we get rid of this for now? Maybe lets standardize port as int type.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok

import json


class MockFlinkSqlGatewayClient:
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I got very confused by this name. I initially thought this is a replacment for flink.sqlgateway.client.FlinkSqlGatewayClient but it is actually a Mock of an SqlGateway and we still use the regular client. So maybe the better name would be just MockFlinkSqlGateway?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

that is a mock sqlgateway

self.router = GwRouter(test_config)
self.router.start()
# create session
r = requests.post(f"{host_port}/v1/sessions", json.dumps({"sessionName": f"{session_name}"}))
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems like not actually used. It will be the real flink.sqlgateway.client.FlinkSqlGatewayClient that will be opening the session anyway.

session_handle = r.json()['sessionHandle']
session = SqlGatewaySession(SqlGatewayConfig(host, port, session_name), session_handle)
self.session = session
return session
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This return is ignored by rest of the code.

self.session = session
return session

def execute_statement(self, sql: str) -> SqlGatewayOperation:
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is not used.

@gliter
Copy link
Collaborator

gliter commented Mar 16, 2023

@zqWu Sorry for the delay. Had finally a chance to actually run properly your PR. This looks really great! Thank you for that work. I have left some comments. Looked like you initially wanted to substitute flink.sqlgateway.client.FlinkSqlGatewayClient and later changed your mind to use http mock. Am I right? There are leftovers from your initial idea.

Thanks for changing it to use instance. I am not familiar with that library, would it work if we would create few instances (but using different port each time) in parallel? This is problem for another time anyway.

Also in the future I guess we will need to add more complex behaviors like making it return error or some specific response.

Please check my comments and then I can merge it.

@zqWu
Copy link
Contributor Author

zqWu commented Mar 17, 2023

thanks for review, i'll handle the problems above

@zqWu
Copy link
Contributor Author

zqWu commented Mar 17, 2023

i have implemented a mock sqlgateway, with some test code.
still in progress

i think, it will be useful for some test ( not stream and some flink specific sql)

for real e2e test when not use default_catalog, it requires:

  • gateway, flink, and other like minio, hms, mysql, kafka (even a nginx if you want to see all the sql)
  • maual init a session, and create a catalog in this session ( or some other init like db/table for test )
  • set this session in ~/.dbt/flink-session.yaml

these works seems heavy to repeat
with mock gateway can sav a lot effort
it can run in debug mode in pycharm, pretty friend for beginners like me

@gliter
Copy link
Collaborator

gliter commented Mar 17, 2023

Should be doable using testcontainers. This is something to be left for the future.

else:
raise "NOT support"

def _execute_by_sqlite(self, sql) -> list[Any]:
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Return type should be List[Any] imported from typing

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

return [x.replace(pattern, chars) for x in r] if pattern else r

@staticmethod
def _add_data_node(res_list, kind, fields: str | List[Any]):
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Union of types is not supported in Python < 3.10

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

@gliter
Copy link
Collaborator

gliter commented Mar 20, 2023

The file paths are unnecessary nested. Component tests should just be in tests/component instead of test/component/mock_gw/tests.

@zqWu zqWu marked this pull request as draft March 20, 2023 14:36
@zqWu
Copy link
Contributor Author

zqWu commented Mar 21, 2023

my branch for this pr is messy, gonna close this and promot another

@zqWu zqWu closed this Mar 21, 2023
@zqWu zqWu mentioned this pull request Mar 21, 2023
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Create Flink SQL gateway mock for component tests
2 participants