Изображение гика

Блог питониста

Пишем будильник на питоне

26 октября 2017 г.

Хочется написать себе будильник, который стартовал бы в определенное время, проигрывал бы несколько композиций и выключался бы до следующего раза. Это возможно сделать, используя, например, библиотеку gstreamer. 

Я буду писать его под Ubuntu, поскольку использую данную ОС дома. Также можно попробовать сделать то же самое под Raspbian, если у вас есть Raspberry. Будильник будет работать как демон, тоесть не будет блокировать терминал,  а будет работать в фоновом режиме.

Установка библиотек


Установить небходимые библиотеки на Debian - системах можно так:

apt-get install python-gst0.10 gstreamer0.10-plugins-good gstreamer0.10-plugins-ugly

Чтобы проверить, что все работает корректно, откройте интерпретатор python и сделайте такие импорты: 

1 import gst
2 import gobject

Также можно попробовать проиграть какой-нибудь аудиофайл:

gst-launch-0.10 filesrc location=/path/to/file/test.mp3 ! decodebin ! audioconvert ! autoaudiosink

Для демонизации процесса понадобится библиотека daemonize, установить ее можно так:

pip install daemonize

Пишем будильник


Простейший демон с помощью данной библиотеки:

 1 from time import sleep
 2 from daemonize import Daemonize
 3 
 4 pid = "/tmp/test.pid"
 5 
 6 
 7 def main():
 8     while True:
 9         sleep(5)
10 
11 
12 daemon = Daemonize(app="test_app", pid=pid, action=main)
13 daemon.start()

Полезная команда, которая вам может пригодится, если вы запустили этот тестовый демон, или если вам нужно точно убить какой-то процесс в системе, сколько бы экземпляров данного процесса ни было:

1 sudo kill -9 `ps -ef | grep python | grep -v grep | awk '{print $2}'`

Разберем по частям, следующая команда выведет список процессов:

1 ps -ef

Дальше отберем только python процессы:

1 ps -ef | grep python

Затем удалим из полученного списка саму нашу команду

1 ps -ef | grep python | grep -v grep

И, наконец, отберем PID'ы процессов и убьем их:

1 sudo kill -9 `ps -ef | grep python | grep -v grep | awk '{print $2}'`

Gstreamer - это библиотека основанная на плагинах, один такой плагин - playbin мы будем использовать. Данные в gstreamer проходят через pipeline. Нам придется использовать элемент playbin для построения pipeline. Также будет задействована библиоткека gobject. Простейший аудиоплеер с помощью gstreamer:

 1 import gst
 2 import gobject
 3 import threading
 4 
 5 class AlarmClock():
 6 
 7     def __init__(self, musiclist):
 8         self.musiclist = musiclist
 9         self.song_num = 0
10         self.construct_pipeline()
11         self.set_property_file()
12 
13     def construct_pipeline(self):
14         self.player = gst.element_factory_make("playbin")
15         self.is_playing = False
16         self.connect_signals()
17 
18     def connect_signals(self):
19         # In this case, we only capture the messages
20         # put on the bus.
21         bus = self.player.get_bus()
22         bus.add_signal_watch()
23         bus.connect("message", self.message_handler)
24 
25     def play(self):
26         self.is_playing = True
27         self.player.set_state(gst.STATE_PLAYING)
28 
29     def handle_error(self, message):
30         print(message)
31 
32     def set_property_file(self):
33         self.player.set_property(
34             "uri",
35             "file://"+self.musiclist[self.song_num])
36 
37     def stop(self):
38         self.player.set_state(gst.STATE_NULL)
39         self.is_playing = False
40 
41     def message_handler(self, bus, message):
42         # Capture the messages on the bus and
43         # set the appropriate flag
44         msg_type = message.type
45         if msg_type == gst.MESSAGE_ERROR:
46             self.handle_error(message)
47         elif msg_type == gst.MESSAGE_EOS:
48             print("End Of Song")
49             if self.song_num < len(self.musiclist)-1:
50                 self.song_num += 1
51                 self.stop()
52                 self.set_property_file()
53                 self.play()
54             else:
55                 self.stop()
56 
57 
58 class GobInit(threading.Thread):
59     def __init__(self):
60         threading.Thread.__init__(self)
61 
62     def run(self):
63         gobject.threads_init()
64         self.loop = gobject.MainLoop()
65         self.loop.run()
66 
67 
68 PATH_TO_SONG = '/home/Downloads/test.mp3'
69 PATH_TO_SONG2 = '/home/Downloads/test2.mp3'
70 
71 
72 def main():
73     gob = GobInit()
74     gob.start()
75     print('start')
76     player = AlarmClock([PATH_TO_SONG, PATH_TO_SONG2])
77     print('player created')
78     player.play()
79 
80 
81 main()

При конце каждой песни на шину приходит сообщение MESSAGE_EOS, пользуясь этим, можно переключать песни, что и реализовано на строках 50-53.

Если добавить в этот плеер daemonize и логирование в файл /tmp/test.log:

 1 import gst
 2 import gobject
 3 import threading
 4 import logging
 5 import time
 6 from daemonize import Daemonize
 7 
 8 
 9 pid = "/tmp/test.pid"
10 logger = logging.getLogger(__name__)
11 logger.setLevel(logging.DEBUG)
12 logger.propagate = False
13 fh = logging.FileHandler("/tmp/test.log", "a")
14 fh.setLevel(logging.DEBUG)
15 logger.addHandler(fh)
16 keep_fds = [fh.stream.fileno()]
17 
18 
19 class AlarmClock():
20 
21     def __init__(self, musiclist):
22         self.musiclist = musiclist
23         self.song_num = 0
24         self.construct_pipeline()
25         self.set_property_file()
26 
27     def construct_pipeline(self):
28         self.player = gst.element_factory_make("playbin")
29         self.is_playing = False
30         self.connect_signals()
31 
32     def connect_signals(self):
33         # In this case, we only capture the messages
34         # put on the bus.
35         bus = self.player.get_bus()
36         bus.add_signal_watch()
37         bus.connect("message", self.message_handler)
38 
39     def play(self):
40         self.is_playing = True
41         self.player.set_state(gst.STATE_PLAYING)
42 
43     def handle_error(self, message):
44         logger.debug(message)
45 
46     def set_property_file(self):
47         self.player.set_property(
48             "uri",
49             "file://"+self.musiclist[self.song_num])
50 
51     def stop(self):
52         self.player.set_state(gst.STATE_NULL)
53         self.is_playing = False
54         logger.debug("player stopped")
55 
56     def message_handler(self, bus, message):
57         # Capture the messages on the bus and
58         # set the appropriate flag
59         msg_type = message.type
60         if msg_type == gst.MESSAGE_ERROR:
61             self.handle_error(message)
62         elif msg_type == gst.MESSAGE_EOS:
63             logger.debug("End Of Song")
64             if self.song_num < len(self.musiclist)-1:
65                 self.song_num += 1
66                 self.stop()
67                 self.set_property_file()
68                 self.play()
69             else:
70                 self.stop()
71 
72 
73 class GobInit(threading.Thread):
74     def __init__(self):
75         threading.Thread.__init__(self)
76 
77     def run(self):
78         gobject.threads_init()
79         self.loop = gobject.MainLoop()
80         self.loop.run()
81 
82 
83 PATH_TO_SONG = '/home/Downloads/test.mp3'
84 PATH_TO_SONG2 = '/home/Downloads/test2.mp3'
85 
86 
87 def main():
88     logger.debug("start")
89     gob = GobInit()
90     gob.start()
91     player = AlarmClock([PATH_TO_SONG, PATH_TO_SONG2])
92     logger.debug("player created")
93     player.play()
94 
95 
96 daemon = Daemonize(app="test_app", pid=pid, action=main, keep_fds=keep_fds)
97 daemon.start()

Теперь вы можете запустить данный плеер:

python alarm_clock.py

А в другом терминале мониторить лог:

tail -f /tmp/test.log

Также, вам может понадобится что-то отдебажить или принтануть, можете тогда перевести в блокирующий режим, задав foreground=True:

1 daemon = Daemonize(app="test_app", pid=pid, action=main, keep_fds=keep_fds,
2                    foreground=True)

Далее, хочется сделать так, чтобы после одного проигрывания снова устанавливался первый трек, для этого необходимо так изменить функцию message_handler:

 1 def message_handler(self, bus, message):
 2     msg_type = message.type
 3     if msg_type == gst.MESSAGE_ERROR:
 4         self.handle_error(message)
 5     elif msg_type == gst.MESSAGE_EOS:
 6         logger.debug("End Of Song")
 7         if self.song_num < len(self.musiclist)-1:
 8             self.song_num += 1
 9             self.stop()
10             self.set_property_file()
11             self.play()
12         else:
13             self.stop()
14             self.song_num = 0
15             self.set_property_file()

Я добавил строки 14 и 15 - установку первого трека после того, как все треки проиграны, чтобы на следующий день будильник начал играть с первого трека, также необходимо поменять функцию main и добавить функцию sleep_till_next_play:

 1 from datetime import datetime, timedelta
 2 ...
 3 
 4 
 5 def sleep_till_next_play(
 6         current_datetime, time_weekdays, time_weekend, logger):
 7     next_datetime = current_datetime + timedelta(days=1)
 8     next_str_date = next_datetime.strftime('%d/%m/%Y')
 9     if next_datetime.weekday() not in (5, 6):
10         next_datetime_play = datetime.strptime(
11             next_str_date + " " + time_weekdays,  "%d/%m/%Y %H:%M")
12     else:
13         next_datetime_play = datetime.strptime(
14             next_str_date + " " + time_weekend,  "%d/%m/%Y %H:%M")
15     delta = next_datetime_play - current_datetime
16     sleep_sec = delta.total_seconds()
17     logger.debug("sleep till next start for {0} sec".format(sleep_sec))
18     time.sleep(sleep_sec)
19 
20 
21 def main():
22     logger.debug("start")
23     gob = GobInit()
24     gob.start()
25     player = AlarmClock([PATH_TO_SONG, PATH_TO_SONG2])
26     logger.debug("player created")
27 
28     time_weekdays = "8:00"
29     time_weekend = "9:00"
30     current_datetime = datetime.now()
31     current_str_date = current_datetime.strftime('%d/%m/%Y')
32     if current_datetime.weekday() not in (5, 6):
33         datetime_play = datetime.strptime(
34             current_str_date + " " + time_weekdays,  "%d/%m/%Y %H:%M")
35     else:
36         datetime_play = datetime.strptime(
37             current_str_date + " " + time_weekend,  "%d/%m/%Y %H:%M")
38     delta = datetime_play - current_datetime
39     if delta.total_seconds() < 0:
40         sleep_till_next_play(
41             current_datetime, time_weekdays, time_weekend, logger)
42     else:
43         sleep_sec = delta.total_seconds()
44         logger.debug("sleep till start for {0} sec".format(sleep_sec))
45         time.sleep(sleep_sec)
46 
47     while True:
48         player.play()
49         sleep_till_next_play(
50             datetime.now(), time_weekdays, time_weekend, logger)

Функция weekday() возвращает числа от 0 до 6, которые соответствуют дням с понедельника по воскресенье. Сначала определяется текущее время (с. 30), если оно больше заданного, то демон спит до следующего дня (с. 40), а если меньше, то до заданного времени (стр. 45). Затем запускается бесконечный цикл, в котором будильник каждый раз играет и отправляется спать до следующего дня.

Теперь добавим возможность остановить будильник и узнать статус (запущен или нет), для этого используем модуль argparse:

 1 ...
 2 import argparse
 3 import sys
 4 import os
 5 import signal
 6 
 7 ...
 8 def main():
 9     ...
10 
11 def kill(pid_f, logger):
12     if os.path.isfile(pid_f):
13         with open(pid_f) as pid_file:
14             pid = pid_file.read()
15             try:
16                 os.kill(int(pid), signal.SIGKILL)
17             except (OSError, ValueError) as e:
18                 logger.debug(
19                     'Process is not killed due to: {0}'.format(e))
20             else:
21                 logger.debug('Stopped')
22                 os.remove(pid_f)
23     else:
24         logger.debug(
25             'There is no pid_file, nothing to kill')
26 
27 
28 parser = argparse.ArgumentParser()
29 mutually_exclusive_group = parser.add_mutually_exclusive_group(
30         required=True)
31 
32 mutually_exclusive_group.add_argument(
33     "-start", action="store_true")
34 mutually_exclusive_group.add_argument(
35     "-stop", action="store_true")
36 mutually_exclusive_group.add_argument(
37     "-status", action="store_true")
38 args = vars(parser.parse_args())
39 
40 if args.get('stop'):
41     kill(PID, logger)
42     sys.exit()
43 
44 elif args.get('status'):
45     try:
46         with open(PID) as pid_file:
47             pid = pid_file.read()
48         os.kill(int(pid), 0)
49     except Exception as e:
50         logger.debug("Process is stopped")
51     else:
52         logger.debug("Process is running")
53     sys.exit()
54 
55 kill(PID, logger)
56 
57 daemon = Daemonize(app="test_app", pid=PID, action=main, keep_fds=keep_fds)
58 daemon.start()

Теперь проверить статус демона, убить его или запустить можно с помощью аргументов status, stop, start. Для корректной работы нужно передать хотя бы один аргумент:

python alarm_clock.py -status

python alarm_clock.py -stop

python alarm_clock.py -start

После выполнения любой из этих команд смотрите на то, что залогировалось (у меня лог задан "/tmp/test.log", вы можете поместить его куда хотите). Я добавил функцию kill, для того, чтобы нельзя было включить несколько процессов одновременно. После небольшого рефакторинга получилось:

  1 # -*- coding: utf-8 -*-
  2 import gst
  3 import gobject
  4 import threading
  5 import logging
  6 import time
  7 from datetime import datetime, timedelta
  8 from daemonize import Daemonize
  9 import argparse
 10 import sys
 11 import os
 12 import signal
 13 
 14 PID = "/tmp/test.pid"
 15 # настройки логера
 16 logger = logging.getLogger(__name__)
 17 logger.setLevel(logging.DEBUG)
 18 logger.propagate = False
 19 fh = logging.FileHandler("/tmp/test.log", "a")
 20 fh.setLevel(logging.DEBUG)
 21 logger.addHandler(fh)
 22 keep_fds = [fh.stream.fileno()]
 23 
 24 # настройки в какое время играть
 25 time_weekdays = "8:30"
 26 time_weekend = "9:00"
 27 
 28 # пути до песен
 29 basic_path = '/home/Downloads/'
 30 path_to_song = basic_path + 'test.mp3'
 31 path_to_song_2 = basic_path + 'test2.mp3'
 32 
 33 
 34 class AlarmClock():
 35 
 36     def __init__(self, musiclist):
 37         self.musiclist = musiclist
 38         self.song_num = 0
 39         self.construct_pipeline()
 40         self.set_property_file()
 41 
 42     def construct_pipeline(self):
 43         self.player = gst.element_factory_make("playbin")
 44         self.is_playing = False
 45         self.connect_signals()
 46 
 47     def connect_signals(self):
 48         bus = self.player.get_bus()
 49         bus.add_signal_watch()
 50         bus.connect("message", self.message_handler)
 51 
 52     def play(self):
 53         self.is_playing = True
 54         self.player.set_state(gst.STATE_PLAYING)
 55         logger.debug("player set to play")
 56 
 57     def handle_error(self, message):
 58         logger.debug(message)
 59 
 60     def set_property_file(self):
 61         self.player.set_property(
 62             "uri",
 63             "file://"+self.musiclist[self.song_num])
 64 
 65     def stop(self):
 66         self.player.set_state(gst.STATE_NULL)
 67         self.is_playing = False
 68         logger.debug("player stopped")
 69 
 70     def message_handler(self, bus, message):
 71         msg_type = message.type
 72         if msg_type == gst.MESSAGE_ERROR:
 73             self.handle_error(message)
 74         elif msg_type == gst.MESSAGE_EOS:
 75             logger.debug("End Of Song")
 76             if self.song_num < len(self.musiclist)-1:
 77                 self.song_num += 1
 78                 self.stop()
 79                 self.set_property_file()
 80                 self.play()
 81             else:
 82                 self.stop()
 83                 self.song_num = 0
 84                 self.set_property_file()
 85 
 86 
 87 class GobInit(threading.Thread):
 88     def __init__(self):
 89         threading.Thread.__init__(self)
 90 
 91     def run(self):
 92         gobject.threads_init()
 93         self.loop = gobject.MainLoop()
 94         self.loop.run()
 95 
 96 
 97 def get_delta(datetime_obj, str_date, time_weekdays, time_weekend,
 98               current_datetime_obj=None):
 99     """Функция для вычисления разности между двумя datetime"""
100 
101     if datetime_obj.weekday() not in (5, 6):
102         datetime_play = datetime.strptime(
103             str_date + " " + time_weekdays,  "%d/%m/%Y %H:%M")
104     else:
105         datetime_play = datetime.strptime(
106             str_date + " " + time_weekend,  "%d/%m/%Y %H:%M")
107     if current_datetime_obj:
108         delta = datetime_play - current_datetime_obj
109     else:
110         delta = datetime_play - datetime_obj
111     return delta
112 
113 
114 def sleep_till_next_play(
115         current_datetime, time_weekdays, time_weekend, logger):
116     next_datetime = current_datetime + timedelta(days=1)
117     next_str_date = next_datetime.strftime('%d/%m/%Y')
118     delta = get_delta(
119         next_datetime, next_str_date, time_weekdays, time_weekend,
120         current_datetime)
121     sleep_sec = delta.total_seconds()
122     logger.debug("sleep till next start for {0} sec".format(sleep_sec))
123     time.sleep(sleep_sec)
124 
125 
126 def main():
127     logger.debug("start")
128     gob = GobInit()
129     gob.start()
130     player = AlarmClock([path_to_song, path_to_song_2])
131     logger.debug("player created")
132 
133     current_datetime = datetime.now()
134     current_str_date = current_datetime.strftime('%d/%m/%Y')
135     delta = get_delta(
136         current_datetime, current_str_date, time_weekdays, time_weekend)
137     # время для проигрывания треков уже прошло, спим до следующего раза:
138     if delta.total_seconds() < 0:
139         sleep_till_next_play(
140             current_datetime, time_weekdays, time_weekend, logger)
141     # спим до времени, в которое нужно проигрывать треки:
142     else:
143         sleep_sec = delta.total_seconds()
144         logger.debug("sleep till start for {0} sec".format(sleep_sec))
145         time.sleep(sleep_sec)
146 
147     while True:
148         player.play()
149         # спим до следующего раза
150         sleep_till_next_play(
151             datetime.now(), time_weekdays, time_weekend, logger)
152 
153 
154 def kill(pid_f, logger):
155     if os.path.isfile(pid_f):
156         with open(pid_f) as pid_file:
157             pid = pid_file.read()
158             try:
159                 os.kill(int(pid), signal.SIGKILL)
160             except (OSError, ValueError) as e:
161                 logger.debug(
162                     'Process is not killed due to: {0}'.format(e))
163             else:
164                 logger.debug('Stopped')
165                 os.remove(pid_f)
166     else:
167         logger.debug(
168             'There is no pid_file, nothing to kill')
169 
170 
171 parser = argparse.ArgumentParser()
172 mutually_exclusive_group = parser.add_mutually_exclusive_group(
173         required=True)
174 
175 mutually_exclusive_group.add_argument(
176     "-start", action="store_true")
177 mutually_exclusive_group.add_argument(
178     "-stop", action="store_true")
179 mutually_exclusive_group.add_argument(
180     "-status", action="store_true")
181 args = vars(parser.parse_args())
182 
183 if args.get('stop'):
184     kill(PID, logger)
185     sys.exit()
186 
187 elif args.get('status'):
188     try:
189         with open(PID) as pid_file:
190             pid = pid_file.read()
191         os.kill(int(pid), 0)
192     except Exception as e:
193         logger.debug("Process is stopped")
194     else:
195         logger.debug("Process is running")
196     sys.exit()
197 
198 # в любом случае убиваю демон, чтобы не запустить дважды
199 kill(PID, logger)
200 
201 daemon = Daemonize(app="test_app", pid=PID, action=main, keep_fds=keep_fds)
202 daemon.start()

Я собрал все настройки в начале, добавил функцию get_delta, потому что я использовал один и тот же кусок кода дважды (строки 101-106). Соответственно, я вынес этот дублированный код в эту функцию, чтобы следовать принципу DRY, что является хорошим тоном в программировании.

И все вроде хорошо, и все работает, но потом я еще раз посмотрел на этот код:

 1 def sleep_till_next_play(
 2         current_datetime, time_weekdays, time_weekend, logger):
 3     next_datetime = current_datetime + timedelta(days=1)
 4     next_str_date = next_datetime.strftime('%d/%m/%Y')
 5     delta = get_delta(
 6         next_datetime, next_str_date, time_weekdays, time_weekend,
 7         current_datetime)
 8     sleep_sec = delta.total_seconds()
 9     logger.debug("sleep till next start for {0} sec".format(sleep_sec))
10     time.sleep(sleep_sec)

И осознал, что я написал кучу говнокода, посмотрел как решают проблему нормальные люди, понял, что все можно сделать гораздо проще и решил переписать часть кода, убрав совсем функции sleep_till_next_play, get_delta, и подчистив main:

  1 import gst
  2 import gobject
  3 import threading
  4 import logging
  5 from datetime import datetime
  6 from daemonize import Daemonize
  7 import argparse
  8 import sys
  9 import os
 10 import signal
 11 
 12 PID = "/tmp/test.pid"
 13 # logger's settings
 14 log_file = "/tmp/test.log"
 15 logger = logging.getLogger(__name__)
 16 logger.setLevel(logging.DEBUG)
 17 logger.propagate = False
 18 fh = logging.FileHandler(log_file, "a")
 19 fh.setLevel(logging.DEBUG)
 20 logger.addHandler(fh)
 21 keep_fds = [fh.stream.fileno()]
 22 
 23 # what time to play
 24 time_weekdays = "21:05"
 25 time_weekend = "9:00"
 26 
 27 # paths to songs
 28 basic_path = '/home/austinnikov/Downloads/'
 29 song_array = (basic_path+'test.mp3', basic_path+'test2.mp3')
 30 
 31 
 32 class AlarmClock():
 33 
 34     def __init__(self, musiclist):
 35         self.musiclist = musiclist
 36         self.song_num = 0
 37         self.construct_pipeline()
 38         self.set_property_file()
 39 
 40     def construct_pipeline(self):
 41         self.player = gst.element_factory_make("playbin")
 42         self.is_playing = False
 43         self.connect_signals()
 44 
 45     def connect_signals(self):
 46         bus = self.player.get_bus()
 47         bus.add_signal_watch()
 48         bus.connect("message", self.message_handler)
 49 
 50     def play(self):
 51         self.is_playing = True
 52         self.player.set_state(gst.STATE_PLAYING)
 53 
 54     def handle_error(self, message):
 55         logger.debug(message)
 56 
 57     def set_property_file(self):
 58         self.player.set_property(
 59             "uri",
 60             "file://"+self.musiclist[self.song_num])
 61 
 62     def stop(self):
 63         self.player.set_state(gst.STATE_NULL)
 64         self.is_playing = False
 65         logger.debug("player stopped")
 66 
 67     def message_handler(self, bus, message):
 68         msg_type = message.type
 69         if msg_type == gst.MESSAGE_ERROR:
 70             self.handle_error(message)
 71         elif msg_type == gst.MESSAGE_EOS:
 72             logger.debug("End Of Song")
 73             if self.song_num < len(self.musiclist)-1:
 74                 self.song_num += 1
 75                 self.stop()
 76                 self.set_property_file()
 77                 self.play()
 78             else:
 79                 self.stop()
 80                 self.song_num = 0
 81                 self.set_property_file()
 82 
 83 
 84 class GobInit(threading.Thread):
 85     def __init__(self):
 86         threading.Thread.__init__(self)
 87 
 88     def run(self):
 89         gobject.threads_init()
 90         self.loop = gobject.MainLoop()
 91         self.loop.run()
 92 
 93 
 94 def main():
 95     logger.debug("start")
 96     gob = GobInit()
 97     gob.start()
 98     player = AlarmClock(song_array)
 99     logger.debug("player created")
100 
101     hour_weekdays = int(time_weekdays.split(':')[0])
102     minute_weekdays = int(time_weekdays.split(':')[1])
103 
104     hour_weekend = int(time_weekend.split(':')[0])
105     minute_weekend = int(time_weekend.split(':')[1])
106 
107     while True:
108         now = datetime.now()
109         if any((
110                 now.weekday() in (5, 6) and
111                 (hour_weekend == now.hour and minute_weekend == now.minute),
112                 now.weekday() not in (5, 6) and
113                 (hour_weekdays == now.hour and minute_weekdays == now.minute)
114         )):
115             player.play()
116 
117 
118 def kill(pid_f, logger):
119     if os.path.isfile(pid_f):
120         with open(pid_f) as pid_file:
121             pid = pid_file.read()
122             try:
123                 os.kill(int(pid), signal.SIGKILL)
124             except (OSError, ValueError) as e:
125                 logger.debug(
126                     'Process is not killed due to: {0}'.format(e))
127             else:
128                 logger.debug('Stopped')
129                 os.remove(pid_f)
130     else:
131         logger.debug(
132             'There is no pid_file, nothing to kill')
133 
134 
135 parser = argparse.ArgumentParser()
136 mutually_exclusive_group = parser.add_mutually_exclusive_group(
137         required=True)
138 
139 mutually_exclusive_group.add_argument(
140     "-start", action="store_true")
141 mutually_exclusive_group.add_argument(
142     "-stop", action="store_true")
143 mutually_exclusive_group.add_argument(
144     "-status", action="store_true")
145 args = vars(parser.parse_args())
146 
147 if args.get('stop'):
148     kill(PID, logger)
149     sys.exit()
150 
151 elif args.get('status'):
152     try:
153         with open(PID) as pid_file:
154             pid = pid_file.read()
155         os.kill(int(pid), 0)
156     except Exception as e:
157         logger.debug("Process is stopped")
158     else:
159         logger.debug("Process is running")
160     sys.exit()
161 
162 # kill in order not to start several processes
163 kill(PID, logger)
164 
165 daemon = Daemonize(app="test_app", pid=PID, action=main, keep_fds=keep_fds)
166 daemon.start()

Выводы


Рефакторинг - хорошо! Сделан, правда, не с первой попытки, простой будильник, который работает как демон в фоновом режиме, управляется аргументами из командной строки и логирует свою работу. Правда, у него есть один недостаток: если вы заблокируете компьютер чем-то вроде xflock, то тайминги собьются, так что лучше просто убирать яркость монитора на ночь (если, конечно, вы не используете что-то типа raspberry pi). Код на github.

Метки

python
Если вам понравился пост, можете поделиться им в соцсетях: