Files
Reskreen/mycaldav/models.py
2022-10-19 11:21:27 +02:00

327 lines
11 KiB
Python

from datetime import datetime, timedelta, time
from django.contrib.auth.models import User
import datetime as Datetime
import pickle
import os
from dateutil.relativedelta import relativedelta
from django.db import models
import urllib.request
import ssl
from icalendar import Calendar, Event
import recurring_ical_events
import caldav
from mycaldav.settings import *
# Create your models here.
class cls_caldav():
url = ""
data = None
items = []
day = []
night = []
def __init__(self, url=""):
self.url = url
def clear_data(self):
self.data = None
self.items = []
self.day = []
self.night = []
#Trie les tableau par odre croissant sur la date de début de l'événement
def sort_array(self):
self.items.sort(key=lambda x: x.dtstamp, reverse=False)
def sort_array_by_key(self,reverse=False):
self.items.sort(key=lambda x: x.key, reverse=reverse)
self.day.sort(key=lambda x: x.key, reverse=reverse)
self.night.sort(key=lambda x: x.key, reverse=reverse)
def get_caldav_data(self,periode=1,calendar=None, date=None):
ctx = ssl.create_default_context()
ctx.check_hostname = False
ctx.verify_mode = ssl.CERT_NONE
self.clear_data()
o_url = urllib.request.urlopen(self.url, context=ctx)
self.data = o_url.read()
o_url.close()
if date is None:
date = datetime.now()
today = (date.year,date.month,date.day)
events = None
if periode == 1:
events = recurring_ical_events.of(Calendar.from_ical(self.data)).at(today)
elif periode == 2:
tomorow = date + timedelta(days=1)
events = recurring_ical_events.of(Calendar.from_ical(self.data)).between(today,tomorow)
elif periode == 7:
endweek = date + timedelta(days=6)
events = recurring_ical_events.of(Calendar.from_ical(self.data)).between(today,endweek)
self.parse_data(events)
def parse_data(self, events):
for event in events:
item = _caldav_item()
item.name = event["SUMMARY"]
item.uiid = event["UID"]
if "-" in item.name:
arr = item.name.split("-")
item.key = arr[0]
if "$" in item.key:
item.team_transfert = True
item.key = item.key.replace("$","")
if "Manif" in item.key:
item.team_manif = True
item.key = item.key.replace("Manif", "")
item.name = arr[1]
if "&" in item.name:
arr = item.name.split("&")
item.team_1 = arr[0]
if "#" in item.team_1:
item.team_1_chef = True
item.team_1 = item.team_1.replace("#","")
item.team_2 = arr[1]
if "#" in item.team_2:
item.team_2_chef = True
item.team_2 = item.team_2.replace("#", "")
if "DESCRIPTION" in event.keys():
item.desc = event["DESCRIPTION"]
if "#" in item.desc:
item.done = True
if '{href=' in item.desc:
temp_str = item.desc.split('{href=')[1]
temp_str = temp_str.split('}')[0]
item.href = temp_str
item.desc = item.desc.replace("{href=" + item.href + "}","")
item.dtstart = event["DTSTART"].dt.strftime("%d.%m.%Y %H:%M")
item.dtstamp = int(event["DTSTART"].dt.strftime("%Y%m%d%H%M"))
print(item.dtstamp)
item.dtend = event["DTEND"].dt.strftime("%d.%m.%Y %H:%M")
item.format_str_date()
self.items.append(item)
start = datetime.strptime(item.dtstart,"%d.%m.%Y %H:%M")
if type(start) is Datetime.date:
start = datetime.combine(start, datetime.min.time())
print("convert Date to datetime")
print(start)
if int(start.strftime("%H")) < 19:
self.day.append(item)
if int(start.strftime("%H")) >= 19:
self.night.append(item)
self.sort_array()
return self.data
class cls_caldav_client():
caldav_url = "https://sync.infomaniak.com"
#caldav_user = 'SC01066' #ambulanciers@ambulance-clerc.ch
#caldav_password = "mc144*1870CLERC"
caldav_user = 'AA01593' #agenda@ambulance-clerc.ch
caldav_password = "Agendamc144"
caldav_agenda_config_url = caldav_cfg["task_config"]
events = None
def __init__(self):
self.client = caldav.DAVClient(url=self.caldav_url, username=self.caldav_user, password=self.caldav_password)
self.data = self.client.principal()
for cal in self.data.calendars():
print(f"name: {cal.name}({cal.url})")
self.a_task = self.data.calendar(name="Tâche")
self.a_team = self.data.calendar(name="Équipage hébdomadaire")
#self.get_events_by_date(self.a_team)
def init_task_management(self):
print("pass copy task process")
try:
with open(os.path.join("mycaldav", "last_sync_config.bin"), "rb") as file:
array = pickle.load(file)
except:
array = {"year": 0, "month": 0}
print("Erreur lecture fichier débiteur")
if array["month"] == datetime.now().month:
if array["year"] == datetime.now().year:
print("pas de copy, sync déjà fait")
else:
self.copy_caldav_data(self.a_task)
else:
self.copy_caldav_data(self.a_task)
self.clear_all_events_by_Date(self.a_task)
def add_event_in_calandar(self, calandar):
my_event = calandar.save_event(
dtstart=datetime(2022, 6, 30, 12),
dtend=datetime(2022, 6, 30, 13),
summary="Do the needful",
)
def get_events_by_date(self,calandar,date=None ):
if date is None:
date = datetime.now()
tomorow = date + timedelta(days=7)
self.events = calandar.date_search(
start=datetime(date.year, date.month, date.day), end=datetime(tomorow.year, tomorow.month, tomorow.day), expand=False
)
for event in self.events:
self.format_data_event(event)
return
def format_data_event(self,event):
summary = event.vobject_instance.vevent.summary.value
if "-" not in summary:
return False
key = summary.split("-")
if "&" not in key[1]:
return False
equipage = key[1].split("&")
team_1 = ""
team_2 = ""
updated = False
print(f"{event.vobject_instance.vevent.dtstart.value} check équipage({key[0]}) {key[1]}")
if "MC" not in equipage[0]: # check équipage 1
team_1 = equipage[0].strip().replace('#', '')
try:
print(f"search équipage 1: {team_1}")
user = User.objects.get(username=f"mc{team_1}@clerc.ch").last_name
equipage[0] = equipage[0].replace(team_1, f"MC{team_1} {user}")
updated = True
except :
print(f"Error: mc{team_1}@clerc.ch @{summary}")
return None
else:
print("Already with MC")
print(f"débug: {len(equipage)}")
if len(equipage) < 2:
print(f"Error équipage sans [&] {key[1]}")
elif "MC" not in equipage[1]: # check équipage 2
team_2 = equipage[1].strip().replace('#','')
try:
print(f"search équipage 2: {team_2}")
user = User.objects.get(username=f"mc{team_2}@clerc.ch").last_name
equipage[1] = equipage[1].replace(team_2, f"MC{team_2} {user}")
updated = True
except :
print(f"Error: mc{team_2}@clerc.ch @{summary}")
return None
else:
print("Already with MC")
if updated:
event.vobject_instance.vevent.summary.value = f"{key[0]} - {equipage[0].strip()} & {equipage[1].strip()}"
event.save()
def copy_caldav_data(self, calandar=None):
ctx = ssl.create_default_context()
ctx.check_hostname = False
ctx.verify_mode = ssl.CERT_NONE
o_url = urllib.request.urlopen(self.caldav_agenda_config_url, context=ctx)
data = o_url.read()
o_url.close()
now = f"01.{datetime.now().month}.{datetime.now().year}"
next_month = datetime.strptime(f"1.{datetime.today().month}.{datetime.today().year}","%d.%m.%Y") + relativedelta(months=1) - relativedelta(days=1)
next_month = (next_month.year, next_month.month, next_month.day)
today = (datetime.now().year, datetime.now().month, 1)
print(f"today:{today}")
print(f"next:{next_month}")
events = None
events = recurring_ical_events.of(Calendar.from_ical(data)).between(today, next_month)
cur_events = calandar.date_search(start=datetime(today[0],today[1],today[2]), end=datetime(next_month[0],next_month[1],next_month[2]), expand=True)
for event in events:
print(f"copy de l'événement: {event['SUMMARY']}")
my_event = calandar.save_event(
dtstart=event["DTSTART"].dt,
dtend=event["DTEND"].dt,
summary= event["SUMMARY"],
description=event["DESCRIPTION"],
)
with open(os.path.join("mycaldav", "last_sync_config.bin"), "wb") as file:
array = {"year":datetime.now().year, "month":datetime.now().month}
pickle.dump(array, file)
def clear_all_events_by_Date(self, calandar):
events = calandar.date_search(start=datetime(2000, 1, 1), end=datetime.now() - relativedelta(years=1), expand=True)
for e in events:
e.delete()
#print(events)
class _caldav_item():
key = ""
name = ""
desc = ""
dtstart = None
str_start_date = ""
str_start_time = ""
dtend = None
str_end_date = ""
str_end_time = ""
dtstamp = None
done = False
href = ""
uiid = ""
team_1 = ""
team_2 = ""
team_1_chef = False
team_2_chef = False
team_transfert = False
team_manif = False
def format_str_date(self):
self.str_start_date = datetime.strptime(self.dtstart,"%d.%m.%Y %H:%M").strftime("%d.%m")
self.str_start_time = datetime.strptime(self.dtstart, "%d.%m.%Y %H:%M").strftime("%H:%M")
self.str_end_date = datetime.strptime(self.dtend,"%d.%m.%Y %H:%M").strftime("%d.%m")
self.str_end_time = datetime.strptime(self.dtend, "%d.%m.%Y %H:%M").strftime("%H:%M")