diff --git a/.gitignore b/.gitignore index 200c438..b97a54d 100644 --- a/.gitignore +++ b/.gitignore @@ -8,3 +8,6 @@ clean.sh *.old *.pyc lint.txt +debug/*.midi +debug/*.txbt +examples/*.midi diff --git a/CONTRIBUTORS b/CONTRIBUTORS new file mode 100644 index 0000000..58b657e --- /dev/null +++ b/CONTRIBUTORS @@ -0,0 +1,5 @@ +Grady O'Connell +Vance Palacio +David Briscoe +cuppajoeman +Christian Clauss diff --git a/MANIFEST.in b/MANIFEST.in index bf6a767..2c13c53 100644 --- a/MANIFEST.in +++ b/MANIFEST.in @@ -1,2 +1,3 @@ recursive-include textbeat/def * recursive-include textbeat/presets * +recursive-include textbeat/plugins * diff --git a/README.md b/README.md index 1ed5b51..b398a0c 100644 --- a/README.md +++ b/README.md @@ -92,7 +92,9 @@ I'm currently looking into export options and recording via a headless host. # Tutorial If you're familiar with trackers, you may pick this up quite easily. -Music flows vertically, with separate columns that are separated by whitespace or + +First start by creating a .txbt (textbeat) file, inside the file music +flows vertically, with separate columns that are separated by whitespace or manually setting a column width. Each column represents a track, which defaults to separate midi channel numbers. @@ -127,6 +129,17 @@ Musicians can think of grid as fractions of quarter note, The grid is the beat/quarter-note subdivision. Both Tempo and Grid can be decimal numbers as well. +For example, if you made some chords and you only want +one chord to be played per bar (eg 4 beats) +you could set `%t120x0.25`. + +You can listen to what you've made by running: + +``` +textbeat +``` + +Consult the output of `textbeat -h` for further information. ## Note Numbers @@ -353,7 +366,7 @@ If you wish to control volume/gain directly, use @v Unlike accents, volume changes persist. -Interpolation is not yet impl +Interpolation is not yet implemented ## Vibrato, Pitch, and Mod Wheel @@ -526,8 +539,8 @@ Here are the marker/repeat commands: - :name| goes back to last marker 'name' - :N| goes back to last marker N number of times - :name*N| goes back to last marker 'name' N number of times -- || return/pop to last position after marker jump -- ||| end the song here +- ||| return/pop to last position after marker jump by-label +- |||| end the song here ``` ## Command line parameters (use -): @@ -754,4 +767,3 @@ without doing a C++ rewrite. # Can I Help? Yes! Contact [flipcoder](https://github.com/flipcoder). - diff --git a/debug/debug.py b/debug/debug.py new file mode 100755 index 0000000..29f36f6 --- /dev/null +++ b/debug/debug.py @@ -0,0 +1,10 @@ +#!/usr/bin/python3 +import sys +from mido import MidiFile + +mid = MidiFile(sys.argv[1]) +for i, track in enumerate(mid.tracks): + print("Track", str(i)) + for msg in track: + print(msg) + diff --git a/requirements.txt b/requirements.txt index aa981a4..5f5594c 100644 --- a/requirements.txt +++ b/requirements.txt @@ -7,3 +7,4 @@ pyyaml docopt future shutilwhich +mido diff --git a/test/alternate_repeats.txbt b/test/alternate_repeats.txbt new file mode 100644 index 0000000..b57b763 --- /dev/null +++ b/test/alternate_repeats.txbt @@ -0,0 +1,12 @@ +%t80 +; make sure there's something in the callstack +|e: +1' +:e| +2' +|start: +1 +|| +2 +:start| +3 diff --git a/test/markers.txbt b/test/markers.txbt index 4091703..9538d8d 100644 --- a/test/markers.txbt +++ b/test/markers.txbt @@ -1,9 +1,5 @@ ; marker test ; should play 1 2 1 2 3 4 4 5 6 6 6 7 1' (1') 2' 2' 2' -;:| -;|: -;:|: -;||| 1 2 :| @@ -15,7 +11,7 @@ :|x: 2' :x*2| -||| +|||| |a: 4 || @@ -23,4 +19,4 @@ 6 :2| 7 -|| +||| diff --git a/test/modes.txbt b/test/modes.txbt index df6d436..3ed90e0 100644 --- a/test/modes.txbt +++ b/test/modes.txbt @@ -1,3 +1,4 @@ +;ionian %s1 1 2 @@ -6,6 +7,7 @@ 5 6 7 +;dorian %s2 1 2 @@ -14,6 +16,7 @@ 5 6 7 +;relative shift %k2 1 2 diff --git a/test/record.txbt b/test/record.txbt new file mode 100644 index 0000000..649ca9f --- /dev/null +++ b/test/record.txbt @@ -0,0 +1,7 @@ +1^^ +2 +3^^! +^ + + +^! diff --git a/test/tabs.txbt b/test/tabs.txbt index 7b2247c..6b98486 100644 --- a/test/tabs.txbt +++ b/test/tabs.txbt @@ -1,6 +1,6 @@ ; tab syntax ; not yet impl -||| +|||| %c20,-2 |0 | diff --git a/textbeat/__main__.py b/textbeat/__main__.py index 0afbdb7..0c9d096 100755 --- a/textbeat/__main__.py +++ b/textbeat/__main__.py @@ -1,6 +1,6 @@ #!/usr/bin/python """textbeat -Copyright (c) 2018 Grady O'Connell +Copyright (c) 2018-2021 Grady O'Connell Open-source under MIT License Examples: @@ -29,7 +29,6 @@ -c execute commands sequentially -l execute commands simultaenously --stdin read entire file from stdin - -r --remote (STUB) realtime remote (control through stdin/out) --ring don't mute midi on end -L --loop loop song --midi= generate midi file @@ -47,9 +46,11 @@ --quiet no output -a --analyze (STUB) midi input chord analyzer """ +# -r --remote (STUB) realtime remote (control through stdin/out) from __future__ import absolute_import, unicode_literals, print_function, generators # try: from .defs import * +import asyncio # except: # from .defs import * def main(): @@ -105,7 +106,7 @@ def main(): player.tracks[i].patch(val) elif arg == '--sustain': player.sustain=True elif arg == '--ring': player.ring=True - elif arg == '--remote': player.remote = True + # elif arg == '--remote': player.remote = True elif arg == '--lint': LINT = True elif arg == '--quiet': set_print(False) elif arg == '--follow': @@ -174,7 +175,8 @@ def main(): player.cmdmode = '' player.shell = True - player.interactive = player.shell or player.remote or player.tutorial + player.interactive = player.shell or player.tutorial + # player.interactive = player.shell or player.remote or player.tutorial pygame.midi.init() if pygame.midi.get_count()==0: @@ -266,14 +268,14 @@ def main(): if player.shell: log(FG.BLUE + 'textbeat')# v'+str(VERSION)) - log('Copyright (c) 2018 Grady O\'Connell') + log('Copyright (c) 2021 Grady O\'Connell') log('https://github.com/flipcoder/textbeat') active = support.SUPPORT_ALL & support.SUPPORT inactive = support.SUPPORT_ALL - support.SUPPORT - if active: - log(FG.GREEN + 'Active Modules: ' + STYLE.RESET_ALL + ', '.join(active) + STYLE.RESET_ALL) - if inactive: - log(FG.RED + 'Inactive Modules: ' + STYLE.RESET_ALL + ', '.join(inactive)) + # if active: + # log(FG.GREEN + 'Active Modules: ' + STYLE.RESET_ALL + ', '.join(active) + STYLE.RESET_ALL) + # if inactive: + # log(FG.RED + 'Inactive Modules: ' + STYLE.RESET_ALL + ', '.join(inactive)) if player.portname: log(FG.GREEN + 'Device: ' + STYLE.RESET_ALL + '%s' % (player.portname if player.portname else 'Unknown',)) log(FG.RED + 'Other Devices: ' + STYLE.RESET_ALL + '%s' % (', '.join(portnames))) @@ -287,7 +289,7 @@ def main(): # log(FG.BLUE + 'New? Type help and press enter to start the tutorial.') log('') - player.run() + asyncio.run(player.run()) if player.midifile: player.midifile.save(midifn) diff --git a/textbeat/defs.py b/textbeat/defs.py index b6be773..079569b 100644 --- a/textbeat/defs.py +++ b/textbeat/defs.py @@ -38,7 +38,7 @@ RANGE = 109 OCTAVE_BASE = 5 DRUM_WORDS = ['drum','drums','drumset','drumkit','percussion'] -CCHAR = ' <>=~.\'`,_&|!?*\"$(){}[]%@;' +CCHAR = ' <>=~.\'`,_&|!?*\"$(){}[]%@;^' CCHAR_START = 'TV' # control chars PRINT = True @@ -160,7 +160,7 @@ class Diff: def merge(a, b, overwrite=False, skip=None, diff=None, pth=None): for k,v in iteritems(b): contains = k in a - if contains and isinstance(a[k], dict) and isinstance(b[k], collections.Mapping): + if contains and isinstance(a[k], dict) and isinstance(b[k], collections.abc.Mapping): loc = (pth+[k]) if pth else None if callable(skip): if not skip(loc,v): diff --git a/textbeat/player.py b/textbeat/player.py index 36d67f5..77d2a66 100644 --- a/textbeat/player.py +++ b/textbeat/player.py @@ -1,23 +1,51 @@ -# TODO: This player modulde includes parser and player prototype code -# that may eventually be reorganized into separate modules - from .defs import * +from collections import deque +from prompt_toolkit import PromptSession +from enum import Enum + +STACK_SIZE = 64 + +class LoopResult(Enum): + """This enum types help us decide whether we should continue in the loop, break, or proceed. + This type is needed so we can choose how to deal interact with a loop outside a functions scope. + """ + PROCEED = 0 # Continue forward in the current loop + CONTINUE = 1 # Skip to the next iteration + BREAK = 2 # Break from the loop + class StackFrame(object): - def __init__(self, row, caller, count): + """ + Stack frames are items on the Player's call stack. + Similar to function calls, the Player uses a stack to track markers/repeats. + """ + def __init__(self, name, row, caller, count): + self.name = name self.row = row self.caller = caller self.count = count # repeat call counter self.markers = {} # marker name -> line self.returns = {} # repeat row -> number of rpts left # self.returns[row] = 0 + def __str__(self): + return 'StackFrame' + str((self.name, self.row, self.caller, self.count, self.markers, self.returns)) class Player(object): + """ + The Player is what parses the txbt format and plays it. + Beware! This is the most "quick and dirty" part of textbeat, so it will probably + be rewritten eventually. + """ class Flag: - ROMAN = bit(0) - TRANSPOSE = bit(1) - LOOP = bit(2) + """ + Bitflag options for Roman numeral notation, transposition, and looping + """ + ROMAN = bit(0) # use roman numeral notation + TRANSPOSE = bit(1) # allow transposition of note letters + LOOP = bit(2) # loop the textbeat file (good for jamtrack, metronome) + + # string names for the bitflags FLAGS = [ 'roman', 'transpose', @@ -30,65 +58,77 @@ class Flag: # ]) def __init__(self): - self.quitflag = False - self.vimode = False - self.bcproc = None - self.log = False + self.quitflag = False # Set this bool to escape to stop the parser + self.vimode = False # use vi readline mode for prompt_toolkit + # self.log = False + # 'canfollow' means cursor location is printed for interop with vim and others self.canfollow = False + self.last_follow = 0 # the last line 'followed' (printed) + # sleeping can be disabled for writing to midi files instead of playback self.cansleep = True - self.lint = False - self.tracks_active = 1 + self.lint = False # analyze file (not yet implemented) + self.tracks_active = 1 # the current number of known active tracks found by Player self.showmidi = False - self.scale = DIATONIC - self.mode = 1 - self.transpose = 0 - self.octave = 0 - self.tempo = 90.0 + self.scale = DIATONIC # default scale, see other options in theory.py + self.mode = 1 # musical mode number, 1=ionian + self.transpose = 0 # current transposion of entire song (in steps) + self.octave = 0 # relative octave transposition of entire song + self.tempo = 90.0 # default tempo when unspecified (may be different inside shell) self.grid = 4.0 # Grid subdivisions of a beat (4 = sixteenth note) + # columns/shift is hardcoded column placement and widths for + # when working in text editors with column highlighting self.columns = 0 self.column_shift = 0 - self.showtextstr = [] + # self.showtextstr = [] self.showtext = False # nice output (-v), only shell and cmd modes by default - self.sustain = False # start sustained - self.ring = False # disables midi muting on program exit + self.sustain = False # start sustained? + self.ring = False # ring: disables midi muting on program exit, letting notes ring out self.buf = [] - self.markers = {} - f = StackFrame(-1,-1,0) + self.markers = {} # markers are for doing jumps and repeats + f = StackFrame('',-1,-1,0) f.returns[''] = 0 - self.tutorial = None - self.callstack = [f] - self.separators = [] - self.track_history = ['.'] * NUM_TRACKS - self.fn = None - self.row = 0 + self.tutorial = None # Tutorial object to run (should be run inside shell, see -T) + # self.callstack = [f] # callstack for moving around the file using markers/repeats + self.callstack = deque(maxlen=STACK_SIZE) # callstack for moving around the file using markers/repeats + self.callstack.append(f) + # self.separators = [] # separators are currently not supported + self.track_history = ['.'] * NUM_TRACKS # keep track of track history for replaying with " symbol + self.fn = None # filename + self.row = 0 # parser location, row # # self.rowno = [] - self.startrow = -1 - self.stoprow = -1 + self.startrow = -1 # last row processed, def -1 + self.stoprow = -1 # row to stop on, if playing a specific region (-1 is entire file) self.cmdmode = 'n' # n normal c command s sequence - self.schedule = Schedule(self) - self.host = [] + self.plugins = [] # (under dev) textbeat interop plugins self.tracks = [] self.shell = False - self.remote = False + # eventually, text editor interop may be done by controlling txbt through a socket + # instead of column following + # self.remote = None self.interactive = False self.gui = False self.portname = '' - self.speed = 1.0 - self.muted = False # mute all except for solo tracks + self.speed = 1.0 # speed multiplier (this one is per file) + self.muted = False # mute all except for "solo" tracks self.midi = [] - self.instrument = None - self.t = 0.0 # actual time - self.last_follow = 0 - self.last_marker = -1 - self.midifile = None - self.flags = 0 + # self.instrument = None + self.t = 0.0 # time since file processing started, in sec + self.last_follow = -1 # last follow location (row #) + self.last_marker = -1 # last marker location (row #) + self.midifile = None # midi file to write output to + self.flags = 0 # see FLAGS (bitflags) self.version = '0' - self.auto = False + self.auto = False # (under dev) automatically generate VST rack using a plugin + # embedded config files (key=filename), for things like synth/plugin customization self.embedded_files = {} + self.vibrato_tracks = set() # tracks that currently have vibrato going - # require enable at top of file + # other devices require enabling them at the top of the file self.devices = ['midi'] + # (under dev) schedule will eventually decouple the player and parser + self.schedule = Schedule(self) + def init(self): for i in range(len(sys.argv)): @@ -155,14 +195,14 @@ def refresh_devices(self): assert False try: # support_enable[dev](self.rack) - SUPPORT_PLUGINS[dev].enable(self.host) + SUPPORT_PLUGINS[dev].enable(self.plugins) except KeyError: # no init needed, silent pass self.auto = 'auto' in self.devices - def set_host(self, plugins): - self.host = plugins + def set_plugins(self, plugins): + self.plugins = plugins self.refresh_devices() # def remove_flags(self, f): @@ -221,16 +261,16 @@ def pause(self): return False return True - def write_midi_tempo(self): + def write_midi_tempo(self, tempo=None): # set initial midifile tempo if self.midifile: if not self.midifile.tracks: self.midifile.tracks.append(mido.MidiTrack()) self.midifile.tracks[0].append(mido.MetaMessage( - 'set_tempo', tempo=mido.bpm2tempo(self.tempo) + 'set_tempo', tempo=mido.bpm2tempo(tempo or self.tempo) )) - - def run(self): + + async def run(self): for ch in self.tracks: ch.refresh() @@ -238,82 +278,18 @@ def run(self): embedded_file = False self.write_midi_tempo() + + prompt_session = PromptSession(history=HISTORY, vi_mode=self.vimode) while not self.quitflag: self.follow() try: self.line = '.' - try: - self.line = self.buf[self.row] - if self.row == self.startrow: - self.startrow = -1 - if self.stoprow!=-1 and self.row == self.stoprow: - self.buf = [] - raise IndexError - except IndexError: - if self.has_flags(Player.Flag.LOOP): - self.row = 0 - continue - - self.row = len(self.buf) - # done with file, finish playing some stuff - - arps_remaining = 0 - if self.interactive or self.cmdmode in ['c','l']: # finish arps in shell mode - for ch in self.tracks[:self.tracks_active]: - if ch.arp_enabled: - if ch.arp_cycle_limit or not ch.arp_once: - arps_remaining += 1 - self.line = '.' - if not arps_remaining and not self.shell and self.cmdmode not in ['c','l']: - break - self.line = '.' - - if not arps_remaining and not self.schedule.pending(): - if self.interactive: - for ch in self.tracks[:self.tracks_active]: - ch.release_all() - - if self.shell: - # self.shell PROMPT - # log(orr(self.tracks[0].scale,self.scale).mode_name(orr(self.tracks[0].mode,self.mode))) - # cur_oct = self.tracks[0].octave - # cline = FG.GREEN + 'txbt> '+FG.BLUE+ '('+str(int(self.tempo))+'bpm x'+str(int(self.grid))+' '+\ - # note_name(self.tracks[0].transpose) + ' ' +\ - # orr(self.tracks[0].scale,self.scale).mode_name(orr(self.tracks[0].mode,self.mode,-1))+\ - # ')> ' - modename = orr(self.tracks[0].scale,self.scale).mode_name(orr(self.tracks[0].mode,self.mode,-1)) - - keynote = note_name(self.transpose + self.tracks[0].transpose) - keynote = keynote if keynote!='C' else '' - parts = [ - str(int(self.tempo))+'bpm', # tempo - 'x'+str(int(self.grid)), # subdiv - keynote, - ('' if modename=='ionian' else modename) - ] - cline = 'txbt> ('+ \ - ' '.join(filter(lambda x: x, parts))+ \ - ')> ' - # if bufline.endswith('.txbt'): - # play file? - # bufline = raw_input(cline) - bufline = prompt(cline, history=HISTORY, vi_mode=self.vimode) - bufline = list(filter(None, bufline.split(' '))) - bufline = list(map(lambda b: b.replace(';',' '), bufline)) - elif self.remote: - pass - else: - assert False - - self.buf += bufline - - continue - - else: - break - + loop_result = await self.next_row(prompt_session) + if loop_result == LoopResult.CONTINUE: continue + elif loop_result == LoopResult.BREAK: break + log(FG.MAGENTA + self.line) # cells = line.split(' '*2) @@ -367,175 +343,10 @@ def run(self): # TODO: global 'silent' commands (doesn't consume time) if self.line.startswith('%'): - self.line = self.line[1:].strip() # remove % and spaces - for tok in self.line.split(' '): - if not tok: - break - if tok[0]==' ': - tok = tok[1:] - var = tok[0].upper() - if var in 'TGXNPSRCKFDR': # global vars % - cmd = tok.split(' ')[0] - op = cmd[1] - try: - val = cmd[2:] - except: - val = '' - # log("op val %s %s" % (op,val)) - if op == ':': op = '=' - if not op in '*/=-+': - # implicit = - val = str(op) + str(val) - op='=' - if not val or op=='.': - val = op + val # append - # TODO: add numbers after dots like other ops - if val[0]=='.': - note_value(val) - ct = count_seq(val) - val = pow(0.5,count) - op = '/' - num,ct = peel_uint(val[:ct]) - elif val[0]=='*': - op = '*' - val = pow(2.0,count_seq(val)) - if op=='/': - if var in 'GX': self.grid/=float(val) - elif var=='N': self.grid/=float(val) #! - elif var=='T': self.tempo/=float(val) - else: assert False - elif op=='*': - if var in 'GX': self.grid*=float(val) - elif var=='N': self.grid*=float(val) #! - elif var=='T': self.tempo*=float(val) - else: assert False - elif op=='+': - if var=='K': self.transpose += note_offset('#1' if val=='+' else val) - # elif var=='O': self.octave += int(1 if val=='+' else val) - elif var=='T': self.tempo += max(0,float(val)) - elif var in 'GX': self.grid += max(0,float(val)) - else: assert False - # if var=='K': - # self.octave += -1*sgn(self.transpose)*(self.transpose//12) - # self.transpose = self.transpose%12 - elif op=='-': - if var=='K': - self.transpose -= note_offset(val) - out(note_offset(val)) - # elif var=='O': self.octave -= int(1 if val=='-' else val) - elif var=='T': self.tempo -= max(0,float(val)) - elif var in 'GX': self.grid -= max(0,float(val)) - else: assert False - # self.octave += -1*sgn(self.transpose)*(self.transpose//12) - # if var=='K': - # self.octave += -1*sgn(self.transpose)*(self.transpose//12) - # self.transpose = self.transpose%12 - elif op=='=': - if var in 'GX': self.grid=float(val) - elif var=='R': - if not 'auto' in self.devices: - self.devices = ['auto'] + self.devices - self.set_host(val.split(',')) - elif var=='V': self.version = val - elif var=='D': - self.devices = val.split(',') - self.refresh_devices() - # elif var=='O': self.octave = int(val) - elif var=='N': self.grid=float(val)/4.0 #! - elif var=='T': - vals = val.split('x') - self.tempo=float(vals[0]) - try: - self.grid = float(vals[1]) - except: - pass - elif var=='C': - vals = val.split(',') - self.columns = int(vals[0]) - try: - self.column_shift = int(vals[1]) - except: - pass - elif var=='P': - vals = val.split(',') - for i in range(len(vals)): - p = vals[i] - if p.strip().isdigit(): - self.tracks[i].patch(int(p)) - else: - self.tracks[i].patch(p) - elif var=='F': # flags - self.add_flags(val.split(',')) - # for i in range(len(vals)): # TODO: ? - # self.tracks[i].add_flags(val.split(',')) - # elif var=='O': - # self.octave = int(val) - elif var=='K': - self.transpose = note_offset(val) - # self.octave += -1*sgn(self.transpose)*(self.transpose//12) - # self.transpose = self.transpose%12 - elif var=='S': - # var R=relative usage deprecated - try: - if val: - val = val.lower() - # ambigous alts - - if val.isdigit(): - modescale = (self.scale.name,int(val)) - else: - alts = {'major':'ionian','minor':'aeolian'} - # try: - # modescale = (alts[val[0],val[1]) - # except KeyError: - # pass - val = val.lower().replace(' ','') - - try: - modescale = MODES[val] - except KeyError: - raise NoSuchScale() - - try: - self.scale = SCALES[modescale[0]] - self.mode = modescale[1] - inter = self.scale.intervals - self.transpose = 0 - # log(self.mode-1) - - if var=='R': - for i in range(self.mode-1): - inc = 0 - try: - inc = int(inter[i]) - except ValueError: - pass - self.transpose += inc - elif var=='S': - pass - except ValueError: - raise NoSuchScale() - # else: - # self.transpose = 0 - - except NoSuchScale: - out(FG.RED + 'No such scale.') - pass - else: assert False # no such var - else: assert False # no such op - - if var=='T': - if self.midifile: - if not self.midifile.tracks: - self.midifile.tracks.append(mido.MidiTrack()) - self.midifile.tracks[0].append(mido.MetaMessage( - 'set_tempo', tempo=mido.bpm2tempo(int( - val.split('x')[0] - )) - )) - self.row += 1 - continue - + loop_result = self.handle_set_variable_commands() + if loop_result == LoopResult.CONTINUE: continue + elif loop_result == LoopResult.BREAK: break + # set marker here if (self.line[0]=='|' or self.line.startswith(':|')) and self.line[-1]==':': # allow override of markers in case of reuse @@ -554,10 +365,20 @@ def run(self): continue # marker AND repeat, continue to repeat parser section - if self.line.startswith('|||'): + if self.line.startswith('||||'): self.quitflag = True continue - elif self.line.startswith('||'): + elif self.line.startswith('|||'): + p = None + while True: + assert self.callstack + p = self.callstack.pop() + if p.name != '': + break + self.row = p.caller + 1 + continue + elif self.line.startswith('||'): # return/pop + # print(self.callstack) if len(self.callstack)>1: frame = self.callstack[-1] frame.count = max(0,frame.count-1) @@ -565,13 +386,17 @@ def run(self): self.row = frame.row + 1 continue else: + # print('returning to caller', frame.caller) self.row = frame.caller + 1 - self.callstack = self.callstack[:-1] + # self.callstack = self.callstack[:-1] + self.callstack.pop() continue else: - self.quitflag = True + # allow bypassing '||' on empty stack + # self.quitflag = True + self.row += 1 continue - if self.line[0]==':' and self.line[-1] in '|:' and '|' in self.line: + elif self.line[0]==':' and self.line[-1] in '|:' and '|' in self.line: jumpline = self.line[1:self.line.index('|')] frame = self.callstack[-1] jumpline = jumpline.split('*') # *n = n repeats @@ -601,28 +426,38 @@ def run(self): # if bmrow in frame.returns: # return to marker (no pushing) - # self.callstack.append(StackFrame(bmrow, self.row, count)) + # self.callstack.append(StackFrame(bm, bmrow, self.row, count)) # self.markers[jumpline[0]] = bmrow # self.row = bmrow + 1 # self.last_marker = bmrow + # print('bm', bm) + # print('fm', frame.markers) + # print('fm', frame.returns) if bmrow==self.last_marker or bm in frame.markers: # call w/o push? + # # if bm in frame.markers: # ctx already passed bookmark, call w/o pushing (return to mark) + # print(frame.returns, self.row) if self.row in frame.returns: # have we repeated yet? + # 2nd repeat (count exists) rpt = frame.returns[self.row] if rpt>0: frame.returns[self.row] = rpt - 1 self.row = bmrow + 1 # repeat else: + # repeating done + # frame.returns[self.row] = rpt - 1 del frame.returns[self.row] # reset self.row += 1 else: # start return count - frame.returns[self.row] = count - 1 + self.callstack.append(StackFrame(bm, bmrow, self.row, count)) + self.callstack[-1].returns[self.row] = count - 1 self.row = bmrow + 1 # repeat else: # mark not yet passed, do push/pop - self.callstack.append(StackFrame(bmrow, self.row, count)) + # print(bm) + self.callstack.append(StackFrame(bm, bmrow, self.row, count)) self.markers[bm] = bmrow self.row = bmrow + 1 self.last_marker = bmrow @@ -635,7 +470,7 @@ def run(self): # else: # self.row += 1 # else: - # self.callstack.append(StackFrame(bmrow, self.row, count)) + # self.callstack.append(StackFrame(bm, bmrow, self.row, count)) # self.markers[jumpline[0]] = bmrow # self.row = bmrow + 1 # self.last_marker = bmrow @@ -654,14 +489,15 @@ def run(self): # separate into chunks based on column width cells = [cells[i:i + self.columns] for i in range(0, len(cells), self.columns)] # log(cells) - elif not self.separators: + else: + # elif not self.separators: # AUTOGENERATE CELL self.separators cells = fullline.split(' ') pos = 0 for cell in cells: - if cell: - if pos: - self.separators.append(pos) + # if cell: + # if pos: + # self.separators.append(pos) # log(cell) pos += len(cell) + 1 # log( "self.separators " + str(self.separators)) @@ -669,17 +505,19 @@ def run(self): # if fullline.startswith(' '): # cells = ['.'] + cells # dont filter first one autoseparate = True - else: - # SPLIT BASED ON self.separators - s = 0 - seplen = len(self.separators) - # log(seplen) - pos = 0 - for i in range(seplen): - cells.append(fullline[pos:self.separators[i]].strip()) - pos = self.separators[i] - lastcell = fullline[pos:].strip() - if lastcell: cells.append(lastcell) + # else: + # log('Track separators are no longer supported.') + # assert False + # # SPLIT BASED ON self.separators + # s = 0 + # seplen = len(self.separators) + # # log(seplen) + # pos = 0 + # for i in range(seplen): + # cells.append(fullline[pos:self.separators[i]].strip()) + # pos = self.separators[i] + # lastcell = fullline[pos:].strip() + # if lastcell: cells.append(lastcell) # make sure active tracks get empty cell len_cells = len(cells) @@ -1450,13 +1288,15 @@ def run(self): # vel = int((float(num) / float('9'*len(num)))*127) # ch.cc(7,vel) # RECORD SEQ - elif cell.startswith('^^'): + # elif cell.startswith('^^'): + elif c2=='^^': cell = cell[2:] r,ct = peel_uint(cell,0) ch.record(r) cell = cell[ct:] # REPLAY SEQ - elif cell.startswith('^'): + elif c=='^': + # elif cell.startswith('^'): cell = cell[1:] r,ct = peel_uint(cell,0) if self.showtext: @@ -1831,7 +1671,7 @@ def run(self): try: # don't delay on ctrl lines or file header if not ctrl and not self.header: - self.schedule.logic(60.0 / self.tempo / self.grid) + await self.schedule.logic(60.0 / self.tempo / self.grid) break else: break @@ -1868,3 +1708,278 @@ def run(self): self.row += 1 + async def next_row(self, prompt_session:PromptSession) -> LoopResult: + try: + self.line = self.buf[self.row] + if self.row == self.startrow: + self.startrow = -1 + if self.stoprow!=-1 and self.row == self.stoprow: + self.buf = [] + raise IndexError + return LoopResult.PROCEED + except IndexError: + if self.has_flags(Player.Flag.LOOP): + self.row = 0 + return LoopResult.CONTINUE + + self.row = len(self.buf) + # done with file, finish playing some stuff + + arps_remaining = 0 + if self.interactive or self.cmdmode in ['c','l']: # finish arps in shell mode + for ch in self.tracks[:self.tracks_active]: + if ch.arp_enabled: + if ch.arp_cycle_limit or not ch.arp_once: + arps_remaining += 1 + self.line = '.' + if not arps_remaining and not self.shell and self.cmdmode not in ['c','l']: + return LoopResult.BREAK + self.line = '.' + + if not arps_remaining and not self.schedule.pending(): + if self.interactive: + for ch in self.tracks[:self.tracks_active]: + ch.release_all() + + if self.shell: + self.buf += await self.mk_prompt(prompt_session) + # elif self.remote: + # pass # not yet implemented + else: + assert False + return LoopResult.CONTINUE + + else: + return LoopResult.BREAK + return LoopResult.PROCEED + + async def mk_prompt(self, prompt_session:PromptSession): + """Creates a new prompt for the current line""" + # self.shell PROMPT + # log(orr(self.tracks[0].scale,self.scale).mode_name(orr(self.tracks[0].mode,self.mode))) + # cur_oct = self.tracks[0].octave + # cline = FG.GREEN + 'txbt> '+FG.BLUE+ '('+str(int(self.tempo))+'bpm x'+str(int(self.grid))+' '+\ + # note_name(self.tracks[0].transpose) + ' ' +\ + # orr(self.tracks[0].scale,self.scale).mode_name(orr(self.tracks[0].mode,self.mode,-1))+\ + # ')> ' + modename = orr(self.tracks[0].scale,self.scale).mode_name(orr(self.tracks[0].mode,self.mode,-1)) + + keynote = note_name(self.transpose + self.tracks[0].transpose) + keynote = keynote if keynote!='C' else '' + parts = [ + str(int(self.tempo))+'bpm', # tempo + 'x'+str(int(self.grid)), # subdiv + keynote, + ('' if modename=='ionian' else modename) + ] + cline = 'txbt> ('+ \ + ' '.join(filter(lambda x: x, parts))+ \ + ')> ' + # if bufline.endswith('.txbt'): + # play file? + # bufline = raw_input(cline) + bufline = await prompt_session.prompt_async(cline) + bufline = list(filter(None, bufline.split(' '))) + bufline = list(map(lambda b: b.replace(';',' '), bufline)) + + return bufline + + def handle_set_variable_commands(self): + """Function used to parse/handle variable setting commands. E.g: Set tempo, grid""" + + #--------------------------# + # Sub-function definitions # + #--------------------------# + # We define sub-functions since we don't want the entire module scope to have access to + # these. If it turns out that these are useful in a broader context, we can just rip em out + # at that point + def read_operands(cmd) -> tuple[str,str]: + """Read the operator and value to use when modifying variables. + E.g: `+`, `-`, `=` + """ + op = cmd[1] + try: + val = cmd[2:] + except: + val = '' + # log("op val %s %s" % (op,val)) + if op == ':': op = '=' + if not op in '*/=-+': + # implicit = + val = str(op) + str(val) + op='=' + return (val,op) + + def handle_division(var, val): + """Use /= to modify a global variable""" + if var in 'GX': self.grid/=float(val) + elif var=='N': self.grid/=float(val) #! + elif var=='T': self.tempo/=float(val) + else: assert False + + def handle_multiply(var, val): + """Use *= to modify a global variable""" + if var in 'GX': self.grid*=float(val) + elif var=='N': self.grid*=float(val) #! + elif var=='T': self.tempo*=float(val) + else: assert False + + def handle_add(var, val): + """Use += to modify a global variable""" + if var=='K': self.transpose += note_offset('#1' if val=='+' else val) + # elif var=='O': self.octave += int(1 if val=='+' else val) + elif var=='T': self.tempo += max(0,float(val)) + elif var in 'GX': self.grid += max(0,float(val)) + else: assert False + # if var=='K': + # self.octave += -1*sgn(self.transpose)*(self.transpose//12) + # self.transpose = self.transpose%12 + + def handle_sub(var, val): + """Use -= to modify a global variable""" + if var=='K': + self.transpose -= note_offset(val) + out(note_offset(val)) + # elif var=='O': self.octave -= int(1 if val=='-' else val) + elif var=='T': self.tempo -= max(0,float(val)) + elif var in 'GX': self.grid -= max(0,float(val)) + else: assert False + # self.octave += -1*sgn(self.transpose)*(self.transpose//12) + # if var=='K': + # self.octave += -1*sgn(self.transpose)*(self.transpose//12) + # self.transpose = self.transpose%12 + + def handle_assign(var, val): + """Use = to modify a global variable""" + if var in 'GX': self.grid=float(val) + elif var=='R': + if not 'auto' in self.devices: + self.devices = ['auto'] + self.devices + self.set_plugins(val.split(',')) + elif var=='V': self.version = val + elif var=='D': + self.devices = val.split(',') + self.refresh_devices() + # elif var=='O': self.octave = int(val) + elif var=='N': self.grid=float(val)/4.0 #! + elif var=='T': + vals = val.split('x') + self.tempo=float(vals[0]) + try: + self.grid = float(vals[1]) + except: + pass + elif var=='C': + vals = val.split(',') + self.columns = int(vals[0]) + try: + self.column_shift = int(vals[1]) + except: + pass + elif var=='P': + vals = val.split(',') + for i in range(len(vals)): + p = vals[i] + if p.strip().isdigit(): + self.tracks[i].patch(int(p)) + else: + self.tracks[i].patch(p) + elif var=='F': # flags + self.add_flags(val.split(',')) + # for i in range(len(vals)): # TODO: ? + # self.tracks[i].add_flags(val.split(',')) + # elif var=='O': + # self.octave = int(val) + elif var=='K': + self.transpose = note_offset(val) + # self.octave += -1*sgn(self.transpose)*(self.transpose//12) + # self.transpose = self.transpose%12 + elif var=='S': + # var R=relative usage deprecated + try: + if val: + val = val.lower() + # ambigous alts + + if val.isdigit(): + modescale = (self.scale.name,int(val)) + else: + alts = {'major':'ionian','minor':'aeolian'} + # try: + # modescale = (alts[val[0],val[1]) + # except KeyError: + # pass + val = val.lower().replace(' ','') + + try: + modescale = MODES[val] + except KeyError: + raise NoSuchScale() + + try: + self.scale = SCALES[modescale[0]] + self.mode = modescale[1] + inter = self.scale.intervals + self.transpose = 0 + # log(self.mode-1) + + if var=='R': + for i in range(self.mode-1): + inc = 0 + try: + inc = int(inter[i]) + except ValueError: + pass + self.transpose += inc + elif var=='S': + pass + except ValueError: + raise NoSuchScale() + # else: + # self.transpose = 0 + + except NoSuchScale: + out(FG.RED + 'No such scale.') + pass + else: assert False # no such var + + def adjust_operands(val: str, op:str) -> tuple[str,str]: + val = op + val # append + # TODO: add numbers after dots like other ops + if val[0]=='.': + note_value(val) + ct = count_seq(val) + val = pow(0.5,count) + op = '/' + num,ct = peel_uint(val[:ct]) + elif val[0]=='*': + op = '*' + val = pow(2.0,count_seq(val)) + return (val,op) + + #------------------------# + # Function's Entry Point # + #------------------------# + self.line = self.line[1:].strip() # remove % and spaces + for tok in self.line.split(' '): + if not tok: + return LoopResult.BREAK + if tok[0]==' ': + tok = tok[1:] + var = tok[0].upper() + if var in 'TGXNPSRCKFDR': # global vars % + cmd = tok.split(' ')[0] + (val,op) = read_operands(cmd) + if not val or op=='.': (val,op) = adjust_operands(val,op) + + if op=='/': handle_division(var,val) + elif op=='*': handle_multiply(var,val) + elif op=='+': handle_add(var,val) + elif op=='-': handle_sub(var,val) + elif op=='=': handle_assign(var,val) + else: assert False # no such op + + if var=='T': + self.write_midi_tempo(int(val.split('x')[0])) + self.row += 1 + return LoopResult.CONTINUE diff --git a/textbeat/schedule.py b/textbeat/schedule.py index 32de82d..c2beac1 100644 --- a/textbeat/schedule.py +++ b/textbeat/schedule.py @@ -1,4 +1,6 @@ from .defs import * +import time +import asyncio class Event(object): def __init__(self, t, func, ch): @@ -16,6 +18,8 @@ def __init__(self, ctx): self.clock = 0.0 self.last_clock = 0 self.started = False + # self.sleepfunc = time.sleep + # self.sleep = asyncio.sleep # all note mute and play events should be marked skippable def pending(self): return len(self.events) @@ -27,17 +31,17 @@ def clear(self): def clear_channel(self, ch): assert False self.events = [ev for ev in self.events if ev.ch!=ch] - def logic(self, t): + async def logic(self, t): processed = 0 self.passed = 0 + # tt = time.perf_counter() # if self.last_clock == 0: - # self.last_clock = time.clock() - # clock = time.clock() + # self.last_clock = tt + # clock = tt # self.dontsleep = (clock - self.last_clock) # self.last_clock = clock - # clock = time.clock() # if self.started: # tdelta = (clock - self.passed) # self.passed += tdelta @@ -48,6 +52,9 @@ def logic(self, t): # self.passed = 0.0 # log(self.clock) + pending_events_count = sum(1 for e in self.events if e.t > 0.0 and e.t < 2.0) + # print(pending_events_count) + try: self.events = sorted(self.events, key=lambda e: e.t) for ev in self.events: @@ -58,7 +65,7 @@ def logic(self, t): if ev.t >= 0.0: if self.ctx.cansleep and self.ctx.startrow == -1: self.ctx.t += self.ctx.speed * t * (ev.t-self.passed) - time.sleep(max(0,self.ctx.speed * t * (ev.t-self.passed))) + await asyncio.sleep(max(0,self.ctx.speed * t * (ev.t-self.passed))) ev.func(0) self.passed = ev.t # only inc if positive else: @@ -70,7 +77,7 @@ def logic(self, t): if slp > 0.0: self.ctx.t += self.ctx.speed*slp if self.ctx.cansleep and self.ctx.startrow == -1: - time.sleep(max(0,self.ctx.speed*slp)) + await asyncio.sleep(max(0,self.ctx.speed*slp)) self.passed = 0.0 self.events = self.events[processed:] diff --git a/textbeat/track.py b/textbeat/track.py index 6b19f2f..0a61df1 100644 --- a/textbeat/track.py +++ b/textbeat/track.py @@ -2,8 +2,8 @@ import math class Recording(object): - def __init__(self, name, slot): - self.name = slot + def __init__(self, name): + self.name = name self.content = [] class Tuplet(object): @@ -90,12 +90,50 @@ def reset(self): # self.muted = False self.volval = 1.0 self.slots = {} # slot -> Recording + self.recording_slots = set() # slots currently recording self.slot = None # careful, this can be 0 - self_slot_idx = 0 + self.slot_idx = 0 self.lane = None self.lanes = [] self.ccs = {} self.dev = 0 + + # pitch wheel oscillation + self.vibrato_enabled = False # don't set this directly, call vibrato() + self.vibrato_amp = 0.0 + self.vibrato_freq = 0.0 + # self.vibrato_offset = 0.0 # value that gets added to pitch + + def record(self, label): + self.slots[label] = Recording(label) + self.recording_slots.add(label) + def record_stop(self, label): + self.recording_slots.remove(label) + def recording(self): + return self.recording_slots + def replay(self, label): + pass + def vibrato(self, b, amp=0.0, freq=0.0): + self.vibrato_amp = amp + self.vibrato_freq = freq + self.vibrato_t = 0.0 + if b == self.vibrato_enabled: + return + if b: + try: + self.ctx.vibrato_tracks.remove(self) + except KeyError: + pass + else: + self.ctx.vibrato_tracks.add(self) + self.pitch(self.pitchval) + self.vibrato_enabled = b + + def vibrato_logic(self, t): + # TODO: test this + self.vibrato_t += t + v = math.sin(self.vibrato_t * self.vibrato_freq * math.tau) * self.vibrato_amp + self.pitch(self.pitchval + v, False) # don't save new pitchval on call # def _lazychannelfunc(self): # # get active channel numbers @@ -263,27 +301,27 @@ def midi_channel(self, midich, stackidx=-1): self.channels = [(0,midich)] elif midich not in self.channels: self.channels.append(midich) - def pitch(self, val): # [-1.0,1.0] + def write_short(self, ch, status, val, val2): + if self.ctx.midifile: + self.midifile_write(ch,mido.UnknownMetaMessage(status,data=[val,val2], time=self.us())) + else: + self.midi[ch].write_short(status,val,val2) + def pitch(self, val, save=True): # [-1.0,1.0] + if save: + self.pitchval = val val = min(max(0,int((1.0 + val)*0x2000)),16384) - self.pitchval = val val2 = (val>>0x7f) val = val&0x7f for ch in self.channels: status = (MIDI_PITCH<<4) + ch[1] if self.ctx.showmidi: log(FG.YELLOW + 'MIDI: PITCH (%s, %s)' % (val,val2)) - if self.ctx.midifile: - self.midifile_write(ch[0],mido.UnknownMetaMessage(status,data=[val1,val2], time=self.us())) - else: - self.midi[ch[0]].write_short(status,val,val2) + self.write_short(ch[0], status, val, val2) def cc(self, cc, val): # control change if type(val) ==type(bool): val = 127 if val else 0 # allow cc bool switches for ch in self.channels: status = (MIDI_CC<<4) + ch[1] if self.ctx.showmidi: log(FG.YELLOW + 'MIDI: CC (%s, %s, %s)' % (status, cc,val)) - if self.ctx.midifile: - self.midifile_write(ch[0], mido.UnknownMetaMessage(status,data=[cc,val],time=self.us())) - else: - self.midi[ch[0]].write_short(status,cc,val) + self.write_short(ch[0], status, cc, val) self.ccs[cc] = v if cc==1: self.modval = val