Пишем будильник на питоне | Блог python программиста
Изображение гика

Блог питониста

Пишем будильник на питоне

26 октября 2017 г.

Хочется написать себе будильник, который стартовал бы в определенное время, проигрывал бы несколько композиций и выключался бы до следующего раза. Это возможно сделать, используя, например, библиотеку gstreamer. 

Я буду писать его под Ubuntu, поскольку использую данную ОС дома. Также можно попробовать сделать то же самое под Raspbian, если у вас есть Raspberry. Будильник будет работать как демон, тоесть не будет блокировать терминал,  а будет работать в фоновом режиме.

Установка библиотек


Установить небходимые библиотеки на Debian - системах можно так:

apt-get install python-gst0.10 gstreamer0.10-plugins-good gstreamer0.10-plugins-ugly

Чтобы проверить, что все работает корректно, откройте интерпретатор python и сделайте такие импорты: 

1import gst
2import gobject

Также можно попробовать проиграть какой-нибудь аудиофайл:

gst-launch-0.10 filesrc location=/path/to/file/test.mp3 ! decodebin ! audioconvert ! autoaudiosink

Для демонизации процесса понадобится библиотека daemonize, установить ее можно так:

pip install daemonize

Пишем будильник


Простейший демон с помощью данной библиотеки:

 1from time import sleep
 2from daemonize import Daemonize
 3
 4pid = "/tmp/test.pid"
 5
 6
 7def main():
 8    while True:
 9        sleep(5)
10
11
12daemon = Daemonize(app="test_app", pid=pid, action=main)
13daemon.start()

Полезная команда, которая вам может пригодится, если вы запустили этот тестовый демон, или если вам нужно точно убить какой-то процесс в системе, сколько бы экземпляров данного процесса ни было:

1sudo kill -9 `ps -ef | grep python | grep -v grep | awk '{print $2}'`

Разберем по частям, следующая команда выведет список процессов:

1ps -ef

Дальше отберем только python процессы:

1ps -ef | grep python

Затем удалим из полученного списка саму нашу команду

1ps -ef | grep python | grep -v grep

И, наконец, отберем PID'ы процессов и убьем их:

1sudo kill -9 `ps -ef | grep python | grep -v grep | awk '{print $2}'`

Gstreamer - это библиотека основанная на плагинах, один такой плагин - playbin мы будем использовать. Данные в gstreamer проходят через pipeline. Нам придется использовать элемент playbin для построения pipeline. Также будет задействована библиоткека gobject. Простейший аудиоплеер с помощью gstreamer:

 1import gst
 2import gobject
 3import threading
 4
 5class AlarmClock():
 6
 7    def __init__(self, musiclist):
 8        self.musiclist = musiclist
 9        self.song_num = 0
10        self.construct_pipeline()
11        self.set_property_file()
12
13    def construct_pipeline(self):
14        self.player = gst.element_factory_make("playbin")
15        self.is_playing = False
16        self.connect_signals()
17
18    def connect_signals(self):
19        # In this case, we only capture the messages
20        # put on the bus.
21        bus = self.player.get_bus()
22        bus.add_signal_watch()
23        bus.connect("message", self.message_handler)
24
25    def play(self):
26        self.is_playing = True
27        self.player.set_state(gst.STATE_PLAYING)
28
29    def handle_error(self, message):
30        print(message)
31
32    def set_property_file(self):
33        self.player.set_property(
34            "uri",
35            "file://"+self.musiclist[self.song_num])
36
37    def stop(self):
38        self.player.set_state(gst.STATE_NULL)
39        self.is_playing = False
40
41    def message_handler(self, bus, message):
42        # Capture the messages on the bus and
43        # set the appropriate flag
44        msg_type = message.type
45        if msg_type == gst.MESSAGE_ERROR:
46            self.handle_error(message)
47        elif msg_type == gst.MESSAGE_EOS:
48            print("End Of Song")
49            if self.song_num < len(self.musiclist)-1:
50                self.song_num += 1
51                self.stop()
52                self.set_property_file()
53                self.play()
54            else:
55                self.stop()
56
57
58class GobInit(threading.Thread):
59    def __init__(self):
60        threading.Thread.__init__(self)
61
62    def run(self):
63        gobject.threads_init()
64        self.loop = gobject.MainLoop()
65        self.loop.run()
66
67
68PATH_TO_SONG = '/home/Downloads/test.mp3'
69PATH_TO_SONG2 = '/home/Downloads/test2.mp3'
70
71
72def main():
73    gob = GobInit()
74    gob.start()
75    print('start')
76    player = AlarmClock([PATH_TO_SONG, PATH_TO_SONG2])
77    print('player created')
78    player.play()
79
80
81main()

При конце каждой песни на шину приходит сообщение MESSAGE_EOS, пользуясь этим, можно переключать песни, что и реализовано на строках 50-53.

Если добавить в этот плеер daemonize и логирование в файл /tmp/test.log:

 1import gst
 2import gobject
 3import threading
 4import logging
 5import time
 6from daemonize import Daemonize
 7
 8
 9pid = "/tmp/test.pid"
10logger = logging.getLogger(__name__)
11logger.setLevel(logging.DEBUG)
12logger.propagate = False
13fh = logging.FileHandler("/tmp/test.log", "a")
14fh.setLevel(logging.DEBUG)
15logger.addHandler(fh)
16keep_fds = [fh.stream.fileno()]
17
18
19class AlarmClock():
20
21    def __init__(self, musiclist):
22        self.musiclist = musiclist
23        self.song_num = 0
24        self.construct_pipeline()
25        self.set_property_file()
26
27    def construct_pipeline(self):
28        self.player = gst.element_factory_make("playbin")
29        self.is_playing = False
30        self.connect_signals()
31
32    def connect_signals(self):
33        # In this case, we only capture the messages
34        # put on the bus.
35        bus = self.player.get_bus()
36        bus.add_signal_watch()
37        bus.connect("message", self.message_handler)
38
39    def play(self):
40        self.is_playing = True
41        self.player.set_state(gst.STATE_PLAYING)
42
43    def handle_error(self, message):
44        logger.debug(message)
45
46    def set_property_file(self):
47        self.player.set_property(
48            "uri",
49            "file://"+self.musiclist[self.song_num])
50
51    def stop(self):
52        self.player.set_state(gst.STATE_NULL)
53        self.is_playing = False
54        logger.debug("player stopped")
55
56    def message_handler(self, bus, message):
57        # Capture the messages on the bus and
58        # set the appropriate flag
59        msg_type = message.type
60        if msg_type == gst.MESSAGE_ERROR:
61            self.handle_error(message)
62        elif msg_type == gst.MESSAGE_EOS:
63            logger.debug("End Of Song")
64            if self.song_num < len(self.musiclist)-1:
65                self.song_num += 1
66                self.stop()
67                self.set_property_file()
68                self.play()
69            else:
70                self.stop()
71
72
73class GobInit(threading.Thread):
74    def __init__(self):
75        threading.Thread.__init__(self)
76
77    def run(self):
78        gobject.threads_init()
79        self.loop = gobject.MainLoop()
80        self.loop.run()
81
82
83PATH_TO_SONG = '/home/Downloads/test.mp3'
84PATH_TO_SONG2 = '/home/Downloads/test2.mp3'
85
86
87def main():
88    logger.debug("start")
89    gob = GobInit()
90    gob.start()
91    player = AlarmClock([PATH_TO_SONG, PATH_TO_SONG2])
92    logger.debug("player created")
93    player.play()
94
95
96daemon = Daemonize(app="test_app", pid=pid, action=main, keep_fds=keep_fds)
97daemon.start()

Теперь вы можете запустить данный плеер:

python alarm_clock.py

А в другом терминале мониторить лог:

tail -f /tmp/test.log

Также, вам может понадобится что-то отдебажить или принтануть, можете тогда перевести в блокирующий режим, задав foreground=True:

1daemon = Daemonize(app="test_app", pid=pid, action=main, keep_fds=keep_fds,
2                   foreground=True)

Далее, хочется сделать так, чтобы после одного проигрывания снова устанавливался первый трек, для этого необходимо так изменить функцию message_handler:

 1def message_handler(self, bus, message):
 2    msg_type = message.type
 3    if msg_type == gst.MESSAGE_ERROR:
 4        self.handle_error(message)
 5    elif msg_type == gst.MESSAGE_EOS:
 6        logger.debug("End Of Song")
 7        if self.song_num < len(self.musiclist)-1:
 8            self.song_num += 1
 9            self.stop()
10            self.set_property_file()
11            self.play()
12        else:
13            self.stop()
14            self.song_num = 0
15            self.set_property_file()

Я добавил строки 14 и 15 - установку первого трека после того, как все треки проиграны, чтобы на следующий день будильник начал играть с первого трека, также необходимо поменять функцию main и добавить функцию sleep_till_next_play:

 1from datetime import datetime, timedelta
 2...
 3
 4
 5def sleep_till_next_play(
 6        current_datetime, time_weekdays, time_weekend, logger):
 7    next_datetime = current_datetime + timedelta(days=1)
 8    next_str_date = next_datetime.strftime('%d/%m/%Y')
 9    if next_datetime.weekday() not in (5, 6):
10        next_datetime_play = datetime.strptime(
11            next_str_date + " " + time_weekdays,  "%d/%m/%Y %H:%M")
12    else:
13        next_datetime_play = datetime.strptime(
14            next_str_date + " " + time_weekend,  "%d/%m/%Y %H:%M")
15    delta = next_datetime_play - current_datetime
16    sleep_sec = delta.total_seconds()
17    logger.debug("sleep till next start for {0} sec".format(sleep_sec))
18    time.sleep(sleep_sec)
19
20
21def main():
22    logger.debug("start")
23    gob = GobInit()
24    gob.start()
25    player = AlarmClock([PATH_TO_SONG, PATH_TO_SONG2])
26    logger.debug("player created")
27
28    time_weekdays = "8:00"
29    time_weekend = "9:00"
30    current_datetime = datetime.now()
31    current_str_date = current_datetime.strftime('%d/%m/%Y')
32    if current_datetime.weekday() not in (5, 6):
33        datetime_play = datetime.strptime(
34            current_str_date + " " + time_weekdays,  "%d/%m/%Y %H:%M")
35    else:
36        datetime_play = datetime.strptime(
37            current_str_date + " " + time_weekend,  "%d/%m/%Y %H:%M")
38    delta = datetime_play - current_datetime
39    if delta.total_seconds() < 0:
40        sleep_till_next_play(
41            current_datetime, time_weekdays, time_weekend, logger)
42    else:
43        sleep_sec = delta.total_seconds()
44        logger.debug("sleep till start for {0} sec".format(sleep_sec))
45        time.sleep(sleep_sec)
46
47    while True:
48        player.play()
49        sleep_till_next_play(
50            datetime.now(), time_weekdays, time_weekend, logger)

Функция weekday() возвращает числа от 0 до 6, которые соответствуют дням с понедельника по воскресенье. Сначала определяется текущее время (с. 30), если оно больше заданного, то демон спит до следующего дня (с. 40), а если меньше, то до заданного времени (стр. 45). Затем запускается бесконечный цикл, в котором будильник каждый раз играет и отправляется спать до следующего дня.

Теперь добавим возможность остановить будильник и узнать статус (запущен или нет), для этого используем модуль argparse:

 1...
 2import argparse
 3import sys
 4import os
 5import signal
 6
 7...
 8def main():
 9    ...
10
11def kill(pid_f, logger):
12    if os.path.isfile(pid_f):
13        with open(pid_f) as pid_file:
14            pid = pid_file.read()
15            try:
16                os.kill(int(pid), signal.SIGKILL)
17            except (OSError, ValueError) as e:
18                logger.debug(
19                    'Process is not killed due to: {0}'.format(e))
20            else:
21                logger.debug('Stopped')
22                os.remove(pid_f)
23    else:
24        logger.debug(
25            'There is no pid_file, nothing to kill')
26
27
28parser = argparse.ArgumentParser()
29mutually_exclusive_group = parser.add_mutually_exclusive_group(
30        required=True)
31
32mutually_exclusive_group.add_argument(
33    "-start", action="store_true")
34mutually_exclusive_group.add_argument(
35    "-stop", action="store_true")
36mutually_exclusive_group.add_argument(
37    "-status", action="store_true")
38args = vars(parser.parse_args())
39
40if args.get('stop'):
41    kill(PID, logger)
42    sys.exit()
43
44elif args.get('status'):
45    try:
46        with open(PID) as pid_file:
47            pid = pid_file.read()
48        os.kill(int(pid), 0)
49    except Exception as e:
50        logger.debug("Process is stopped")
51    else:
52        logger.debug("Process is running")
53    sys.exit()
54
55kill(PID, logger)
56
57daemon = Daemonize(app="test_app", pid=PID, action=main, keep_fds=keep_fds)
58daemon.start()

Теперь проверить статус демона, убить его или запустить можно с помощью аргументов status, stop, start. Для корректной работы нужно передать хотя бы один аргумент:

python alarm_clock.py -status

python alarm_clock.py -stop

python alarm_clock.py -start

После выполнения любой из этих команд смотрите на то, что залогировалось (у меня лог задан "/tmp/test.log", вы можете поместить его куда хотите). Я добавил функцию kill, для того, чтобы нельзя было включить несколько процессов одновременно. После небольшого рефакторинга получилось:

  1# -*- coding: utf-8 -*-
  2import gst
  3import gobject
  4import threading
  5import logging
  6import time
  7from datetime import datetime, timedelta
  8from daemonize import Daemonize
  9import argparse
 10import sys
 11import os
 12import signal
 13
 14PID = "/tmp/test.pid"
 15# настройки логера
 16logger = logging.getLogger(__name__)
 17logger.setLevel(logging.DEBUG)
 18logger.propagate = False
 19fh = logging.FileHandler("/tmp/test.log", "a")
 20fh.setLevel(logging.DEBUG)
 21logger.addHandler(fh)
 22keep_fds = [fh.stream.fileno()]
 23
 24# настройки в какое время играть
 25time_weekdays = "8:30"
 26time_weekend = "9:00"
 27
 28# пути до песен
 29basic_path = '/home/Downloads/'
 30path_to_song = basic_path + 'test.mp3'
 31path_to_song_2 = basic_path + 'test2.mp3'
 32
 33
 34class AlarmClock():
 35
 36    def __init__(self, musiclist):
 37        self.musiclist = musiclist
 38        self.song_num = 0
 39        self.construct_pipeline()
 40        self.set_property_file()
 41
 42    def construct_pipeline(self):
 43        self.player = gst.element_factory_make("playbin")
 44        self.is_playing = False
 45        self.connect_signals()
 46
 47    def connect_signals(self):
 48        bus = self.player.get_bus()
 49        bus.add_signal_watch()
 50        bus.connect("message", self.message_handler)
 51
 52    def play(self):
 53        self.is_playing = True
 54        self.player.set_state(gst.STATE_PLAYING)
 55        logger.debug("player set to play")
 56
 57    def handle_error(self, message):
 58        logger.debug(message)
 59
 60    def set_property_file(self):
 61        self.player.set_property(
 62            "uri",
 63            "file://"+self.musiclist[self.song_num])
 64
 65    def stop(self):
 66        self.player.set_state(gst.STATE_NULL)
 67        self.is_playing = False
 68        logger.debug("player stopped")
 69
 70    def message_handler(self, bus, message):
 71        msg_type = message.type
 72        if msg_type == gst.MESSAGE_ERROR:
 73            self.handle_error(message)
 74        elif msg_type == gst.MESSAGE_EOS:
 75            logger.debug("End Of Song")
 76            if self.song_num < len(self.musiclist)-1:
 77                self.song_num += 1
 78                self.stop()
 79                self.set_property_file()
 80                self.play()
 81            else:
 82                self.stop()
 83                self.song_num = 0
 84                self.set_property_file()
 85
 86
 87class GobInit(threading.Thread):
 88    def __init__(self):
 89        threading.Thread.__init__(self)
 90
 91    def run(self):
 92        gobject.threads_init()
 93        self.loop = gobject.MainLoop()
 94        self.loop.run()
 95
 96
 97def get_delta(datetime_obj, str_date, time_weekdays, time_weekend,
 98              current_datetime_obj=None):
 99    """Функция для вычисления разности между двумя datetime"""
100
101    if datetime_obj.weekday() not in (5, 6):
102        datetime_play = datetime.strptime(
103            str_date + " " + time_weekdays,  "%d/%m/%Y %H:%M")
104    else:
105        datetime_play = datetime.strptime(
106            str_date + " " + time_weekend,  "%d/%m/%Y %H:%M")
107    if current_datetime_obj:
108        delta = datetime_play - current_datetime_obj
109    else:
110        delta = datetime_play - datetime_obj
111    return delta
112
113
114def sleep_till_next_play(
115        current_datetime, time_weekdays, time_weekend, logger):
116    next_datetime = current_datetime + timedelta(days=1)
117    next_str_date = next_datetime.strftime('%d/%m/%Y')
118    delta = get_delta(
119        next_datetime, next_str_date, time_weekdays, time_weekend,
120        current_datetime)
121    sleep_sec = delta.total_seconds()
122    logger.debug("sleep till next start for {0} sec".format(sleep_sec))
123    time.sleep(sleep_sec)
124
125
126def main():
127    logger.debug("start")
128    gob = GobInit()
129    gob.start()
130    player = AlarmClock([path_to_song, path_to_song_2])
131    logger.debug("player created")
132
133    current_datetime = datetime.now()
134    current_str_date = current_datetime.strftime('%d/%m/%Y')
135    delta = get_delta(
136        current_datetime, current_str_date, time_weekdays, time_weekend)
137    # время для проигрывания треков уже прошло, спим до следующего раза:
138    if delta.total_seconds() < 0:
139        sleep_till_next_play(
140            current_datetime, time_weekdays, time_weekend, logger)
141    # спим до времени, в которое нужно проигрывать треки:
142    else:
143        sleep_sec = delta.total_seconds()
144        logger.debug("sleep till start for {0} sec".format(sleep_sec))
145        time.sleep(sleep_sec)
146
147    while True:
148        player.play()
149        # спим до следующего раза
150        sleep_till_next_play(
151            datetime.now(), time_weekdays, time_weekend, logger)
152
153
154def kill(pid_f, logger):
155    if os.path.isfile(pid_f):
156        with open(pid_f) as pid_file:
157            pid = pid_file.read()
158            try:
159                os.kill(int(pid), signal.SIGKILL)
160            except (OSError, ValueError) as e:
161                logger.debug(
162                    'Process is not killed due to: {0}'.format(e))
163            else:
164                logger.debug('Stopped')
165                os.remove(pid_f)
166    else:
167        logger.debug(
168            'There is no pid_file, nothing to kill')
169
170
171parser = argparse.ArgumentParser()
172mutually_exclusive_group = parser.add_mutually_exclusive_group(
173        required=True)
174
175mutually_exclusive_group.add_argument(
176    "-start", action="store_true")
177mutually_exclusive_group.add_argument(
178    "-stop", action="store_true")
179mutually_exclusive_group.add_argument(
180    "-status", action="store_true")
181args = vars(parser.parse_args())
182
183if args.get('stop'):
184    kill(PID, logger)
185    sys.exit()
186
187elif args.get('status'):
188    try:
189        with open(PID) as pid_file:
190            pid = pid_file.read()
191        os.kill(int(pid), 0)
192    except Exception as e:
193        logger.debug("Process is stopped")
194    else:
195        logger.debug("Process is running")
196    sys.exit()
197
198# в любом случае убиваю демон, чтобы не запустить дважды
199kill(PID, logger)
200
201daemon = Daemonize(app="test_app", pid=PID, action=main, keep_fds=keep_fds)
202daemon.start()

Я собрал все настройки в начале, добавил функцию get_delta, потому что я использовал один и тот же кусок кода дважды (строки 101-106). Соответственно, я вынес этот дублированный код в эту функцию, чтобы следовать принципу DRY, что является хорошим тоном в программировании.

И все вроде хорошо, и все работает, но потом я еще раз посмотрел на этот код:

 1def sleep_till_next_play(
 2        current_datetime, time_weekdays, time_weekend, logger):
 3    next_datetime = current_datetime + timedelta(days=1)
 4    next_str_date = next_datetime.strftime('%d/%m/%Y')
 5    delta = get_delta(
 6        next_datetime, next_str_date, time_weekdays, time_weekend,
 7        current_datetime)
 8    sleep_sec = delta.total_seconds()
 9    logger.debug("sleep till next start for {0} sec".format(sleep_sec))
10    time.sleep(sleep_sec)

И осознал, что я написал кучу говнокода, посмотрел как решают проблему нормальные люди, понял, что все можно сделать гораздо проще и решил переписать часть кода, убрав совсем функции sleep_till_next_play, get_delta, и подчистив main:

  1import gst
  2import gobject
  3import threading
  4import logging
  5from datetime import datetime
  6from daemonize import Daemonize
  7import argparse
  8import sys
  9import os
 10import signal
 11
 12PID = "/tmp/test.pid"
 13# logger's settings
 14log_file = "/tmp/test.log"
 15logger = logging.getLogger(__name__)
 16logger.setLevel(logging.DEBUG)
 17logger.propagate = False
 18fh = logging.FileHandler(log_file, "a")
 19fh.setLevel(logging.DEBUG)
 20logger.addHandler(fh)
 21keep_fds = [fh.stream.fileno()]
 22
 23# what time to play
 24time_weekdays = "21:05"
 25time_weekend = "9:00"
 26
 27# paths to songs
 28basic_path = '/home/austinnikov/Downloads/'
 29song_array = (basic_path+'test.mp3', basic_path+'test2.mp3')
 30
 31
 32class AlarmClock():
 33
 34    def __init__(self, musiclist):
 35        self.musiclist = musiclist
 36        self.song_num = 0
 37        self.construct_pipeline()
 38        self.set_property_file()
 39
 40    def construct_pipeline(self):
 41        self.player = gst.element_factory_make("playbin")
 42        self.is_playing = False
 43        self.connect_signals()
 44
 45    def connect_signals(self):
 46        bus = self.player.get_bus()
 47        bus.add_signal_watch()
 48        bus.connect("message", self.message_handler)
 49
 50    def play(self):
 51        self.is_playing = True
 52        self.player.set_state(gst.STATE_PLAYING)
 53
 54    def handle_error(self, message):
 55        logger.debug(message)
 56
 57    def set_property_file(self):
 58        self.player.set_property(
 59            "uri",
 60            "file://"+self.musiclist[self.song_num])
 61
 62    def stop(self):
 63        self.player.set_state(gst.STATE_NULL)
 64        self.is_playing = False
 65        logger.debug("player stopped")
 66
 67    def message_handler(self, bus, message):
 68        msg_type = message.type
 69        if msg_type == gst.MESSAGE_ERROR:
 70            self.handle_error(message)
 71        elif msg_type == gst.MESSAGE_EOS:
 72            logger.debug("End Of Song")
 73            if self.song_num < len(self.musiclist)-1:
 74                self.song_num += 1
 75                self.stop()
 76                self.set_property_file()
 77                self.play()
 78            else:
 79                self.stop()
 80                self.song_num = 0
 81                self.set_property_file()
 82
 83
 84class GobInit(threading.Thread):
 85    def __init__(self):
 86        threading.Thread.__init__(self)
 87
 88    def run(self):
 89        gobject.threads_init()
 90        self.loop = gobject.MainLoop()
 91        self.loop.run()
 92
 93
 94def main():
 95    logger.debug("start")
 96    gob = GobInit()
 97    gob.start()
 98    player = AlarmClock(song_array)
 99    logger.debug("player created")
100
101    hour_weekdays = int(time_weekdays.split(':')[0])
102    minute_weekdays = int(time_weekdays.split(':')[1])
103
104    hour_weekend = int(time_weekend.split(':')[0])
105    minute_weekend = int(time_weekend.split(':')[1])
106
107    while True:
108        now = datetime.now()
109        if any((
110                now.weekday() in (5, 6) and
111                (hour_weekend == now.hour and minute_weekend == now.minute),
112                now.weekday() not in (5, 6) and
113                (hour_weekdays == now.hour and minute_weekdays == now.minute)
114        )):
115            player.play()
116
117
118def kill(pid_f, logger):
119    if os.path.isfile(pid_f):
120        with open(pid_f) as pid_file:
121            pid = pid_file.read()
122            try:
123                os.kill(int(pid), signal.SIGKILL)
124            except (OSError, ValueError) as e:
125                logger.debug(
126                    'Process is not killed due to: {0}'.format(e))
127            else:
128                logger.debug('Stopped')
129                os.remove(pid_f)
130    else:
131        logger.debug(
132            'There is no pid_file, nothing to kill')
133
134
135parser = argparse.ArgumentParser()
136mutually_exclusive_group = parser.add_mutually_exclusive_group(
137        required=True)
138
139mutually_exclusive_group.add_argument(
140    "-start", action="store_true")
141mutually_exclusive_group.add_argument(
142    "-stop", action="store_true")
143mutually_exclusive_group.add_argument(
144    "-status", action="store_true")
145args = vars(parser.parse_args())
146
147if args.get('stop'):
148    kill(PID, logger)
149    sys.exit()
150
151elif args.get('status'):
152    try:
153        with open(PID) as pid_file:
154            pid = pid_file.read()
155        os.kill(int(pid), 0)
156    except Exception as e:
157        logger.debug("Process is stopped")
158    else:
159        logger.debug("Process is running")
160    sys.exit()
161
162# kill in order not to start several processes
163kill(PID, logger)
164
165daemon = Daemonize(app="test_app", pid=PID, action=main, keep_fds=keep_fds)
166daemon.start()

Выводы


Рефакторинг - хорошо! Сделан, правда, не с первой попытки, простой будильник, который работает как демон в фоновом режиме, управляется аргументами из командной строки и логирует свою работу. Правда, у него есть один недостаток: если вы заблокируете компьютер чем-то вроде xflock, то тайминги собьются, так что лучше просто убирать яркость монитора на ночь (если, конечно, вы не используете что-то типа raspberry pi). Код на github.

Метки

python
Если вам понравился пост, можете поделиться им в соцсетях: