from datetime import datetime, timedelta, time from django.contrib.auth.models import User import datetime as Datetime import pickle import pytz import os from dateutil.relativedelta import relativedelta from django.db import models import urllib.request import ssl from icalendar import Calendar, Event import recurring_ical_events import caldav from mycaldav.settings import * Key_separator = "--" # Create your models here. class cls_caldav(): url = "" data = None items = [] day = [] night = [] def __init__(self, url=""): self.url = url def clear_data(self): self.data = None self.items = [] self.day = [] self.night = [] #Trie les tableau par odre croissant sur la date de début de l'événement def sort_array(self): self.items.sort(key=lambda x: x.dtstamp, reverse=False) def sort_array_by_key(self,reverse=False): self.items.sort(key=lambda x: x.key, reverse=reverse) self.day.sort(key=lambda x: x.key, reverse=reverse) self.night.sort(key=lambda x: x.key, reverse=reverse) def caldav_open_url(self, days_delta, date): ctx = ssl.create_default_context() ctx.check_hostname = False ctx.verify_mode = ssl.CERT_NONE self.clear_data() modified_url = self.url + f"&start={int((date-timedelta(days=days_delta)).timestamp())}&end={int((date +timedelta(days=days_delta)).timestamp())}&expand=1" print(f"ICS CALL URL = {modified_url}") o_url = urllib.request.urlopen(modified_url , context=ctx) self.data = o_url.read() o_url.close() def get_caldav_data(self,periode=1,calendar=None, date=None): if date is None: date = datetime.now() self.caldav_open_url(days_delta=periode,date=date) today = (date.year,date.month,date.day) events = None if periode == 1: events = recurring_ical_events.of(Calendar.from_ical(self.data)).at(today) elif periode == 2: tomorow = date + timedelta(days=1) events = recurring_ical_events.of(Calendar.from_ical(self.data)).between(today,tomorow) elif periode == 3: today = date + timedelta(days=-2) tomorow = date + timedelta(days=2) print(f"affichage: {today} <> {tomorow}") events = recurring_ical_events.of(Calendar.from_ical(self.data)).between(today,tomorow) elif periode == 7: endweek = date + timedelta(days=7) events = recurring_ical_events.of(Calendar.from_ical(self.data)).between(today,endweek) self.parse_data(events) def parse_data(self, events): for event in events: item = _caldav_item() item.name = event["SUMMARY"] item.uuid = f"{event['UID']}" if Key_separator in item.name: arr = item.name.split(Key_separator) item.key = arr[0] if "$" in item.key: item.team_transfert = True item.key = item.key.replace("$","") if "Manif" in item.key: item.team_manif = True item.key = item.key.replace("Manif", "") item.name = arr[1] if "&" in item.name: arr = item.name.split("&") item.team_1 = arr[0] if "#" in item.team_1: item.team_1_chef = True item.team_1 = item.team_1.replace("#","") item.team_2 = arr[1] if "#" in item.team_2: item.team_2_chef = True item.team_2 = item.team_2.replace("#", "") if "DESCRIPTION" in event.keys(): item.desc = f"{event['DESCRIPTION']}" if "#Fait" in item.desc: item.done = True item.desc = item.desc.replace("#Fait", "") if '{href=' in item.desc: temp_str = item.desc.split('{href=')[1] temp_str = temp_str.split('}')[0] item.href = temp_str item.desc = item.desc.replace("{href=" + item.href + "}","") item.dtstart = event["DTSTART"].dt.strftime("%d.%m.%Y %H:%M") item.dtstamp = int(event["DTSTART"].dt.strftime("%Y%m%d%H%M")) #print(item.dtstamp) item.dtend = event["DTEND"].dt.strftime("%d.%m.%Y %H:%M") item.format_str_date() self.items.append(item) start = datetime.strptime(item.dtstart,"%d.%m.%Y %H:%M") if type(start) is Datetime.date: start = datetime.combine(start, datetime.min.time()) print("convert Date to datetime") print(f"Parse Data {item.name} start:{start}") if int(start.strftime("%H")) < 19: self.day.append(item) if int(start.strftime("%H")) >= 19: print("add to night ****") self.night.append(item) self.sort_array() return self.data class cls_caldav_client(): caldav_url = "https://sync.infomaniak.com" #caldav_user = 'SC01066' #ambulanciers@ambulance-clerc.ch #caldav_password = "mc144*1870CLERC" caldav_user = 'AA01593' #agenda@ambulance-clerc.ch caldav_password = "Agendamc144" caldav_agenda_config_url = caldav_cfg["task_config"] events = None def __init__(self): self.client = caldav.DAVClient(url=self.caldav_url, username=self.caldav_user, password=self.caldav_password) self.data = self.client.principal() for cal in self.data.calendars(): print(f"name: {cal.name}({cal.url})") self.a_task = self.data.calendar(cal_id=caldav_id["task"]) self.a_team = self.data.calendar(cal_id=caldav_id["team"]) self.a_vhc = self.data.calendar(cal_id=caldav_id["vhc"]) self.a_op = self.data.calendar(cal_id=caldav_id["op"]) self.a_task_config = self.data.calendar(cal_id=caldav_id["task_config"]) self.a_road = self.data.calendar(cal_id=caldav_id["road"]) self.a_rh = self.data.calendar(cal_id=caldav_id["rh"]) self.a_manif = self.data.calendar(cal_id=caldav_id["manif"]) #self.get_events_by_date(self.a_team) def get_event_by_uuid(self,calandar,uuid): event = calandar.object_by_uid(uid=uuid) return event #self.get_events_by_date(calandar) #for event in self.events: # print(event.vobject_instance.vevent.eventId) def mark_as_done_task(self,calandar,uuid): event = self.get_event_by_uuid(calandar=calandar,uuid=uuid) try: _desc = event.vobject_instance.vevent.description.value + "#Fait" _desc.replace("#Fait#Fait","#Fait") except AttributeError: _desc = "#Fait" my_event = calandar.save_event( dtstart=event.vobject_instance.vevent.dtstart.value, dtend=event.vobject_instance.vevent.dtend.value, summary=event.vobject_instance.vevent.summary.value, description=_desc, ) event.delete() def init_task_management(self): print("pass copy task process") try: with open(os.path.join("mycaldav", "last_sync_config.bin"), "rb") as file: array = pickle.load(file) except: array = {"year": 0, "month": 0} print("Erreur lecture fichier config") if array["month"] == datetime.now().month: if array["year"] == datetime.now().year: print("pas de copy, sync déjà fait") else: self.copy_caldav_data(self.a_task) else: self.copy_caldav_data(self.a_task) self.clear_all_events_by_Date(self.a_task) def add_event_in_calandar(self, calandar): my_event = calandar.save_event( dtstart=datetime(2022, 6, 30, 12), dtend=datetime(2022, 6, 30, 13), summary="Do the needful", ) def get_events_by_date(self,calandar,date=None ): if date is None: date = datetime.now() tomorow = date + timedelta(days=7) self.events = calandar.date_search( start=datetime(date.year, date.month, date.day), end=datetime(tomorow.year, tomorow.month, tomorow.day), expand=False ) for event in self.events: self.format_data_event(event) return def get_lastName_with_MC(self,search): print(f"search équipage 1: {search}") user = User.objects.get(username=f"mc{search}@clerc.ch") last_name = f"{user.last_name} {user.first_name[0]}." return last_name def format_data_event(self,event): summary = event.vobject_instance.vevent.summary.value if Key_separator not in summary: return False key = summary.split(Key_separator) if "&" not in key[1]: return False equipage = key[1].split("&") team_1 = "" team_2 = "" updated = False print(f"{event.vobject_instance.vevent.dtstart.value} check équipage({key[0]}) {key[1]}") if "MC" not in equipage[0]: # check équipage 1 team_1 = equipage[0].strip().replace('#', '') try: last_name = self.get_lastName_with_MC(team_1) equipage[0] = equipage[0].replace(team_1, f"MC{team_1} {last_name}") updated = True except : print(f"Error: mc{team_1}@clerc.ch @{summary}") return None else: print("Already with MC") if len(equipage) < 2: print(f"Error équipage sans [&] {key[1]}") elif "MC" not in equipage[1]: # check équipage 2 team_2 = equipage[1].strip().replace('#','') try: last_name = self.get_lastName_with_MC(team_2) equipage[1] = equipage[1].replace(team_2, f"MC{team_2} {last_name}") updated = True except : print(f"Error: mc{team_2}@clerc.ch @{summary}") return None else: print("Already with MC") if updated: event.vobject_instance.vevent.summary.value = f"{key[0]} {Key_separator} {equipage[0].strip()} & {equipage[1].strip()}" event.save() def copy_caldav_data(self, calandar=None): ctx = ssl.create_default_context() ctx.check_hostname = False ctx.verify_mode = ssl.CERT_NONE o_url = urllib.request.urlopen(self.caldav_agenda_config_url, context=ctx) data = o_url.read() o_url.close() now = f"01.{datetime.now().month}.{datetime.now().year}" next_month = datetime.strptime(f"1.{datetime.today().month}.{datetime.today().year}","%d.%m.%Y") + relativedelta(months=1) next_month = (next_month.year, next_month.month, next_month.day) today = (datetime.now().year, datetime.now().month, 1) print(f"today:{today}") print(f"next:{next_month}") events = None events = recurring_ical_events.of(Calendar.from_ical(data)).between(today, next_month) cur_events = calandar.date_search(start=datetime(today[0],today[1],today[2]), end=datetime(next_month[0],next_month[1],next_month[2]), expand=True) for event in events: print(f"copy de l'événement: {event['SUMMARY']}") _title = event["SUMMARY"] if ("SUMMARY" in event) else "Sans Titre" _desc = event["DESCRIPTION"] if ("DESCRIPTION" in event) else "" my_event = calandar.save_event( dtstart=event["DTSTART"].dt, dtend=event["DTEND"].dt, summary= _title, description= _desc, ) with open(os.path.join("mycaldav", "last_sync_config.bin"), "wb") as file: array = {"year":datetime.now().year, "month":datetime.now().month} pickle.dump(array, file) def clear_all_events_by_Date(self, calandar): events = calandar.date_search(start=datetime(2000, 1, 1), end=datetime.now() - relativedelta(years=1), expand=True) for e in events: e.delete() #print(events) def reformat_all_events(self,calandar): events = calandar.date_search(start=datetime(2023, 1, 1), end=datetime.now() + relativedelta(years=1), expand=True) for event in events: summary = event.vobject_instance.vevent.summary.value print(f"test summary {summary}") if "----" in summary: summary = summary.replace("----", Key_separator) print(f"reformat: [{summary}]") event.vobject_instance.vevent.summary.value = summary event.save() elif "--" in summary: pass elif "-" in summary: summary = summary.replace("-",Key_separator) print(f"reformat: [{summary}]") event.vobject_instance.vevent.summary.value = summary event.save() def change_utc_to_zurich_all_events(self,calandar): events = calandar.date_search(start=datetime(2023, 4, 1), end=datetime.now() + relativedelta(years=1), expand=True) for event in events: start = event.vobject_instance.vevent.dtstart.value end = start + relativedelta(day=1) if start.hour < 12: new_start = datetime(start.year, start.month, start.day,7,0,0, tzinfo = pytz.timezone("Europe/Zurich")) new_end = datetime(start.year, start.month, start.day, 19, 0, 0, tzinfo=pytz.timezone("Europe/Zurich")) elif start.hour > 12: new_start = datetime(start.year, start.month, start.day,19,0,0, tzinfo = pytz.timezone("Europe/Zurich")) new_end = datetime(start.year, start.month,start.day, 23, 59, 0, tzinfo=pytz.timezone("Europe/Zurich")) event.vobject_instance.vevent.dtstart.value = new_start event.vobject_instance.vevent.dtend.value = new_end if new_end{new_end}") print("ERROR") event.vobject_instance.vevent.dtend.value = new_start event.save() class _caldav_item(): def __init__(self, key="", name="", desc="", dtstart=None, str_start_date="", str_start_time="", dtend=None, str_end_date="", str_end_time="", dtstamp=None, done=False, href="", uuid="", team_1="", team_2="", team_1_chef=False, team_2_chef=False, team_transfert=False, team_manif=False): self.key = key self.name = name self.desc = desc self.dtstart = dtstart self.str_start_date = str_start_date self.str_start_time = str_start_time self.dtend = dtend self.str_end_date = str_end_date self.str_end_time = str_end_time self.dtstamp = dtstamp self.done = done self.href = href self.uuid = uuid self.team_1 = team_1 self.team_2 = team_2 self.team_1_chef = team_1_chef self.team_2_chef = team_2_chef self.team_transfert = team_transfert self.team_manif = team_manif def format_str_date(self): self.str_start_date = datetime.strptime(self.dtstart,"%d.%m.%Y %H:%M").strftime("%d.%m") self.str_start_time = datetime.strptime(self.dtstart, "%d.%m.%Y %H:%M").strftime("%H:%M") self.str_end_date = datetime.strptime(self.dtend,"%d.%m.%Y %H:%M").strftime("%d.%m") self.str_end_time = datetime.strptime(self.dtend, "%d.%m.%Y %H:%M").strftime("%H:%M") from rest_framework import serializers class CalDavItemSerializer(serializers.Serializer): key = serializers.CharField() name = serializers.CharField() desc = serializers.CharField(allow_blank=True) dtstart = serializers.CharField() str_start_date = serializers.CharField() str_start_time = serializers.CharField() dtend = serializers.CharField() str_end_date = serializers.CharField() str_end_time = serializers.CharField() done = serializers.BooleanField() href = serializers.CharField(allow_blank=True) uuid = serializers.CharField() team_1 = serializers.CharField(allow_blank=True) team_2 = serializers.CharField(allow_blank=True) team_1_chef = serializers.BooleanField() team_2_chef = serializers.BooleanField() team_transfert = serializers.BooleanField() team_manif = serializers.BooleanField()