343 lines
11 KiB
Python
343 lines
11 KiB
Python
from datetime import datetime, timedelta, time
|
|
from django.contrib.auth.models import User
|
|
import datetime as Datetime
|
|
import pickle
|
|
import os
|
|
from dateutil.relativedelta import relativedelta
|
|
|
|
|
|
|
|
from django.db import models
|
|
import urllib.request
|
|
import ssl
|
|
|
|
from icalendar import Calendar, Event
|
|
import recurring_ical_events
|
|
import caldav
|
|
|
|
from mycaldav.settings import *
|
|
|
|
# Create your models here.
|
|
class cls_caldav():
|
|
url = ""
|
|
data = None
|
|
items = []
|
|
day = []
|
|
night = []
|
|
|
|
def __init__(self, url=""):
|
|
self.url = url
|
|
|
|
def clear_data(self):
|
|
self.data = None
|
|
self.items = []
|
|
self.day = []
|
|
self.night = []
|
|
|
|
#Trie les tableau par odre croissant sur la date de début de l'événement
|
|
def sort_array(self):
|
|
self.items.sort(key=lambda x: x.dtstamp, reverse=False)
|
|
|
|
def sort_array_by_key(self,reverse=False):
|
|
self.items.sort(key=lambda x: x.key, reverse=reverse)
|
|
self.day.sort(key=lambda x: x.key, reverse=reverse)
|
|
self.night.sort(key=lambda x: x.key, reverse=reverse)
|
|
|
|
|
|
def get_caldav_data(self,periode=1,calendar=None, date=None):
|
|
ctx = ssl.create_default_context()
|
|
ctx.check_hostname = False
|
|
ctx.verify_mode = ssl.CERT_NONE
|
|
|
|
self.clear_data()
|
|
|
|
o_url = urllib.request.urlopen(self.url, context=ctx)
|
|
self.data = o_url.read()
|
|
o_url.close()
|
|
|
|
if date is None:
|
|
date = datetime.now()
|
|
|
|
today = (date.year,date.month,date.day)
|
|
|
|
|
|
events = None
|
|
if periode == 1:
|
|
events = recurring_ical_events.of(Calendar.from_ical(self.data)).at(today)
|
|
elif periode == 2:
|
|
tomorow = date + timedelta(days=1)
|
|
events = recurring_ical_events.of(Calendar.from_ical(self.data)).between(today,tomorow)
|
|
elif periode == 7:
|
|
endweek = date + timedelta(days=6)
|
|
events = recurring_ical_events.of(Calendar.from_ical(self.data)).between(today,endweek)
|
|
|
|
self.parse_data(events)
|
|
def parse_data(self, events):
|
|
for event in events:
|
|
item = _caldav_item()
|
|
item.name = event["SUMMARY"]
|
|
item.uiid = event["UID"]
|
|
if "-" in item.name:
|
|
arr = item.name.split("-")
|
|
item.key = arr[0]
|
|
if "$" in item.key:
|
|
item.team_transfert = True
|
|
item.key = item.key.replace("$","")
|
|
if "Manif" in item.key:
|
|
item.team_manif = True
|
|
item.key = item.key.replace("Manif", "")
|
|
item.name = arr[1]
|
|
if "&" in item.name:
|
|
arr = item.name.split("&")
|
|
item.team_1 = arr[0]
|
|
if "#" in item.team_1:
|
|
item.team_1_chef = True
|
|
item.team_1 = item.team_1.replace("#","")
|
|
item.team_2 = arr[1]
|
|
if "#" in item.team_2:
|
|
item.team_2_chef = True
|
|
item.team_2 = item.team_2.replace("#", "")
|
|
|
|
if "DESCRIPTION" in event.keys():
|
|
item.desc = event["DESCRIPTION"]
|
|
if "#" in item.desc:
|
|
item.done = True
|
|
if '{href=' in item.desc:
|
|
temp_str = item.desc.split('{href=')[1]
|
|
temp_str = temp_str.split('}')[0]
|
|
item.href = temp_str
|
|
item.desc = item.desc.replace("{href=" + item.href + "}","")
|
|
|
|
item.dtstart = event["DTSTART"].dt.strftime("%d.%m.%Y %H:%M")
|
|
item.dtstamp = int(event["DTSTART"].dt.strftime("%Y%m%d%H%M"))
|
|
print(item.dtstamp)
|
|
item.dtend = event["DTEND"].dt.strftime("%d.%m.%Y %H:%M")
|
|
|
|
|
|
item.format_str_date()
|
|
self.items.append(item)
|
|
|
|
|
|
start = datetime.strptime(item.dtstart,"%d.%m.%Y %H:%M")
|
|
|
|
if type(start) is Datetime.date:
|
|
start = datetime.combine(start, datetime.min.time())
|
|
print("convert Date to datetime")
|
|
print(start)
|
|
if int(start.strftime("%H")) < 19:
|
|
self.day.append(item)
|
|
if int(start.strftime("%H")) >= 19:
|
|
self.night.append(item)
|
|
|
|
|
|
self.sort_array()
|
|
return self.data
|
|
|
|
class cls_caldav_client():
|
|
caldav_url = "https://sync.infomaniak.com"
|
|
#caldav_user = 'SC01066' #ambulanciers@ambulance-clerc.ch
|
|
#caldav_password = "mc144*1870CLERC"
|
|
|
|
caldav_user = 'AA01593' #agenda@ambulance-clerc.ch
|
|
caldav_password = "Agendamc144"
|
|
|
|
caldav_agenda_config_url = caldav_cfg["task_config"]
|
|
|
|
events = None
|
|
|
|
|
|
|
|
def __init__(self):
|
|
self.client = caldav.DAVClient(url=self.caldav_url, username=self.caldav_user, password=self.caldav_password)
|
|
self.data = self.client.principal()
|
|
|
|
for cal in self.data.calendars():
|
|
print(f"name: {cal.name}({cal.url})")
|
|
|
|
self.a_task = self.data.calendar(cal_id=caldav_id["task"])
|
|
self.a_team = self.data.calendar(cal_id=caldav_id["team"])
|
|
self.a_vhc = self.data.calendar(cal_id=caldav_id["vhc"])
|
|
self.a_op = self.data.calendar(cal_id=caldav_id["op"])
|
|
|
|
#self.get_events_by_date(self.a_team)
|
|
def get_event_by_uuid(self,calandar,uuid):
|
|
event = calandar.object_by_uid(uid=uuid)
|
|
return event
|
|
#self.get_events_by_date(calandar)
|
|
#for event in self.events:
|
|
|
|
# print(event.vobject_instance.vevent.eventId)
|
|
def mark_as_done_task(self,calandar,uuid):
|
|
event = self.get_event_by_uuid(calandar=calandar,uuid=uuid)
|
|
event.vobject_instance.vevent.description.value = event.vobject_instance.vevent.description.value + " #Fait"
|
|
event.save()
|
|
|
|
|
|
|
|
def init_task_management(self):
|
|
print("pass copy task process")
|
|
try:
|
|
with open(os.path.join("mycaldav", "last_sync_config.bin"), "rb") as file:
|
|
array = pickle.load(file)
|
|
except:
|
|
array = {"year": 0, "month": 0}
|
|
print("Erreur lecture fichier débiteur")
|
|
|
|
if array["month"] == datetime.now().month:
|
|
if array["year"] == datetime.now().year:
|
|
print("pas de copy, sync déjà fait")
|
|
else:
|
|
self.copy_caldav_data(self.a_task)
|
|
else:
|
|
self.copy_caldav_data(self.a_task)
|
|
self.clear_all_events_by_Date(self.a_task)
|
|
|
|
|
|
def add_event_in_calandar(self, calandar):
|
|
my_event = calandar.save_event(
|
|
dtstart=datetime(2022, 6, 30, 12),
|
|
dtend=datetime(2022, 6, 30, 13),
|
|
summary="Do the needful",
|
|
|
|
)
|
|
def get_events_by_date(self,calandar,date=None ):
|
|
if date is None:
|
|
date = datetime.now()
|
|
|
|
tomorow = date + timedelta(days=7)
|
|
self.events = calandar.date_search(
|
|
start=datetime(date.year, date.month, date.day), end=datetime(tomorow.year, tomorow.month, tomorow.day), expand=False
|
|
)
|
|
|
|
for event in self.events:
|
|
self.format_data_event(event)
|
|
return
|
|
|
|
def format_data_event(self,event):
|
|
summary = event.vobject_instance.vevent.summary.value
|
|
if "-" not in summary:
|
|
return False
|
|
key = summary.split("-")
|
|
if "&" not in key[1]:
|
|
return False
|
|
equipage = key[1].split("&")
|
|
team_1 = ""
|
|
team_2 = ""
|
|
updated = False
|
|
print(f"{event.vobject_instance.vevent.dtstart.value} check équipage({key[0]}) {key[1]}")
|
|
if "MC" not in equipage[0]: # check équipage 1
|
|
team_1 = equipage[0].strip().replace('#', '')
|
|
try:
|
|
print(f"search équipage 1: {team_1}")
|
|
user = User.objects.get(username=f"mc{team_1}@clerc.ch").last_name
|
|
equipage[0] = equipage[0].replace(team_1, f"MC{team_1} {user}")
|
|
updated = True
|
|
except :
|
|
print(f"Error: mc{team_1}@clerc.ch @{summary}")
|
|
return None
|
|
else:
|
|
print("Already with MC")
|
|
if len(equipage) < 2:
|
|
print(f"Error équipage sans [&] {key[1]}")
|
|
elif "MC" not in equipage[1]: # check équipage 2
|
|
team_2 = equipage[1].strip().replace('#','')
|
|
try:
|
|
print(f"search équipage 2: {team_2}")
|
|
user = User.objects.get(username=f"mc{team_2}@clerc.ch").last_name
|
|
equipage[1] = equipage[1].replace(team_2, f"MC{team_2} {user}")
|
|
updated = True
|
|
except :
|
|
print(f"Error: mc{team_2}@clerc.ch @{summary}")
|
|
return None
|
|
else:
|
|
print("Already with MC")
|
|
|
|
if updated:
|
|
event.vobject_instance.vevent.summary.value = f"{key[0]} - {equipage[0].strip()} & {equipage[1].strip()}"
|
|
event.save()
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def copy_caldav_data(self, calandar=None):
|
|
ctx = ssl.create_default_context()
|
|
ctx.check_hostname = False
|
|
ctx.verify_mode = ssl.CERT_NONE
|
|
|
|
|
|
|
|
o_url = urllib.request.urlopen(self.caldav_agenda_config_url, context=ctx)
|
|
data = o_url.read()
|
|
o_url.close()
|
|
|
|
now = f"01.{datetime.now().month}.{datetime.now().year}"
|
|
|
|
|
|
next_month = datetime.strptime(f"1.{datetime.today().month}.{datetime.today().year}","%d.%m.%Y") + relativedelta(months=1) - relativedelta(days=1)
|
|
next_month = (next_month.year, next_month.month, next_month.day)
|
|
|
|
today = (datetime.now().year, datetime.now().month, 1)
|
|
print(f"today:{today}")
|
|
print(f"next:{next_month}")
|
|
events = None
|
|
events = recurring_ical_events.of(Calendar.from_ical(data)).between(today, next_month)
|
|
cur_events = calandar.date_search(start=datetime(today[0],today[1],today[2]), end=datetime(next_month[0],next_month[1],next_month[2]), expand=True)
|
|
|
|
for event in events:
|
|
print(f"copy de l'événement: {event['SUMMARY']}")
|
|
|
|
|
|
my_event = calandar.save_event(
|
|
dtstart=event["DTSTART"].dt,
|
|
dtend=event["DTEND"].dt,
|
|
summary= event["SUMMARY"],
|
|
description=event["DESCRIPTION"],
|
|
)
|
|
|
|
with open(os.path.join("mycaldav", "last_sync_config.bin"), "wb") as file:
|
|
array = {"year":datetime.now().year, "month":datetime.now().month}
|
|
pickle.dump(array, file)
|
|
|
|
|
|
|
|
|
|
def clear_all_events_by_Date(self, calandar):
|
|
events = calandar.date_search(start=datetime(2000, 1, 1), end=datetime.now() - relativedelta(years=1), expand=True)
|
|
for e in events:
|
|
e.delete()
|
|
#print(events)
|
|
|
|
|
|
class _caldav_item():
|
|
key = ""
|
|
name = ""
|
|
desc = ""
|
|
dtstart = None
|
|
str_start_date = ""
|
|
str_start_time = ""
|
|
dtend = None
|
|
str_end_date = ""
|
|
str_end_time = ""
|
|
dtstamp = None
|
|
done = False
|
|
href = ""
|
|
uiid = ""
|
|
team_1 = ""
|
|
team_2 = ""
|
|
team_1_chef = False
|
|
team_2_chef = False
|
|
team_transfert = False
|
|
team_manif = False
|
|
|
|
def format_str_date(self):
|
|
self.str_start_date = datetime.strptime(self.dtstart,"%d.%m.%Y %H:%M").strftime("%d.%m")
|
|
self.str_start_time = datetime.strptime(self.dtstart, "%d.%m.%Y %H:%M").strftime("%H:%M")
|
|
self.str_end_date = datetime.strptime(self.dtend,"%d.%m.%Y %H:%M").strftime("%d.%m")
|
|
self.str_end_time = datetime.strptime(self.dtend, "%d.%m.%Y %H:%M").strftime("%H:%M")
|
|
|
|
|
|
|
|
|