Pumpkin-Cool-Tapping Instrument

Really Awesome
A Project By Just A Team !
Pujun Lun, Ruoxuan Xu, Xiaoxing Yan


Demonstration Video


Introduction


Music beat-matching game apps are gaining popularity these days. Apps such as Tapsonic and BeatMania synchronize the beat of the music with colorful objects moving on screen and respond to the player's touch during the game.

The methods these games use to visualize beats and notes combine automatic processing with manual refinement, because automatically detected beats, in most cases, do not perfectly match human perception. For that reason, our program generates the music map files both manually and programmatically; several rhythm-analysis libraries are available in Python to help with this, as shown in the sketch below.
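As an illustration of how such a library can be used, here is a minimal sketch of automatic beat detection with librosa (used here only as an example of a Python rhythm-analysis library; our own detection code, based on Essentia, is listed in the Code Appendix). The file name is hypothetical.

import librosa

# Estimate beat times for a song, then quantize them to 60 fps frame indices,
# which is the rate our map files use.
y, sr = librosa.load("song.mp3")                          # hypothetical file name
tempo, beat_frames = librosa.beat.beat_track(y=y, sr=sr)  # beat positions (in analysis frames)
beat_times = librosa.frames_to_time(beat_frames, sr=sr)   # beat positions in seconds
frame_indices = [int(t * 60) for t in beat_times]         # map onto 60 fps display frames
print(frame_indices[:10])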




Project Objective:

  • To create a rhythm-matching game based on Raspberry Pi and piTFT.
  • For a song played on the Raspberry Pi, the Pi translates its rhythm into successive colored blocks of varying sizes that appear at the upper edge of the piTFT, slide down their respective trails, and reach the bottom band of the screen in time with the beat of the music.
  • The player taps electrical keys connected to the Raspberry Pi at the moment each block reaches the bottom. If the tap is detected at the right time, the bottom band lets the colored block pass through; otherwise the band obstructs any part of the block that falls into it, until the whole block is 'eaten' (see the sketch below).
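The timing rule itself is simple: a tap only counts while the lowest block in that lane overlaps the verification band at the bottom of the screen. Below is a minimal sketch of that rule, with the screen and band heights taken from visualize.py in the Code Appendix; it is an illustration, not the project code itself.

SCREEN_HEIGHT = 240   # piTFT height in pixels
VERIFY_HEIGHT = 40    # height of the bottom verification band

def is_hit(key_pressed, lowest_block_bottom):
    # A hit requires the key to be pressed while the lowest block in the lane
    # has already entered the verification band.
    return key_pressed and lowest_block_bottom >= SCREEN_HEIGHT - VERIFY_HEIGHT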


Design and Testing


Our project can be divided into two main parts: hardware design and software design.
The following describes the steps of each part, along with the challenges we met.


Drawings




    Pumpkin-Cool-Tapping Instrument



   Homepage in piTFT



Result and Conclusion



Work Distribution



Project group picture: Just A Team !




Xiaoxing Yan

xy363@cornell.edu

--Wrote the key-initialization code for the software design.
--Refined the map file by limiting the shortest block size.
--Made and refined the keys with Ruoxuan Xu.



Ruoxuan Xu

rx65@cornell.edu

--Wrote the program for generating manually made map files.
--Created and tested the manually made map files.
--Tested and refined the electrical keys with Xiaoxing Yan.



Pujun Lun

pl557@cornell.edu

--Wrote and tested the code for detecting beats, reading and writing map files, and visualizing the rhythm of the music.


Parts List

Total: $19.99


References

Essentia
Sensor Film Kit
R-Pi GPIO Document

Code Appendix


display.py

--display.py provides a wrapper class around Pygame.
It includes methods to create, refresh and clear the screen, render graphics, and get the mouse/touch click position.

                     
import pygame
import os


width, height = 320, 240

WHITE = 255, 255, 255
RED = 255, 0, 0
GREEN = 0, 255, 0
BLUE = 0, 0, 255
YELLOW = 255, 255, 0
PINK = 255, 192, 203
BLACK = 0, 0, 0


class Screen(object):
    def __init__(self, on_tft=False):
        if on_tft:
            os.putenv('SDL_FBDEV', '/dev/fb1')
            os.putenv('SDL_VIDEODRIVER', 'fbcon')
            os.putenv('SDL_MOUSEDRV', 'TSLIB')
            os.putenv('SDL_MOUSEDEV', '/dev/input/touchscreen')

        pygame.init()
        self.screen = pygame.display.set_mode((width, height))
        pygame.mouse.set_visible(not on_tft)
        self.clock = pygame.time.Clock()

    def clear(self):
        self.screen.fill(BLACK)

    def render_circle(self, center, radius, color):
        pygame.draw.circle(self.screen, color, center, radius, 0)

    def render_polygon(self, points, color, width=0):
        pygame.draw.polygon(self.screen, color, points, width)

    def render_text(self, text_pos, font, color):
        text_font = pygame.font.Font(None, font)
        for text, pos in text_pos.items():
            text_surface = text_font.render(text, True, color)
            self.screen.blit(text_surface, text_surface.get_rect(center=pos))

    def tick(self, frame_rate):
        self.clock.tick(frame_rate)

    @classmethod
    def display(cls):
        pygame.display.flip()

    @classmethod
    def get_click_pos(cls):
        return [pygame.mouse.get_pos() for event in pygame.event.get()
                if event.type == pygame.MOUSEBUTTONUP]
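A short usage sketch of this wrapper (illustrative only, not part of the project code): create a Screen, draw one frame, and then poll for touches at 60 frames per second.

import display

screen = display.Screen(on_tft=False)              # set on_tft=True when running on the piTFT
screen.clear()
screen.render_circle((160, 120), 20, display.RED)
screen.render_text({"Hello": (160, 60)}, 40, display.WHITE)
screen.display()                                   # flip the buffer onto the screen

running = True
while running:
    if screen.get_click_pos():                     # any touch or mouse click ends the loop
        running = False
    screen.tick(60)                                # cap the loop at 60 frames per second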



              

keyboard.py

--keyboard.py is a wrapper around RPi.GPIO.
It provides functions to initialize, clean up, and read the status of the GPIO pins.

        
import RPi.GPIO as GPIO


def key_initiate(all_pin):
    GPIO.setmode(GPIO.BCM)
    for pin in all_pin:
        GPIO.setup(pin, GPIO.IN, pull_up_down=GPIO.PUD_UP)

       
def key_status(all_pin):
    return [not GPIO.input(pin) for pin in all_pin]


def key_clean():
    GPIO.cleanup()
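A short usage sketch (illustrative only, not part of the project code), assuming the same four BCM pins used elsewhere in the project:

import keyboard
from time import sleep

all_pin = [5, 6, 13, 26]            # BCM pin numbers of the four keys
keyboard.key_initiate(all_pin)      # inputs with internal pull-ups enabled
try:
    for _ in range(600):            # poll for about ten seconds at ~60 Hz
        print(keyboard.key_status(all_pin))   # list of booleans, True means pressed
        sleep(1.0 / 60)
finally:
    keyboard.key_clean()            # always release the GPIO pins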

              
              

manual_make.py

--manual_make.py is used to generate a map file manually.
While a song plays, it uses RPi.GPIO to read the status of the buttons frame by frame, turns the readings into an array, and stores the array in the map file.


import RPi.GPIO as GPIO
import keyboard
import pygame
from time import time, sleep
import numpy as np

dict_pin = {5: 8, 6: 4, 13: 2, 26: 1}
all_pin = [5, 6, 13, 26]
keyboard.key_initiate(all_pin)

pygame.mixer.init()
pygame.mixer.music.load("AWA.mp3")
pygame.mixer.music.play()

record = []                # one 4-bit key-status value per frame
timestamp = time()
interval = 1.0 / 60        # sample the keys at 60 frames per second
running = True
while running:
    try:
        current = 0
        for pin, number in dict_pin.items():
            if not GPIO.input(pin):
                current += number
        record.append(current)

        sleep_time = timestamp + interval - time()
        if sleep_time > 0:
            sleep(sleep_time)
        timestamp += interval

    except KeyboardInterrupt:
        running = False
        np.save("record", np.array(record))

GPIO.cleanup()
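Each recorded frame is therefore a single 4-bit integer: the four keys contribute the weights 8, 4, 2 and 1 listed in dict_pin, and visualize.py later unpacks bit 3 - i for lane i with "frame >> 3 - i & 0x01". A minimal sketch of that encoding and decoding (illustrative only, not part of the project code):

# One frame is an integer 0..15; bit 3 is lane 0, bit 0 is lane 3.

def encode_frame(pressed):     # pressed: booleans for the four lanes
    return sum(1 << (3 - i) for i, p in enumerate(pressed) if p)

def decode_frame(frame):       # inverse of encode_frame
    return [bool(frame >> (3 - i) & 0x01) for i in range(4)]

assert encode_frame([True, False, True, False]) == 10    # 0b1010
assert decode_frame(10) == [True, False, True, False]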

                 

visualize.py

--visualize.py is built on top of display.py.
It includes the classes Trapezoid, Note, Visualizer and MapReader. The first three create the user interface of our application.
The last one, MapReader, reads manually made map files and displays them using Visualizer.
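The trails are drawn with a simple perspective effect: all four lanes share a narrow band of anchor points at the top of the screen and fan out to the full screen width at the bottom, and each lane edge is a straight line whose slope follows from its two anchors. A minimal sketch of that geometry, using the same constants as the module below (illustrative only):

WIDTH, HEIGHT = 320, 240
TOP_WIDTH = 10                 # width of one lane at the top of the screen

# Five anchor x-coordinates at the top (a 40-pixel band around the centre)
top_anchor = range(WIDTH // 2 - TOP_WIDTH * 2, WIDTH // 2 + TOP_WIDTH * 2 + 1, TOP_WIDTH)
# Five anchor x-coordinates at the bottom (the full screen width)
bottom_anchor = range(0, WIDTH + 1, WIDTH // 4)

def edge_slope(i):
    # Slope of the i-th lane edge; 0 marks a vertical edge (the centre line)
    if top_anchor[i] == bottom_anchor[i]:
        return 0
    return HEIGHT / float(bottom_anchor[i] - top_anchor[i])

print(list(top_anchor))        # [140, 150, 160, 170, 180]
print(list(bottom_anchor))     # [0, 80, 160, 240, 320]
print(edge_slope(0))           # 240 / (0 - 140), roughly -1.71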


import display
import keyboard
from time import time, sleep
import numpy as np
import subprocess


verify_height = 40
trapezoid_top_width = 10
top_anchor = range(display.width / 2 - trapezoid_top_width * 2,
                   display.width / 2 + trapezoid_top_width * 2 + 1,
                   trapezoid_top_width)
bottom_anchor = range(0, display.width + 1, display.width / 4)
select_pos = [(160, 30), (160, 90), (160, 150), (160, 210)]


class Trapezoid(object):
    def __init__(self, screen, top, bottom, index, left_slope, right_slope, color, speed):
        self.screen = screen
        self.top = top
        self.bottom = bottom
        self.index = index
        self.left_slope = left_slope
        self.right_slope = right_slope
        self.color = color
        self.speed = speed

    def _get_points(self):
        return [[(self.left_slope
                  and (self.top / self.left_slope + top_anchor[self.index],)
                  or (top_anchor[self.index],))[0], self.top],

                [(self.right_slope
                  and (self.top / self.right_slope + top_anchor[self.index + 1],)
                  or (top_anchor[self.index + 1],))[0], self.top],

                [(self.right_slope
                  and (self.bottom / self.right_slope + top_anchor[self.index + 1],)
                  or (top_anchor[self.index + 1],))[0], self.bottom],

                [(self.left_slope
                  and (self.bottom / self.left_slope + top_anchor[self.index],)
                  or (top_anchor[self.index],))[0], self.bottom]]

    def render(self, color=None):
        self.screen.render_polygon(self._get_points(), color and color or self.color)
        return self

    def move_down(self):
        self.top = min(display.height, self.top + self.speed)
        self.bottom = min(display.height - verify_height, self.bottom + self.speed)
        return self.top < display.height - verify_height


class Note(object):
    def __init__(self, screen, index, color, speed):
        self.color = color
        self.left_slope = (top_anchor[index] != bottom_anchor[index]
                           and (display.height / float(bottom_anchor[index] - top_anchor[index]),)
                           or (0,))[0]
        self.right_slope = (top_anchor[index + 1] != bottom_anchor[index + 1]
                            and (display.height / float(bottom_anchor[index + 1] - top_anchor[index + 1]),)
                            or (0,))[0]
        self.trapezoids = []
        self.verify = Trapezoid(screen, display.height - verify_height, display.height, index,
                                self.left_slope, self.right_slope, self.color, speed)

    def move_all_trapezoids(self, pressed):
        self.trapezoids = [trapezoid.render() for trapezoid in self.trapezoids if trapezoid.move_down()]
        self.verify.render(pressed and self.trapezoids and self.trapezoids[0].bottom >= display.height - verify_height
                           and self.color or display.PINK)


class Visualizer(object):
    def __init__(self, music_path="", fifo="", trapezoid_height=20, speed=2, on_tft=False, screen=None):
        self.screen = screen and screen or display.Screen(on_tft=on_tft)
        self.screen.clear()
        self.screen.render_text({"Loading...": (160, 120)}, 40, display.WHITE)
        self.screen.display()
        self.cross_screen = (display.height - verify_height) / speed
        self.notes = [Note(self.screen, 0, display.WHITE, speed),
                      Note(self.screen, 1, display.RED, speed),
                      Note(self.screen, 2, display.GREEN, speed),
                      Note(self.screen, 3, display.BLUE, speed)]
        self.trapezoid_height = trapezoid_height
        self.state = [0, 0, 0, 0]
        self.speed = speed
        if music_path and fifo:
            self.fifo = fifo
            subprocess.call(["mplayer -input file={} {} &".format(fifo, music_path)], shell=True)
            subprocess.check_output("echo 'pause' > {}".format(fifo), shell=True)

    def play_music(self):
        subprocess.check_output("echo 'pause' > {}".format(self.fifo), shell=True)

    def stop_music(self):
        subprocess.check_output("echo 'quit' > {}".format(self.fifo), shell=True)

    def map_file_refresh(self, frame, pressed):
        self.screen.clear()

        for i in range(4):
            if frame >> 3 - i & 0x01:
                if len(self.notes[i].trapezoids) and self.notes[i].trapezoids[-1].top == display.height:
                    self.notes[i].trapezoids[-1].top -= self.speed
                else:
                    self.notes[i].trapezoids.append(Trapezoid(self.screen, -self.speed, 0, i,
                                                              self.notes[i].left_slope,
                                                              self.notes[i].right_slope,
                                                              self.notes[i].color, self.speed))
            self.notes[i].move_all_trapezoids(pressed[i])

        self.screen.display()

        for pos in self.screen.get_click_pos():
            print pos
            return False

        return True

    def detection_refresh(self, frame, pressed):
        self.screen.clear()

        new_state = [0] * 4
        for i in range(4):
            if frame[i]:
                # set state value to designated height for countdown
                new_state[i] = self.trapezoid_height

                if self.state[i]:
                    self.notes[i].trapezoids[-1].top -= self.speed
                else:
                    self.notes[i].trapezoids.append(Trapezoid(self.screen, -self.speed, 0, i,
                                                              self.notes[i].left_slope,
                                                              self.notes[i].right_slope,
                                                              self.notes[i].color, self.speed))
            elif self.state[i]:
                self.notes[i].trapezoids[-1].top -= self.speed
                new_state[i] = self.state[i] - 1

            self.notes[i].move_all_trapezoids(pressed[i])

        self.state = new_state
        self.screen.display()

        for pos in self.screen.get_click_pos():
            print pos
            return True

        return False


class MapReader(object):
    def __init__(self, music_path, map_path, pin_list, frame_rate, speed=2, on_tft=False, screen=None):
        self.record = np.load(map_path)
        self.pin_list = pin_list
        keyboard.key_initiate(pin_list)
        self.interval = 1.0 / frame_rate
        self.visualizer = Visualizer(music_path, "mplayer_fifo", 0, speed, on_tft, screen)

    def __call__(self):
        timestamp = time()
        display_start = time()
        counter = 0

        try:
            for idx in range(len(self.record)):
                counter += 1
                if counter == self.visualizer.cross_screen:
                    self.visualizer.play_music()

                if not self.visualizer.map_file_refresh(self.record[idx], keyboard.key_status(self.pin_list)):
                    break

                sleep_time = timestamp + self.interval - time()
                if sleep_time > 0:
                    sleep(sleep_time)
                timestamp += self.interval

        except KeyboardInterrupt:
            pass

        finally:
            self.visualizer.stop_music()
            keyboard.key_clean()
            print("Elapsed time: {:.2f}s".format(time() - display_start))


if __name__ == "__main__":
    map_reader = MapReader("peace.mp3", "peace&love.npy", [26, 13, 6, 5], 60)
    map_reader()

                 

analyze.py

--analyze.py is built on top of visualize.py.
It includes the class Analyzer, which reads the map file that stores the beats and displays them using Visualizer. If no map file is found, it imports the Essentia library to do beat detection, stores the result in a map file, and then displays the beats.
The Visualizer and Analyzer classes are designed to be easy to use: their instances are callable after initialization.
All functions, including the time control, playing the music and refreshing the screen, are executed at the designated frame rate until the screen is touched.
Their usage is shown after "if __name__ == \"__main__\":" in both files.
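The time control in MapReader, Analyzer and manual_make.py follows the same fixed-timestep pattern: keep a running target timestamp, sleep only for whatever time is left in the current frame, and advance the target by exactly one frame interval. A minimal sketch of that pattern (illustrative only):

from time import time, sleep

FRAME_RATE = 60
interval = 1.0 / FRAME_RATE

timestamp = time()
for frame in range(FRAME_RATE * 5):    # run for about five seconds
    # ... per-frame work goes here: poll the keys, move the blocks, refresh the screen ...

    sleep_time = timestamp + interval - time()
    if sleep_time > 0:                 # sleep only for the time left in this frame
        sleep(sleep_time)
    timestamp += interval              # advance the target by exactly one interval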


import os
import multiprocessing
import numpy as np
import visualize
import keyboard
from time import time, sleep


# https://stackoverflow.com/a/46266853/7873124
# put declaration outside of the Analyzer class
# multiprocess cannot pickle an undefined function
def detect_onset(audio, index):
    # should be able to fetch the module from cache
    import essentia.standard as ess_std
    from essentia import array

    print("Subprocess {} starts".format(index))
    processing_start = time()

    onset_detector = ess_std.OnsetDetection(method="complex")
    window = ess_std.Windowing(type="hann")
    fft = ess_std.FFT()
    c2p = ess_std.CartesianToPolar()
    onsets = ess_std.Onsets()

    frames = []
    for frame in ess_std.FrameGenerator(audio, frameSize=1024, hopSize=512):
        mag, phase = c2p(fft(window(frame)))
        frames.append(onset_detector(mag, phase))

    onsets_array = onsets(array([frames]), [1])
    print("Subprocess {} finished. Elapsed time: {:.2}s".format(index, time() - processing_start))
    return onsets_array


class Analyzer(object):
    def __init__(self, music_path, sample_rate, frame_rate, least_energy, pin_list, response_time,
                 speed=2, on_tft=False, screen=None):
        keyboard.key_initiate(pin_list)
        self.pin_list = pin_list
        self.frame_rate = frame_rate
        self.time_interval = 1.0 / frame_rate
        self.least_energy = least_energy
        self.sample_rate = sample_rate
        self.visualizer = visualize.Visualizer(music_path, "mplayer_fifo",
                                               int(speed * frame_rate * response_time), speed, on_tft, screen)

        stats_file = "{}.npz".format(os.path.splitext(music_path)[0])
        # if the music has not been processed before
        if not os.path.isfile(stats_file):
            print("Loading Essentia module...")
            import essentia.standard as ess_std

            print("Loading music...")
            loader = ess_std.MonoLoader(filename=music_path)
            audio = loader()

            pool = multiprocessing.Pool()
            num_process = multiprocessing.cpu_count()
            segment_length = int(len(audio) / num_process) / 1024 * 1024

            onset_collector = []
            self.num_frames = len(audio)

            print("Calculating onsets...")
            processing_start = time()

            results = [None] * num_process
            for i in range(num_process):
                results[i] = pool.apply_async(detect_onset, args=(
                    audio[segment_length * i: min(segment_length * (i + 1), len(audio))], i))
            pool.close()
            pool.join()

            for i in range(num_process):
                onsets = results[i].get() + i * float(segment_length) / self.sample_rate
                onset_collector += onsets.tolist()

            onset_collector.append(np.finfo(float).max)  # so that read_onset will never reach len(self.onsets)
            self.onsets = np.array(onset_collector)

            print("Onset detection finished. Elapsed time: {:.2f}s".format(time() - processing_start))
            np.savez(os.path.splitext(music_path)[0], num_frames=np.array([self.num_frames]), onsets=self.onsets)

        else:
            stats = np.load(stats_file)
            self.num_frames = stats["num_frames"][0]
            self.onsets = stats["onsets"]
            print("Pre-processing skipped")

    def __call__(self):
        timestamp = time()
        display_start = time()
        read_onset = 0
        counter = 0

        try:
            for i in xrange(int(self.num_frames / float(self.sample_rate) * self.frame_rate)):
                counter += 1
                if counter == self.visualizer.cross_screen:
                    self.visualizer.play_music()

                frame = [0] * 4
                if time() - display_start > self.onsets[read_onset]:
                    read_onset += 1
                    frame[np.random.randint(0, 4)] = 1
                clicked = self.visualizer.detection_refresh(frame, keyboard.key_status(self.pin_list))

                if not clicked:
                    sleep_time = timestamp + self.time_interval - time()
                    if sleep_time > 0:
                        sleep(sleep_time)
                    timestamp += self.time_interval
                else:
                    break

        except KeyboardInterrupt:
            pass

        finally:
            self.visualizer.stop_music()
            keyboard.key_clean()
            print("Elapsed time: {:.2f}s".format(time() - display_start))


if __name__ == "__main__":
    analyzer = Analyzer("locked.mp3", 44100, 60, 0, [26, 13, 6, 5], 0.05)
    analyzer()

                 

main.py

--main.py is built on top of display.py, visualize.py and analyze.py.
It shows a cover page for the user to choose which piece of music to play, and then calls either MapReader or Analyzer to read the corresponding map file and display the rhythm.


import display
from visualize import select_pos, MapReader
from analyze import Analyzer
from time import sleep


select_text = ["Select a music", "Peace & Love", "Locked Away", "July"]
select_music = {k: v for (k, v) in zip(select_text, select_pos)}
screen = display.Screen(on_tft=True)
all_pin = [5, 6, 13, 26][::-1]
frame_rate = 60
response_time = 0.05


def display_select_music():
    screen.clear()
    screen.render_text(select_music, 40, display.WHITE)
    screen.display()


display_select_music()
running = True
while running:
    pos = screen.get_click_pos()
    if pos:
        if pos[0][1] < 60:
            running = False
        else:
            if pos[0][1] < 120:
                map_reader = MapReader("peace.mp3", "peace&love.npy", all_pin, frame_rate, screen=screen)
                map_reader()
            elif pos[0][1] < 180:
                analyzer = Analyzer("locked.mp3", 44100, frame_rate, 0, all_pin, response_time, screen=screen)
                analyzer()
            else:
                analyzer = Analyzer("july.mp3", 44100, frame_rate, 0, all_pin, response_time, screen=screen)
                analyzer()
            display_select_music()

    sleep(0.01)