Ran black to help my sanity

main
Jef Roosens 2021-08-11 16:42:34 +02:00
parent d1c6eaa602
commit 8114498ca1
Signed by: Jef Roosens
GPG Key ID: 955C0660072F691F
4 changed files with 60 additions and 39 deletions

View File

@ -13,17 +13,22 @@ def existing_path(path_str):
if not path.exists() or not path.is_file(): if not path.exists() or not path.is_file():
raise argparse.ArgumentTypeError("Config file doesn't exist or isn't a file.") raise argparse.ArgumentTypeError("Config file doesn't exist or isn't a file.")
return path return path
async def main(): async def main():
parser = argparse.ArgumentParser() parser = argparse.ArgumentParser()
parser.add_argument("-c", "--config-file", help="Path to config file.", parser.add_argument(
default="padel.ini", type=existing_path) "-c",
parser.add_argument("-d", "--days", "--config-file",
help="How many days in advance to look.", default=1, help="Path to config file.",
type=int) default="padel.ini",
type=existing_path,
)
parser.add_argument(
"-d", "--days", help="How many days in advance to look.", default=1, type=int
)
parser.add_argument("club_id", help="ID of the club to check.", type=int) parser.add_argument("club_id", help="ID of the club to check.", type=int)
args = parser.parse_args() args = parser.parse_args()
@ -33,11 +38,12 @@ async def main():
# TODO check if config file can be read # TODO check if config file can be read
config.read(args.config_file) config.read(args.config_file)
res = await get_decent_timeslots(args.club_id, res = await get_decent_timeslots(
config["DEFAULT"]["weather_api_key"], args.club_id, config["DEFAULT"]["weather_api_key"], args.days
args.days) )
print(res) print(res)
if __name__ == "__main__": if __name__ == "__main__":
aio.run(main()) aio.run(main())

View File

@ -5,7 +5,7 @@ import asyncio as aio
# Provided by my code monkey Lander # Provided by my code monkey Lander
GOOD_CODES = [1000,1003, 1006, 1009,1150, 1153] GOOD_CODES = [1000, 1003, 1006, 1009, 1150, 1153]
async def get_decent_timeslots(club_id, weather_api_key, days=1): async def get_decent_timeslots(club_id, weather_api_key, days=1):
@ -17,16 +17,18 @@ async def get_decent_timeslots(club_id, weather_api_key, days=1):
weather_api = WeatherAPI(weather_api_key) weather_api = WeatherAPI(weather_api_key)
weather_forecasts, timeslots = await aio.gather( weather_forecasts, timeslots = await aio.gather(
weather_api.get_hourly_conditions(club_address, days=days), weather_api.get_hourly_conditions(club_address, days=days),
get_time_slots(club_id, days=days) get_time_slots(club_id, days=days),
) )
# Filter out bad weather forecasts & sort them according to date & time # Filter out bad weather forecasts & sort them according to date & time
weather_forecasts = sorted(filter(lambda x: x[2] in GOOD_CODES, weather_forecasts = sorted(
weather_forecasts), key=lambda x: x[0]) filter(lambda x: x[2] in GOOD_CODES, weather_forecasts), key=lambda x: x[0]
)
# Convert weather_forecasts to dict for faster lookups # Convert weather_forecasts to dict for faster lookups
weather_forecasts = {date_obj: weather_str for date_obj, weather_str, _ in weather_forecasts = {
weather_forecasts} date_obj: weather_str for date_obj, weather_str, _ in weather_forecasts
}
print(weather_forecasts) print(weather_forecasts)
# Filter out non-free timeslots # Filter out non-free timeslots
@ -42,7 +44,7 @@ async def get_decent_timeslots(club_id, weather_api_key, days=1):
valid = True valid = True
while start_hour < start_time + duration: while start_hour < start_time + duration:
if (weather_name := weather_forecasts.get(start_hour)): if weather_name := weather_forecasts.get(start_hour):
weather_names.append(weather_name) weather_names.append(weather_name)
# If we can't find any information about the timeslot, we assume # If we can't find any information about the timeslot, we assume
@ -50,7 +52,7 @@ async def get_decent_timeslots(club_id, weather_api_key, days=1):
else: else:
valid = False valid = False
break break
start_hour += timedelta(hours=1) start_hour += timedelta(hours=1)
if valid: if valid:

View File

@ -20,7 +20,9 @@ def extract_timeslots(tbody, column_headers):
# Iterate over each column # Iterate over each column
for td in tr.findAll("td"): for td in tr.findAll("td"):
# Find first empty counter # Find first empty counter
counter_index = next((i for i in range(len(counters)) if counters[i] <= 0), None) counter_index = next(
(i for i in range(len(counters)) if counters[i] <= 0), None
)
# this means there's no empty counters atm # this means there's no empty counters atm
if counter_index is None: if counter_index is None:
@ -57,38 +59,43 @@ def extract_calendar(soup: BeautifulSoup, reservation_date):
# the real stuff # the real stuff
tbody = reservation_t.find("tbody") tbody = reservation_t.find("tbody")
timeslots = extract_timeslots(tbody, court_names) timeslots = extract_timeslots(tbody, court_names)
# Here, we convert the timeslots to datetime instead of time # Here, we convert the timeslots to datetime instead of time
return [(col, status, datetime.combine(reservation_date, start), duration) for col, status, start, duration in timeslots] return [
(col, status, datetime.combine(reservation_date, start), duration)
for col, status, start, duration in timeslots
]
async def get_time_slots(club_id: int, days=1): async def get_time_slots(club_id: int, days=1):
dates = [date.today() + timedelta(days=i) for i in range(days)] dates = [date.today() + timedelta(days=i) for i in range(days)]
async def get_calendar(date_obj): async def get_calendar(date_obj):
r = requests.get(BASE_URL, params={ r = requests.get(
"clubId": club_id, BASE_URL,
"planningDay": date_obj.strftime("%d-%m-%Y") params={"clubId": club_id, "planningDay": date_obj.strftime("%d-%m-%Y")},
}) )
soup = BeautifulSoup(r.content, "html.parser") soup = BeautifulSoup(r.content, "html.parser")
return extract_calendar(soup, date_obj) return extract_calendar(soup, date_obj)
output = [] output = []
for coro in aio.as_completed([ for coro in aio.as_completed([get_calendar(date_obj) for date_obj in dates]):
get_calendar(date_obj) for date_obj in dates
]):
res = await coro res = await coro
output.extend(res) output.extend(res)
return output return output
async def get_club_address(club_id: int): async def get_club_address(club_id: int):
r = requests.get(BASE_URL, params={ r = requests.get(
"clubId": club_id, BASE_URL,
"tab": "club", params={
}) "clubId": club_id,
"tab": "club",
},
)
soup = BeautifulSoup(r.content, "html.parser") soup = BeautifulSoup(r.content, "html.parser")
tab_div = soup.find("div", id="club") tab_div = soup.find("div", id="club")

View File

@ -13,22 +13,28 @@ class WeatherAPI:
return requests.get(f"{self.BASE_URL}{endpoint}", params=params) return requests.get(f"{self.BASE_URL}{endpoint}", params=params)
async def get_hourly_conditions(self, query, days=1): async def get_hourly_conditions(self, query, days=1):
r = self._get("/forecast.json", { r = self._get(
"q": query, "/forecast.json",
"days": days, {
"aqi": "no", "q": query,
"alert": "no", "days": days,
}) "aqi": "no",
"alert": "no",
},
)
if r.status_code != 200: if r.status_code != 200:
return None return None
data = r.json() data = r.json()
hour_forecasts = sum([obj["hour"] for obj in data["forecast"]["forecastday"]], []) hour_forecasts = sum(
[obj["hour"] for obj in data["forecast"]["forecastday"]], []
)
return [ return [
( (
datetime.fromtimestamp(forecast["time_epoch"]), datetime.fromtimestamp(forecast["time_epoch"]),
forecast["condition"]["text"], forecast["condition"]["text"],
forecast["condition"]["code"], forecast["condition"]["code"],
) for forecast in hour_forecasts )
for forecast in hour_forecasts
] ]