| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247 |
- from machine import Pin
- import rp2
- from micropython import schedule
- import utime as time
- from Debug import Debug
- from TimeSource import TimeSource
- # インジケーターLEDのGPIO
- INDICATOR_PIN = 25
- @rp2.asm_pio(
- sideset_init=(rp2.PIO.OUT_LOW),
- )
- def jjy_capture_p():
- wrap_target()
- wait(1, pin, 0) # 立ち上がりを待つ
- set(x, 0).side(1) # カウンタのリセット
-
- label("loop")
- # --- 1ms 待機ブロック (10kHz動作時、10クロック消費) ---
- nop() [7] # 8クロック
- nop() # 1クロック
- jmp(x_dec, "next") # 1クロック (xをカウントダウン)
-
- label("next")
- jmp(pin, "loop") # まだHighならループ継続(+1クロック)
-
- mov(isr, x).side(0) # カウント結果をISRへ
- push() # Pythonへ送信
- irq(rel(0))
- wrap()
- @rp2.asm_pio(
- sideset_init=(rp2.PIO.OUT_LOW),
- )
- def jjy_capture_n():
- wrap_target()
- wait(0, pin, 0) # 立ち下がりを待つ
- set(x, 0).side(1) # カウンタのリセット
-
- label("loop")
- # --- 1ms 待機ブロック (10kHz動作時、10クロック消費) ---
- nop() [7] # 8クロック
- jmp(x_dec, "next") # 1クロック (xをカウントダウン)
-
- label("next")
- jmp(pin, "success") # Highならループ脱出(+1クロック)
- jmp("loop") # Lowなのでループ継続(+1クロック)
- label("success")
- mov(isr, x).side(0) # カウント結果をISRへ
- push() # Pythonへ送信
- irq(rel(0))
- wrap()
- # 定数定義
- JJY_ERROR = 3 # エラー
- JJY_P_MARK = 2 # ポジションマーカー
- MARK_POSITIONS = (0, 9, 19, 29, 39, 49, 59) # ポジションマーカーの位置
- class JJYDecoder(Debug,TimeSource):
- """
- JJYデコーダークラス
- """
- def __init__(self, callback=None, smid = 0, input_port=15, input_pol=1, indicator_port=INDICATOR_PIN):
- """
- コンストラクタ
- Args:
- callback: コールバック関数 callback((jjy_time, received_ticks_ms)):
- jjy_time: 受信した日本標準時(UNIXエポックタイム)
- received_ticks_ms: jjy_timeを受信したtime.ticks_ms()
- smid: このクラスで使用するステートマシン番号
- input_port: JJY受信ユニットの出力が接続されているGPIO番号
- input_pol: JJY受信ユニットの出力極性、1=正論理、0=負論理
- idicator_port: インジケーターLEDのGPIO番号
- """
- # 60秒分のビットを格納するバッファ
- self.bit_buffer = [0] * 60
- # 現在の秒位置
- self.pos = 0
- # 同期が始まったならTrue
- self.synced = False
- # マーカーを識別したらTrue
- self.last_was_marker = False
- # 同期に成功した回数
- self.sync_counter = 0
- # 受信ステートマシンの割り込みが発生したticks_ms
- self.ticks_ms = 0
- # コールバック関数
- self.callback_funcs = []
- # ステートマシン番号
- self.sm_id = smid
- # JJY受信ステートマシンのロードと起動
- in_pin = Pin(input_port, Pin.IN, pull=-1)
- self.sm: rp2.StateMachine
- if input_pol == 1: # 正論理
- self.sm = rp2.StateMachine(self.sm_id, jjy_capture_p, freq=10000, in_base=in_pin, jmp_pin=in_pin,sideset_base=Pin(indicator_port))
- else: # 負論理
- self.sm = rp2.StateMachine(self.sm_id, jjy_capture_n, freq=10000, in_base=in_pin, jmp_pin=in_pin,sideset_base=Pin(indicator_port))
- self.sm.irq(self._pio_handler,1)
- self.add_callback(callback)
- # self.sm.active(1)
- @micropython.native
- def _pio_handler(self, p):
- """
- PIO割り込みハンドラ
- """
- schedule(self._jjy_interrupt, (p, time.ticks_ms()))
- @micropython.native
- def _jjy_interrupt(self, args):
- """
- JJY受信割り込みハンドラ
- """
- p, ticks = (args)
- pulse_width = 0x100000000 - int(self.sm.get())
- self.ticks_ms = ticks - pulse_width # この信号の推定立ち上がり時間
-
- bit = JJY_ERROR # 3 はエラー
- if pulse_width < 100: pass # おそらくはノイズ、無視する
- elif pulse_width < 350: bit = JJY_P_MARK # ポジションマーカー
- elif pulse_width < 600: bit = 1
- elif pulse_width < 950: bit = 0
- else: pass # なんらかの異常
- self.dprint("pulse width=%d, bit=%d" % (pulse_width, bit))
- if bit == JJY_ERROR: # エラーが起きたらいったんご破算にする
- self.synced = False
- self.pos = 0
-
- if bit == JJY_P_MARK: # ポジションマーカー
- if self.last_was_marker: # ポジションマーカーが2つ続いたら1フレーム開始
- self.dprint("--- Frame Sync ---")
- self.pos = 0
- self.synced = True # 同期開始
- self.last_was_marker = True
- if self.synced: # ポジションマーカーの位置をチェック
- if self.pos not in MARK_POSITIONS:
- self.dprint("-- Error: Out of sync --") # 同期が外れているのでご破算にする
- self.synced = False
- self.pos = 0
- else:
- self.last_was_marker = False
-
- if self.synced:
- self.bit_buffer[self.pos] = bit
- self.pos += 1
- # 1フレーム受信完了
- if self.pos >= 60:
- self.__decode_frame()
- self.pos = 0
- self.synced = False # 継続せず
-
- @micropython.native
- def __decode_frame(self):
- """
- 1フレーム60個分のデータを日本標準時にデコードするプライベート関数
- """
- # パリティチェック
- pa2 = (self.bit_buffer[1]+self.bit_buffer[2]+self.bit_buffer[3]+self.bit_buffer[5]+self.bit_buffer[6]+self.bit_buffer[7]+self.bit_buffer[8]) % 2
- pa1 = (self.bit_buffer[12]+self.bit_buffer[13]+self.bit_buffer[15]+self.bit_buffer[16]+self.bit_buffer[17]+self.bit_buffer[18]) % 2
- if pa1 != self.bit_buffer[36] or pa2 != self.bit_buffer[37]: # エラー、デコードを諦める
- self.dprint("-- Parity Error --")
- return
-
- # minute
- m10 = self.bit_buffer[1] << 2 | self.bit_buffer[2] << 1 | self.bit_buffer[3]
- m1 = self.bit_buffer[5] << 3 | self.bit_buffer[6] << 2 | self.bit_buffer[7] << 1 | self.bit_buffer[8]
- minute = m10 * 10 + m1
- # hour
- h10 = self.bit_buffer[12] << 1 | self.bit_buffer[13]
- h1 = self.bit_buffer[15] << 3 | self.bit_buffer[16] << 2 | self.bit_buffer[17] << 1 | self.bit_buffer[18]
- hour = h10 * 10 + h1
- # yearday
- yd100 = self.bit_buffer[22] << 1 | self.bit_buffer[23]
- yd10 = self.bit_buffer[25] << 3 | self.bit_buffer[26] << 2 | self.bit_buffer[27] << 1 | self.bit_buffer[28]
- yd1 = self.bit_buffer[30] << 3 | self.bit_buffer[31] << 2 | self.bit_buffer[32] << 1 | self.bit_buffer[33]
- yearday = yd100 * 100 + yd10 * 10 + yd1
- # year
- y10 = self.bit_buffer[41] << 3 | self.bit_buffer[42] << 2 | self.bit_buffer[43] << 1 | self.bit_buffer[44]
- y1 = self.bit_buffer[45] << 3 | self.bit_buffer[46] << 2 | self.bit_buffer[47] << 1 | self.bit_buffer[48]
- year = y10 * 10 + y1 + 2000 # 2000年代は決め打ち
- # month, mday
- starting_sec = time.mktime((year,1,1,0,0,0,0,0)) # 起点=本年1月1日0時0分
- target_sec = starting_sec + (yearday - 1) * 86400 # 経過秒を加算
- current_dt = time.localtime(target_sec)
- mday = current_dt[2]
- month = current_dt[1]
- # weekday
- jjy_weekday = self.bit_buffer[50] << 2 | self.bit_buffer[51] << 1 | self.bit_buffer[52]
- weekday = (jjy_weekday + 6) % 7
- # JJYから得た時刻を通知する
- jjy_time = time.mktime((year,month,mday,hour,minute,59,weekday,yearday))
- data = (jjy_time, self.ticks_ms)
- try:
- for callback in self.callback_funcs:
- if callback is not None:
- schedule(callback, data)
- except RuntimeError:
- self.dprint("micropython.schedule queue full")
- def add_callback(self, callback):
- """
- 時刻を通知するコールバック関数を登録する
- Args:
- callback: コールバック関数 callback((jjy_time, received_ticks_ms)):
- jjy_time: 受信した日本標準時(UNIXエポックタイム)
- received_ticks_ms: jjy_timeを受信したtime.ticks_ms()
- """
- if callback is not None:
- self.callback_funcs.append(callback)
-
- def stop(self):
- """ステートマシンの実行を一時停止する"""
- self.sm.active(0)
-
- def restart(self):
- """ステートマシンをリセットして再起動する"""
- self.sm.restart()
- self.sm.active(1)
- def release(self):
- """
- 終了処理。
- ステートマシンを停止し、割り込みを解除した上でPIOコードをメモリから除去する。
- """
- # SM停止
- self.sm.active(0)
- # 割り込みハンドラ解除
- self.sm.irq(None, 1)
- # PIOコード除去
- pio_no = 0
- if self.sm_id > 3:
- pio_no = 1
- pio = rp2.PIO(pio_no)
- pio.remove_program()
|