wake-up-neo.net

Python threading.timer - Funktion alle 'n' Sekunden wiederholen

Ich habe Probleme mit dem Python-Timer und würde mich über Ratschläge oder Hilfe sehr freuen: D

Ich weiß nicht, wie Threads funktionieren, aber ich möchte nur alle 0,5 Sekunden eine Funktion auslösen und den Timer starten, stoppen und zurücksetzen können.

Ich erhalte jedoch weiterhin RuntimeError: threads can only be started once, wenn ich threading.timer.start() zweimal ausführen. Gibt es dafür eine Umgehung? Ich habe vor jedem Start versucht, threading.timer.cancel() anzuwenden.

Pseudo-Code:

t=threading.timer(0.5,function)
while True:
    t.cancel()
    t.start()
64
user1431282

Am besten starten Sie den Timer-Thread einmal. In Ihrem Timer-Thread würden Sie Folgendes kodieren

class MyThread(Thread):
    def __init__(self, event):
        Thread.__init__(self)
        self.stopped = event

    def run(self):
        while not self.stopped.wait(0.5):
            print("my thread")
            # call a function

In dem Code, der den Timer gestartet hat, können Sie das gestoppte Ereignis set verwenden, um den Timer zu stoppen.

stopFlag = Event()
thread = MyThread(stopFlag)
thread.start()
# this will stop the timer
stopFlag.set()
83
Hans Then

Timer-Threads verwenden-

from threading import Timer,Thread,Event


class perpetualTimer():

   def __init__(self,t,hFunction):
      self.t=t
      self.hFunction = hFunction
      self.thread = Timer(self.t,self.handle_function)

   def handle_function(self):
      self.hFunction()
      self.thread = Timer(self.t,self.handle_function)
      self.thread.start()

   def start(self):
      self.thread.start()

   def cancel(self):
      self.thread.cancel()

def printer():
    print 'ipsem lorem'

t = perpetualTimer(5,printer)
t.start()

dies kann durch t.cancel() gestoppt werden.

26

Von Äquivalent von setInterval in Python :

import threading

def setInterval(interval):
    def decorator(function):
        def wrapper(*args, **kwargs):
            stopped = threading.Event()

            def loop(): # executed in another thread
                while not stopped.wait(interval): # until stopped
                    function(*args, **kwargs)

            t = threading.Thread(target=loop)
            t.daemon = True # stop if the program exits
            t.start()
            return stopped
        return wrapper
    return decorator

Verwendungszweck:

@setInterval(.5)
def function():
    "..."

stop = function() # start timer, the first call is in .5 seconds
stop.set() # stop the loop
stop = function() # start new timer
# ...
stop.set() 

Oder hier die gleiche Funktionalität aber als Standalone-Funktion anstelle eines Dekorateurs :

cancel_future_calls = call_repeatedly(60, print, "Hello, World")
# ...
cancel_future_calls() 

So geht's ohne Threads .

23
jfs

Um eine korrekte Antwort mit Timer als OP zu erhalten, werde ich die Antwort von swapnil jariwala verbessern :

from threading import Timer
import time


class InfiniteTimer():
    """A Timer class that does not stop, unless you want it to."""

    def __init__(self, seconds, target):
        self._should_continue = False
        self.is_running = False
        self.seconds = seconds
        self.target = target
        self.thread = None

    def _handle_target(self):
        self.is_running = True
        self.target()
        self.is_running = False
        self._start_timer()

    def _start_timer(self):
        if self._should_continue: # Code could have been running when cancel was called.
            self.thread = Timer(self.seconds, self._handle_target)
            self.thread.start()

    def start(self):
        if not self._should_continue and not self.is_running:
            self._should_continue = True
            self._start_timer()
        else:
            print("Timer already started or running, please wait if you're restarting.")

    def cancel(self):
        if self.thread is not None:
            self._should_continue = False # Just in case thread is running and cancel fails.
            self.thread.cancel()
        else:
            print("Timer never started or failed to initialize.")


def tick():
    print('ipsem lorem')

# Example Usage
t = InfiniteTimer(0.5, tick)
t.start()

Die Importzeit ist ohne die Beispielverwendung optional.

4
Bill Schumacher

Ich habe Code in swapnil-jariwala geändert, um eine kleine Konsolenuhr zu erstellen.

from threading import Timer, Thread, Event
from datetime import datetime

class PT():

    def __init__(self, t, hFunction):
        self.t = t
        self.hFunction = hFunction
        self.thread = Timer(self.t, self.handle_function)

    def handle_function(self):
        self.hFunction()
        self.thread = Timer(self.t, self.handle_function)
        self.thread.start()

    def start(self):
        self.thread.start()

def printer():
    tempo = datetime.today()
    h,m,s = tempo.hour, tempo.minute, tempo.second
    print(f"{h}:{m}:{s}")


t = PT(1, printer)
t.start()

AUSGABE

>>> 11:39:11
11:39:12
11:39:13
11:39:14
11:39:15
11:39:16
...

Timer mit einer tkinter Graphic-Schnittstelle

Dieser Code versetzt den Zeitgeber in ein kleines Fenster mit tkinter

from threading import Timer, Thread, Event
from datetime import datetime
import tkinter as tk

app = tk.Tk()
lab = tk.Label(app, text="Timer will start in a sec")
lab.pack()


class perpetualTimer():

    def __init__(self, t, hFunction):
        self.t = t
        self.hFunction = hFunction
        self.thread = Timer(self.t, self.handle_function)

    def handle_function(self):
        self.hFunction()
        self.thread = Timer(self.t, self.handle_function)
        self.thread.start()

    def start(self):
        self.thread.start()

    def cancel(self):
        self.thread.cancel()


def printer():
    tempo = datetime.today()
    clock = "{}:{}:{}".format(tempo.hour, tempo.minute, tempo.second)
    try:
        lab['text'] = clock
    except RuntimeError:
        exit()


t = perpetualTimer(1, printer)
t.start()
app.mainloop()

Ein Beispiel für ein Kartenspiel (Art)

from threading import Timer, Thread, Event
from datetime import datetime


class perpetualTimer():

    def __init__(self, t, hFunction):
        self.t = t
        self.hFunction = hFunction
        self.thread = Timer(self.t, self.handle_function)

    def handle_function(self):
        self.hFunction()
        self.thread = Timer(self.t, self.handle_function)
        self.thread.start()

    def start(self):
        self.thread.start()

    def cancel(self):
        self.thread.cancel()


x = datetime.today()
start = x.second


def printer():
    global questions, counter, start
    x = datetime.today()
    tempo = x.second
    if tempo - 3 > start:
        show_ans()
    #print("\n{}:{}:{}".format(tempo.hour, tempo.minute, tempo.second), end="")
    print()
    print("-" + questions[counter])
    counter += 1
    if counter == len(answers):
        counter = 0


def show_ans():
    global answers, c2
    print("It is {}".format(answers[c2]))
    c2 += 1
    if c2 == len(answers):
        c2 = 0


questions = ["What is the capital of Italy?",
             "What is the capital of France?",
             "What is the capital of England?",
             "What is the capital of Spain?"]

answers = "Rome", "Paris", "London", "Madrid"

counter = 0
c2 = 0
print("Get ready to answer")
t = perpetualTimer(3, printer)
t.start()

ausgabe:

Get ready to answer
>>> 
-What is the capital of Italy?
It is Rome

-What is the capital of France?
It is Paris

-What is the capital of England?
...
3
Giovanni Gianni

Ein wenig an Hans Thens Antwort verbessern , wir können die Timer-Funktion einfach subclassieren. Das Folgende wird unser gesamter "repeat timer" -Code und kann als Drop-In-Ersatz für Threading verwendet werden. Timer mit allen gleichen Argumenten:

from threading import Timer

class RepeatTimer(Timer):
    def run(self):
        while not self.finished.wait(self.interval):
            self.function(*self.args, **self.kwargs)

Anwendungsbeispiel:

def dummyfn(msg="foo"):
    print(msg)

timer = RepeatTimer(1, dummyfn)
timer.start()
time.sleep(5)
timer.cancel()

erzeugt die folgende Ausgabe:

foo
foo
foo
foo

und

timer = RepeatTimer(1, dummyfn, args=("bar",))
timer.start()
time.sleep(5)
timer.cancel()

produziert

bar
bar
bar
bar
3
right2clicky

Ich musste das für ein Projekt machen. Am Ende habe ich einen separaten Thread für die Funktion gestartet

t = threading.Thread(target =heartbeat, args=(worker,))
t.start()

**** Herzschlag ist meine Funktion, Arbeiter ist eines meiner Argumente ****

innerhalb meiner Herzschlagfunktion:

def heartbeat(worker):

    while True:
        time.sleep(5)
        #all of my code

Wenn ich also den Thread starte, wird die Funktion wiederholt 5 Sekunden warten, den gesamten Code ausführen und dies auf unbestimmte Zeit tun. Wenn Sie den Prozess beenden möchten, beenden Sie einfach den Thread.

1
Andrew Wilkins

Ich habe eine Klasse implementiert, die als Timer funktioniert.

Ich lasse den Link hier, falls jemand ihn benötigt: https://github.com/ivanhalencp/python/tree/master/xTimer

1
Ivan Coppes

Ich mag die Antwort von right2clicky, vor allem, weil es nicht erforderlich ist, dass ein Thread abgerissen wird und jedes Mal, wenn der Timer tickt, ein neuer Thread erstellt wird. Darüber hinaus ist es ein einfacher Vorgang, eine Klasse mit einem Timer-Rückruf zu erstellen, der regelmäßig aufgerufen wird. Das ist mein normaler Anwendungsfall:

class MyClass(RepeatTimer):
    def __init__(self, period):
        super().__init__(period, self.on_timer)

    def on_timer(self):
        print("Tick")


if __== "__main__":
    mc = MyClass(1)
    mc.start()
    time.sleep(5)
    mc.cancel()
0
Frank P
from threading import Timer
def TaskManager():
    #do stuff
    t = Timer( 1, TaskManager )
    t.start()

TaskManager()

Hier ist ein kleines Beispiel, das hilft zu verstehen, wie es läuft . Function taskManager () am Ende einen verzögerten Funktionsaufruf für sich selbst erstellen.

Versuchen Sie, die "Dalay" -Variable zu ändern, und Sie werden den Unterschied erkennen können

from threading import Timer, _sleep

# ------------------------------------------
DATA = []
dalay = 0.25 # sec
counter = 0
allow_run = True
FIFO = True

def taskManager():

    global counter, DATA, delay, allow_run
    counter += 1

    if len(DATA) > 0:
        if FIFO:
            print("["+str(counter)+"] new data: ["+str(DATA.pop(0))+"]")
        else:
            print("["+str(counter)+"] new data: ["+str(DATA.pop())+"]")

    else:
        print("["+str(counter)+"] no data")

    if allow_run:
        #delayed method/function call to it self
        t = Timer( dalay, taskManager )
        t.start()

    else:
        print(" END task-manager: disabled")

# ------------------------------------------
def main():

    DATA.append("data from main(): 0")
    _sleep(2)
    DATA.append("data from main(): 1")
    _sleep(2)


# ------------------------------------------
print(" START task-manager:")
taskManager()

_sleep(2)
DATA.append("first data")

_sleep(2)
DATA.append("second data")

print(" START main():")
main()
print(" END main():")

_sleep(2)
DATA.append("last data")

allow_run = False
0
ch3ll0v3k

Dies ist eine alternative Implementierung, die Funktion anstelle von Klasse verwendet. Inspiriert von @Andrew Wilkins oben.

Weil Warten genauer ist als Schlafen (es berücksichtigt die Laufzeit der Funktion):

import threading

PING_ON = threading.Event()

def ping():
  while not PING_ON.wait(1):
    print("my thread %s" % str(threading.current_thread().ident))

t = threading.Thread(target=ping)
t.start()

sleep(5)
PING_ON.set()
0
Paul Kenjora