435 lines
15 KiB
Python
435 lines
15 KiB
Python
from django.db import models
|
|
from datetime import datetime, timedelta, time
|
|
from django.contrib.auth.models import User
|
|
import datetime as Datetime
|
|
import pickle
|
|
import pytz
|
|
import os
|
|
from dateutil.relativedelta import relativedelta
|
|
|
|
|
|
|
|
from django.db import models
|
|
import urllib.request
|
|
import ssl
|
|
|
|
from icalendar import Calendar, Event
|
|
import recurring_ical_events
|
|
import caldav
|
|
|
|
from mycaldav.settings import *
|
|
import pytz
|
|
from dateutil.parser import parse
|
|
|
|
Key_separator = "--"
|
|
|
|
|
|
class caldav_sync_manager(models.Model):
|
|
dtDate = models.DateField('Date_synchronized', auto_now_add=True, unique=True)
|
|
bDone = models.BooleanField("Effectuée", default=False)
|
|
dtUpdated = models.DateTimeField('date updated', auto_now=True)
|
|
dtCreated = models.DateTimeField('date published', auto_now_add=True)
|
|
|
|
def __str__(self):
|
|
return f"{self.dtDate} => {self.bDone}"
|
|
|
|
def init_caldav(self, caldav_url, caldav_user, caldav_password):
|
|
client = caldav.DAVClient(url=caldav_url, username=caldav_user, password=caldav_password)
|
|
data = client.principal()
|
|
|
|
self.a_task = data.calendar(cal_id=caldav_id["task"])
|
|
|
|
def set_today_as_synced(self):
|
|
o_new_manager = caldav_sync_manager()
|
|
o_new_manager.bDone=True
|
|
o_new_manager.save()
|
|
|
|
def init_task_management(self):
|
|
today = datetime.now()
|
|
o_caldav_sync_management = caldav_sync_manager.objects.filter(dtDate=today.date())
|
|
if not o_caldav_sync_management.exists() or o_caldav_sync_management.first().bDone is False:
|
|
self.copy_caldav_data()
|
|
else:
|
|
print("pas de copy, sync déjà fait")
|
|
|
|
|
|
|
|
|
|
|
|
def copy_caldav_data(self,):
|
|
ctx = ssl.create_default_context()
|
|
ctx.check_hostname = False
|
|
ctx.verify_mode = ssl.CERT_NONE
|
|
|
|
today = datetime.now()
|
|
seeked_date = today + relativedelta(months=1)
|
|
sync_date = today + relativedelta(months=1, days=1)
|
|
modified_url = caldav_cfg["task_config"] + f"&start={int(today.timestamp())}&end={int(sync_date.timestamp())}&expand=1"
|
|
|
|
with urllib.request.urlopen(modified_url, context=ctx) as o_url:
|
|
sabre_data = o_url.read()
|
|
|
|
events = recurring_ical_events.of(Calendar.from_ical(sabre_data)).at((seeked_date.year,seeked_date.month,seeked_date.day))
|
|
|
|
b_copy_done = False
|
|
for event in events:
|
|
b_copy_done = True
|
|
print(f"copy de l'événement: {event['SUMMARY']}")
|
|
|
|
|
|
_title = event["SUMMARY"] if ("SUMMARY" in event) else "Sans Titre"
|
|
_desc = event["DESCRIPTION"] if ("DESCRIPTION" in event) else ""
|
|
|
|
my_event = self.a_task.save_event(
|
|
dtstart=event["DTSTART"].dt,
|
|
dtend=event["DTEND"].dt,
|
|
summary= _title,
|
|
description= _desc,
|
|
)
|
|
if b_copy_done:
|
|
self.set_today_as_synced()
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# Create your models here.
|
|
class cls_caldav():
|
|
url = ""
|
|
data = None
|
|
items = []
|
|
day = []
|
|
night = []
|
|
|
|
def __init__(self, url=""):
|
|
self.url = url
|
|
|
|
def clear_data(self):
|
|
self.data = None
|
|
self.items = []
|
|
self.day = []
|
|
self.night = []
|
|
|
|
#Trie les tableau par odre croissant sur la date de début de l'événement
|
|
def sort_array(self):
|
|
self.items.sort(key=lambda x: x.dtstamp, reverse=False)
|
|
|
|
def sort_array_by_key(self,reverse=False):
|
|
self.items.sort(key=lambda x: x.key, reverse=reverse)
|
|
self.day.sort(key=lambda x: x.key, reverse=reverse)
|
|
self.night.sort(key=lambda x: x.key, reverse=reverse)
|
|
|
|
def caldav_open_url(self, days_delta, date):
|
|
ctx = ssl.create_default_context()
|
|
ctx.check_hostname = False
|
|
ctx.verify_mode = ssl.CERT_NONE
|
|
|
|
self.clear_data()
|
|
|
|
|
|
modified_url = self.url + f"&start={int((date-timedelta(days=days_delta)).timestamp())}&end={int((date +timedelta(days=days_delta)).timestamp())}&expand=1"
|
|
print(f"ICS CALL URL = {modified_url}")
|
|
with urllib.request.urlopen(modified_url, context=ctx) as o_url:
|
|
self.data = o_url.read()
|
|
|
|
return self.data
|
|
|
|
def get_caldav_data(self,periode=1,calendar=None, date=None):
|
|
if date is None:
|
|
date = datetime.now()
|
|
|
|
self.caldav_open_url(days_delta=periode,date=date)
|
|
|
|
|
|
|
|
today = (date.year,date.month,date.day)
|
|
|
|
|
|
events = None
|
|
if periode == 1:
|
|
events = recurring_ical_events.of(Calendar.from_ical(self.data)).at(today)
|
|
elif periode == 2:
|
|
tomorow = date + timedelta(days=1)
|
|
events = recurring_ical_events.of(Calendar.from_ical(self.data)).between(today,tomorow)
|
|
elif periode == 3:
|
|
today = date + timedelta(days=-2)
|
|
tomorow = date + timedelta(days=2)
|
|
print(f"affichage: {today} <> {tomorow}")
|
|
events = recurring_ical_events.of(Calendar.from_ical(self.data)).between(today,tomorow)
|
|
elif periode == 7:
|
|
endweek = date + timedelta(days=7)
|
|
events = recurring_ical_events.of(Calendar.from_ical(self.data)).between(today,endweek)
|
|
|
|
self.parse_data(events)
|
|
|
|
def convert_to_gmt1(self, dt):
|
|
gmt1_tz = pytz.timezone('Europe/Paris')
|
|
if dt.tzinfo is not None:
|
|
# Convertir l'objet datetime en GMT+1
|
|
# Utilisez le nom de votre fuseau horaire GMT+1
|
|
dt_gmt1 = dt.astimezone(gmt1_tz)
|
|
else:
|
|
# L'objet datetime est naïf, ajouter l'information de fuseau horaire GMT+1
|
|
utc_tz = pytz.utc
|
|
dt_utc = utc_tz.localize(dt)
|
|
dt_gmt1 = dt_utc.astimezone(gmt1_tz)
|
|
return dt_gmt1
|
|
def parse_data(self, events):
|
|
desired_timezone = pytz.timezone('Europe/Paris')
|
|
for event in events:
|
|
item = _caldav_item()
|
|
item.name = event["SUMMARY"]
|
|
item.uuid = f"{event['UID']}"
|
|
if Key_separator in item.name:
|
|
arr = item.name.split(Key_separator)
|
|
item.key = arr[0]
|
|
if "$" in item.key:
|
|
item.team_transfert = True
|
|
item.key = item.key.replace("$","")
|
|
if "Manif" in item.key:
|
|
item.team_manif = True
|
|
item.key = item.key.replace("Manif", "")
|
|
item.name = arr[1]
|
|
if "&" in item.name:
|
|
arr = item.name.split("&")
|
|
item.team_1 = arr[0]
|
|
if "#" in item.team_1:
|
|
item.team_1_chef = True
|
|
item.team_1 = item.team_1.replace("#","")
|
|
item.team_2 = arr[1]
|
|
if "#" in item.team_2:
|
|
item.team_2_chef = True
|
|
item.team_2 = item.team_2.replace("#", "")
|
|
|
|
|
|
if "DESCRIPTION" in event.keys():
|
|
item.desc = f"{event['DESCRIPTION']}"
|
|
|
|
if "#Fait" in item.desc:
|
|
item.done = True
|
|
item.desc = item.desc.replace("#Fait", "")
|
|
|
|
|
|
if '{href=' in item.desc:
|
|
temp_str = item.desc.split('{href=')[1]
|
|
temp_str = temp_str.split('}')[0]
|
|
item.href = temp_str
|
|
item.desc = item.desc.replace("{href=" + item.href + "}","")
|
|
|
|
|
|
desired_timezone = pytz.timezone('Europe/Paris')
|
|
datetime_obj = parse(event["DTSTART"].dt.strftime("%d.%m.%Y %H:%M"))
|
|
gmt1_datetime = datetime_obj.astimezone(desired_timezone)
|
|
|
|
print(f"gmt= {self.convert_to_gmt1( event['DTSTART'].dt)}")
|
|
|
|
|
|
item.dtstart = self.convert_to_gmt1( event['DTSTART'].dt).strftime("%d.%m.%Y %H:%M")
|
|
item.dtstamp = int(self.convert_to_gmt1( event['DTSTART'].dt).strftime("%Y%m%d%H%M"))
|
|
#print(item.dtstamp)
|
|
item.dtend = self.convert_to_gmt1( event['DTEND'].dt).strftime("%d.%m.%Y %H:%M")
|
|
|
|
|
|
item.format_str_date()
|
|
self.items.append(item)
|
|
|
|
|
|
start = datetime.strptime(item.dtstart,"%d.%m.%Y %H:%M")
|
|
|
|
if type(start) is Datetime.date:
|
|
start = datetime.combine(start, datetime.min.time())
|
|
print("convert Date to datetime")
|
|
print(f"Parse Data {item.name} start:{start}")
|
|
if int(start.strftime("%H")) < 19:
|
|
self.day.append(item)
|
|
if int(start.strftime("%H")) >= 19:
|
|
print("add to night ****")
|
|
self.night.append(item)
|
|
|
|
|
|
self.sort_array()
|
|
return self.data
|
|
|
|
class cls_caldav_client():
|
|
caldav_url = "https://sync.infomaniak.com"
|
|
#caldav_user = 'SC01066' #ambulanciers@ambulance-clerc.ch
|
|
#caldav_password = "mc144*1870CLERC"
|
|
|
|
caldav_user = 'AA01593' #agenda@ambulance-clerc.ch
|
|
caldav_password = "Agendamc144"
|
|
|
|
caldav_agenda_config_url = caldav_cfg["task_config"]
|
|
|
|
|
|
events = None
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def init_caldav(self, caldav_url, caldav_user, caldav_password):
|
|
client = caldav.DAVClient(url=caldav_url, username=caldav_user, password=caldav_password)
|
|
self.data = self.client.principal()
|
|
|
|
for cal in self.data.calendars():
|
|
print(f"name: {cal.name}({cal.url})")
|
|
|
|
self.a_task = self.data.calendar(cal_id=caldav_id["task"])
|
|
self.a_team = self.data.calendar(cal_id=caldav_id["team"])
|
|
self.a_vhc = self.data.calendar(cal_id=caldav_id["vhc"])
|
|
self.a_op = self.data.calendar(cal_id=caldav_id["op"])
|
|
self.a_task_config = self.data.calendar(cal_id=caldav_id["task_config"])
|
|
self.a_road = self.data.calendar(cal_id=caldav_id["road"])
|
|
self.a_rh = self.data.calendar(cal_id=caldav_id["rh"])
|
|
self.a_manif = self.data.calendar(cal_id=caldav_id["manif"])
|
|
|
|
def get_event_by_uuid(self,calandar,uuid):
|
|
event = calandar.object_by_uid(uid=uuid)
|
|
return event
|
|
#self.get_events_by_date(calandar)
|
|
#for event in self.events:
|
|
|
|
# print(event.vobject_instance.vevent.eventId)
|
|
def mark_as_done_task(self,calandar,uuid):
|
|
event = self.get_event_by_uuid(calandar=calandar,uuid=uuid)
|
|
|
|
try:
|
|
_desc = event.vobject_instance.vevent.description.value + "#Fait"
|
|
_desc.replace("#Fait#Fait","#Fait")
|
|
except AttributeError:
|
|
_desc = "#Fait"
|
|
|
|
my_event = calandar.save_event(
|
|
dtstart=event.vobject_instance.vevent.dtstart.value,
|
|
dtend=event.vobject_instance.vevent.dtend.value,
|
|
summary=event.vobject_instance.vevent.summary.value,
|
|
description=_desc,
|
|
)
|
|
event.delete()
|
|
|
|
|
|
def get_events_by_date(self,calandar,date=None ):
|
|
if date is None:
|
|
date = datetime.now()
|
|
|
|
tomorow = date + timedelta(days=7)
|
|
self.events = calandar.date_search(
|
|
start=datetime(date.year, date.month, date.day), end=datetime(tomorow.year, tomorow.month, tomorow.day), expand=False
|
|
)
|
|
|
|
for event in self.events:
|
|
self.format_data_event(event)
|
|
return
|
|
|
|
def get_lastName_with_MC(self,search):
|
|
print(f"search équipage 1: {search}")
|
|
user = User.objects.get(username=f"mc{search}@clerc.ch")
|
|
last_name = f"{user.last_name} {user.first_name[0]}."
|
|
return last_name
|
|
|
|
def format_data_event(self,event):
|
|
summary = event.vobject_instance.vevent.summary.value
|
|
if Key_separator not in summary:
|
|
return False
|
|
key = summary.split(Key_separator)
|
|
if "&" not in key[1]:
|
|
return False
|
|
equipage = key[1].split("&")
|
|
team_1 = ""
|
|
team_2 = ""
|
|
updated = False
|
|
print(f"{event.vobject_instance.vevent.dtstart.value} check équipage({key[0]}) {key[1]}")
|
|
if "MC" not in equipage[0]: # check équipage 1
|
|
team_1 = equipage[0].strip().replace('#', '')
|
|
try:
|
|
last_name = self.get_lastName_with_MC(team_1)
|
|
equipage[0] = equipage[0].replace(team_1, f"MC{team_1} {last_name}")
|
|
updated = True
|
|
except :
|
|
print(f"Error: mc{team_1}@clerc.ch @{summary}")
|
|
return None
|
|
else:
|
|
print("Already with MC")
|
|
if len(equipage) < 2:
|
|
print(f"Error équipage sans [&] {key[1]}")
|
|
elif "MC" not in equipage[1]: # check équipage 2
|
|
team_2 = equipage[1].strip().replace('#','')
|
|
try:
|
|
last_name = self.get_lastName_with_MC(team_2)
|
|
equipage[1] = equipage[1].replace(team_2, f"MC{team_2} {last_name}")
|
|
updated = True
|
|
except :
|
|
print(f"Error: mc{team_2}@clerc.ch @{summary}")
|
|
return None
|
|
else:
|
|
print("Already with MC")
|
|
|
|
if updated:
|
|
event.vobject_instance.vevent.summary.value = f"{key[0]} {Key_separator} {equipage[0].strip()} & {equipage[1].strip()}"
|
|
event.save()
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class _caldav_item():
|
|
def __init__(self, key="", name="", desc="", dtstart=None, str_start_date="",
|
|
str_start_time="", dtend=None, str_end_date="", str_end_time="",
|
|
dtstamp=None, done=False, href="", uuid="", team_1="",
|
|
team_2="", team_1_chef=False, team_2_chef=False,
|
|
team_transfert=False, team_manif=False):
|
|
self.key = key
|
|
self.name = name
|
|
self.desc = desc
|
|
self.dtstart = dtstart
|
|
self.str_start_date = str_start_date
|
|
self.str_start_time = str_start_time
|
|
self.dtend = dtend
|
|
self.str_end_date = str_end_date
|
|
self.str_end_time = str_end_time
|
|
self.dtstamp = dtstamp
|
|
self.done = done
|
|
self.href = href
|
|
self.uuid = uuid
|
|
self.team_1 = team_1
|
|
self.team_2 = team_2
|
|
self.team_1_chef = team_1_chef
|
|
self.team_2_chef = team_2_chef
|
|
self.team_transfert = team_transfert
|
|
self.team_manif = team_manif
|
|
|
|
def format_str_date(self):
|
|
self.str_start_date = datetime.strptime(self.dtstart,"%d.%m.%Y %H:%M").strftime("%d.%m")
|
|
self.str_start_time = datetime.strptime(self.dtstart, "%d.%m.%Y %H:%M").strftime("%H:%M")
|
|
self.str_end_date = datetime.strptime(self.dtend,"%d.%m.%Y %H:%M").strftime("%d.%m")
|
|
self.str_end_time = datetime.strptime(self.dtend, "%d.%m.%Y %H:%M").strftime("%H:%M")
|
|
|
|
|
|
|
|
|
|
from rest_framework import serializers
|
|
|
|
class CalDavItemSerializer(serializers.Serializer):
|
|
key = serializers.CharField()
|
|
name = serializers.CharField()
|
|
desc = serializers.CharField(allow_blank=True)
|
|
dtstart = serializers.CharField()
|
|
str_start_date = serializers.CharField()
|
|
str_start_time = serializers.CharField()
|
|
dtend = serializers.CharField()
|
|
str_end_date = serializers.CharField()
|
|
str_end_time = serializers.CharField()
|
|
done = serializers.BooleanField()
|
|
href = serializers.CharField(allow_blank=True)
|
|
uuid = serializers.CharField()
|
|
team_1 = serializers.CharField(allow_blank=True)
|
|
team_2 = serializers.CharField(allow_blank=True)
|
|
team_1_chef = serializers.BooleanField()
|
|
team_2_chef = serializers.BooleanField()
|
|
team_transfert = serializers.BooleanField()
|
|
team_manif = serializers.BooleanField() |