2026-05-02 21:07:00 +07:00

746 lines
29 KiB
Python

"""
Backend API untuk Aplikasi Absensi Akademik — VERSI AMAN v2
Python Flask + MySQL + JWT Authentication
Ubhara Jaya — 2026
SECURITY FIXES v1:
[FIX 1] Validasi koordinat GPS di server (bukan hanya di client)
[FIX 2] Validasi foto: wajib ada, harus valid base64 image, min size
[FIX 3] JWT terikat ke device_id — token tidak bisa dibagikan antar HP
[FIX 4] Rate limiting — max 5 percobaan login / 3 submit absensi per menit
[FIX 5] Validasi kepemilikan jadwal — mahasiswa hanya bisa absen di jadwalnya sendiri
[FIX 6] Validasi timestamp — absensi hanya diterima saat jam kelas aktif
SECURITY FIXES v2 (BARU):
[FIX 7] Location Token — one-time token 2 menit, cegah bypass koordinat via Postman
[FIX 8] Deteksi anomali koordinat — koordinat terlalu bulat / persis titik kampus ditolak
"""
from flask import Flask, request, jsonify
from flask_cors import CORS
import mysql.connector
from mysql.connector import Error
import jwt
import bcrypt
from datetime import datetime, timedelta
from functools import wraps
import base64
import math
import time
import secrets
import requests
from collections import defaultdict
app = Flask(__name__)
CORS(app)
# ==================== KONFIGURASI ====================
DB_CONFIG = {
'host': 'localhost',
'user': 'root',
'password': '',
'database': 'db_absensi_akademik'
}
SECRET_KEY = 'ubhara-jaya-absensi-secret-2026-change-this'
# Lokasi Kampus
# KAMPUS_LATITUDE = -6.223325
# KAMPUS_LONGITUDE = 107.009406
# RADIUS_METER = 500.0
# Testing
KAMPUS_LATITUDE = -6.2396008
KAMPUS_LONGITUDE = 107.0893571
RADIUS_METER = 500.0
# ==================== [FIX 7] LOCATION TOKEN STORE ====================
# Format: { 'token_hex': { 'id_mahasiswa': int, 'lat': float, 'lon': float, 'expires_at': float } }
# Disimpan di memory — otomatis hilang kalau server restart (aman, by design)
_location_tokens: dict = {}
LOCATION_TOKEN_TTL = 120 # detik — token expired dalam 2 menit
def bersihkan_token_expired():
"""Hapus token yang sudah expired dari store."""
now = time.time()
expired = [k for k, v in _location_tokens.items() if now > v['expires_at']]
for k in expired:
del _location_tokens[k]
# ==================== IN-MEMORY RATE LIMITER ====================
_rate_limit_store = defaultdict(list)
def is_rate_limited(identifier: str, max_calls: int, window_seconds: int) -> bool:
now = time.time()
_rate_limit_store[identifier] = [t for t in _rate_limit_store[identifier] if now - t < window_seconds]
if len(_rate_limit_store[identifier]) >= max_calls:
return True
_rate_limit_store[identifier].append(now)
return False
def get_client_ip():
return request.headers.get('X-Forwarded-For', request.remote_addr)
# ==================== HELPER JARAK GPS ====================
def hitung_jarak_meter(lat1, lon1, lat2, lon2):
"""Haversine formula."""
R = 6371000
phi1, phi2 = math.radians(lat1), math.radians(lat2)
dphi = math.radians(lat2 - lat1)
dlambda = math.radians(lon2 - lon1)
a = math.sin(dphi/2)**2 + math.cos(phi1) * math.cos(phi2) * math.sin(dlambda/2)**2
return R * 2 * math.atan2(math.sqrt(a), math.sqrt(1 - a))
# ==================== [FIX 8] DETEKSI ANOMALI KOORDINAT ====================
def deteksi_anomali_koordinat(lat: float, lon: float) -> tuple[bool, str]:
"""
Deteksi koordinat yang mencurigakan / kemungkinan di-input manual:
1. Koordinat 0,0 → GPS tidak aktif
2. Koordinat persis sama dengan konstanta kampus → copy-paste manual
3. Presisi desimal kurang dari 4 angka → bukan dari GPS asli
(GPS hardware selalu menghasilkan 6-8 angka desimal)
4. Nilai lat/lon di luar batas geografis Indonesia
"""
# 1. Koordinat null island
if lat == 0.0 and lon == 0.0:
return True, "GPS tidak aktif atau koordinat tidak valid (0,0)"
# 2. Persis sama dengan titik kampus (kemungkinan hardcoded/copy-paste)
if lat == KAMPUS_LATITUDE and lon == KAMPUS_LONGITUDE:
return True, "Koordinat identik dengan titik kampus, kemungkinan diinput manual"
# 3. Cek presisi desimal — GPS asli selalu punya ≥4 angka desimal
lat_str = f"{lat}"
lon_str = f"{lon}"
lat_desimal = len(lat_str.split('.')[-1]) if '.' in lat_str else 0
lon_desimal = len(lon_str.split('.')[-1]) if '.' in lon_str else 0
if lat_desimal < 4 or lon_desimal < 4:
return True, f"Presisi koordinat terlalu rendah ({lat_desimal}/{lon_desimal} desimal), bukan dari GPS asli"
# 4. Batas geografis Indonesia (lat: -11 s/d 6, lon: 95 s/d 141)
if not (-11.0 <= lat <= 6.0) or not (95.0 <= lon <= 141.0):
return True, "Koordinat di luar wilayah Indonesia"
return False, "OK"
# ==================== HELPER VALIDASI FOTO ====================
def validasi_foto(foto_base64: str) -> tuple[bool, str]:
if not foto_base64 or len(foto_base64.strip()) == 0:
return False, "Foto wajib disertakan"
if ',' in foto_base64:
foto_base64 = foto_base64.split(',')[1]
try:
decoded = base64.b64decode(foto_base64)
except Exception:
return False, "Format foto tidak valid"
is_jpg = decoded[:3] == b'\xff\xd8\xff'
is_png = decoded[:8] == b'\x89PNG\r\n\x1a\n'
is_webp = decoded[8:12] == b'WEBP'
if not (is_jpg or is_png or is_webp):
return False, "File bukan gambar yang valid (harus JPG/PNG/WEBP)"
if len(decoded) < 5 * 1024:
return False, "Ukuran foto terlalu kecil, pastikan foto wajah terambil dengan benar"
return True, "OK"
# ==================== DATABASE ====================
def get_db_connection():
try:
return mysql.connector.connect(**DB_CONFIG)
except Error as e:
print(f"DB Error: {e}")
return None
def init_database():
try:
temp_config = {k: v for k, v in DB_CONFIG.items() if k != 'database'}
connection = mysql.connector.connect(**temp_config)
except Error as e:
print(f"❌ Tidak bisa konek ke MySQL: {e}")
return
cursor = connection.cursor()
try:
cursor.execute(f"CREATE DATABASE IF NOT EXISTS {DB_CONFIG['database']}")
cursor.execute(f"USE {DB_CONFIG['database']}")
cursor.execute("""
CREATE TABLE IF NOT EXISTS mahasiswa (
id_mahasiswa INT AUTO_INCREMENT PRIMARY KEY,
npm VARCHAR(20) UNIQUE NOT NULL,
password VARCHAR(255) NOT NULL,
nama VARCHAR(100) NOT NULL,
jenkel VARCHAR(10) NOT NULL,
fakultas VARCHAR(100) NOT NULL,
jurusan VARCHAR(100) NOT NULL,
semester INT NOT NULL,
device_id VARCHAR(255),
created_at DATETIME DEFAULT CURRENT_TIMESTAMP
)
""")
cursor.execute("""
CREATE TABLE IF NOT EXISTS mata_kuliah (
id_matkul INT AUTO_INCREMENT PRIMARY KEY,
kode_matkul VARCHAR(20) UNIQUE NOT NULL,
nama_matkul VARCHAR(100) NOT NULL,
sks INT NOT NULL,
dosen VARCHAR(100) NOT NULL
)
""")
cursor.execute("""
CREATE TABLE IF NOT EXISTS jadwal_kelas (
id_jadwal INT AUTO_INCREMENT PRIMARY KEY,
id_matkul INT NOT NULL,
hari VARCHAR(10) NOT NULL,
jam_mulai TIME NOT NULL,
jam_selesai TIME NOT NULL,
ruangan VARCHAR(50) NOT NULL,
jurusan VARCHAR(100) NOT NULL,
semester INT NOT NULL,
FOREIGN KEY (id_matkul) REFERENCES mata_kuliah(id_matkul)
)
""")
cursor.execute("""
CREATE TABLE IF NOT EXISTS absensi (
id_absensi INT AUTO_INCREMENT PRIMARY KEY,
id_mahasiswa INT NOT NULL,
npm VARCHAR(20) NOT NULL,
nama VARCHAR(100) NOT NULL,
id_jadwal INT,
mata_kuliah VARCHAR(100),
latitude DOUBLE,
longitude DOUBLE,
jarak_meter DOUBLE,
timestamp DATETIME NOT NULL,
photo LONGTEXT,
foto_base64 LONGTEXT,
status VARCHAR(20) NOT NULL DEFAULT 'HADIR',
device_id VARCHAR(255),
created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (id_mahasiswa) REFERENCES mahasiswa(id_mahasiswa),
FOREIGN KEY (id_jadwal) REFERENCES jadwal_kelas(id_jadwal)
)
""")
# Upgrade kolom dari versi lama (abaikan error jika sudah ada)
for col_sql in [
"ALTER TABLE mahasiswa ADD COLUMN device_id VARCHAR(255) AFTER semester",
"ALTER TABLE absensi ADD COLUMN jarak_meter DOUBLE AFTER longitude",
"ALTER TABLE absensi ADD COLUMN device_id VARCHAR(255) AFTER foto_base64",
]:
try:
cursor.execute(col_sql)
except Exception:
pass
connection.commit()
print("✅ Database & semua tabel siap!")
except Error as e:
print(f"❌ Error init DB: {e}")
finally:
cursor.close()
connection.close()
# ==================== HELPER WAKTU ====================
def get_hari_indo():
mapping = {
'Monday':'Senin','Tuesday':'Selasa','Wednesday':'Rabu',
'Thursday':'Kamis','Friday':'Jumat','Saturday':'Sabtu','Sunday':'Minggu'
}
return mapping.get(datetime.now().strftime('%A'), 'Senin')
# ==================== AUTO ALFA ====================
def jalankan_auto_alfa():
try:
conn = get_db_connection()
if conn is None:
return
cursor = conn.cursor(dictionary=True)
hari_ini = get_hari_indo()
jam_sekarang = datetime.now().time()
timestamp_str = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
cursor.execute("""
SELECT j.id_jadwal, m.nama_matkul, j.jurusan, j.semester
FROM jadwal_kelas j JOIN mata_kuliah m ON j.id_matkul=m.id_matkul
WHERE j.hari=%s AND j.jam_selesai < %s
""", (hari_ini, jam_sekarang))
for j in cursor.fetchall():
cursor.execute(
"SELECT id_mahasiswa,npm,nama FROM mahasiswa WHERE jurusan=%s AND semester=%s",
(j['jurusan'], j['semester'])
)
for mhs in cursor.fetchall():
cursor.execute("""
SELECT COUNT(*) as cnt FROM absensi
WHERE id_mahasiswa=%s AND id_jadwal=%s AND DATE(timestamp)=CURDATE()
""", (mhs['id_mahasiswa'], j['id_jadwal']))
if cursor.fetchone()['cnt'] == 0:
cursor.execute("""
INSERT INTO absensi (id_mahasiswa,npm,nama,id_jadwal,mata_kuliah,
latitude,longitude,jarak_meter,timestamp,status)
VALUES (%s,%s,%s,%s,%s,0,0,0,%s,'TIDAK HADIR')
""", (mhs['id_mahasiswa'],mhs['npm'],mhs['nama'],
j['id_jadwal'],j['nama_matkul'],timestamp_str))
conn.commit()
print(f"⚠️ Auto-Alfa: {mhs['nama']} - {j['nama_matkul']}")
cursor.close(); conn.close()
except Exception as e:
print(f"Error Auto Alfa: {e}")
# ==================== JWT ====================
def generate_token(id_mahasiswa, npm, device_id):
payload = {
'id_mahasiswa': id_mahasiswa,
'npm': npm,
'device_id': device_id,
'exp': datetime.utcnow() + timedelta(days=30)
}
return jwt.encode(payload, SECRET_KEY, algorithm='HS256')
def token_required(f):
@wraps(f)
def decorated(*args, **kwargs):
token = request.headers.get('Authorization', '')
if not token:
return jsonify({'error': 'Token tidak ditemukan'}), 401
try:
if token.startswith('Bearer '):
token = token.split(' ')[1]
data = jwt.decode(token, SECRET_KEY, algorithms=['HS256'])
request.user_data = data
except jwt.ExpiredSignatureError:
return jsonify({'error': 'Token kadaluarsa, silakan login ulang'}), 401
except Exception:
return jsonify({'error': 'Token tidak valid'}), 401
return f(*args, **kwargs)
return decorated
# ==================== API ENDPOINTS ====================
@app.route('/api/health', methods=['GET'])
def health_check():
return jsonify({'status': 'OK', 'message': 'API Running — Secured Version v2'})
# -------- AUTH --------
@app.route('/api/auth/register', methods=['POST'])
def register():
if is_rate_limited(f"{get_client_ip()}:register", 3, 600):
return jsonify({'error': 'Terlalu banyak percobaan, coba lagi nanti'}), 429
try:
data = request.get_json()
for field in ['npm','password','nama','jenkel','fakultas','jurusan','semester','device_id']:
if not data.get(field):
return jsonify({'error': f'Field {field} wajib diisi'}), 400
hashed = bcrypt.hashpw(data['password'].encode(), bcrypt.gensalt()).decode()
conn = get_db_connection()
cur = conn.cursor()
cur.execute(
"INSERT INTO mahasiswa (npm,password,nama,jenkel,fakultas,jurusan,semester,device_id) VALUES (%s,%s,%s,%s,%s,%s,%s,%s)",
(data['npm'],hashed,data['nama'],data['jenkel'],data['fakultas'],data['jurusan'],data['semester'],data['device_id'])
)
conn.commit()
id_mhs = cur.lastrowid
cur.close(); conn.close()
token = generate_token(id_mhs, data['npm'], data['device_id'])
return jsonify({
'message': 'Registrasi berhasil',
'data': {
'token': token, 'id_mahasiswa': id_mhs, 'npm': data['npm'],
'nama': data['nama'], 'jenkel': data['jenkel'],
'fakultas': data['fakultas'], 'jurusan': data['jurusan'],
'semester': data['semester']
}
}), 201
except mysql.connector.IntegrityError:
return jsonify({'error': 'NPM sudah terdaftar'}), 409
except Exception as e:
return jsonify({'error': str(e)}), 500
@app.route('/api/auth/login', methods=['POST'])
def login():
if is_rate_limited(f"{get_client_ip()}:login", 5, 60):
return jsonify({'error': 'Terlalu banyak percobaan login, tunggu 1 menit'}), 429
try:
data = request.get_json()
device_id = data.get('device_id', '')
conn = get_db_connection()
cur = conn.cursor(dictionary=True)
cur.execute("SELECT * FROM mahasiswa WHERE npm=%s", (data['npm'],))
mhs = cur.fetchone()
if not mhs or not bcrypt.checkpw(data['password'].encode(), mhs['password'].encode()):
return jsonify({'error': 'NPM atau Password salah'}), 401
# [FIX 3] Cek device binding
if mhs.get('device_id') and mhs['device_id'] != device_id:
cur.close(); conn.close()
return jsonify({'error': 'Akun ini sudah terdaftar di perangkat lain. Hubungi admin untuk reset.'}), 403
if device_id:
cur.execute("UPDATE mahasiswa SET device_id=%s WHERE id_mahasiswa=%s",
(device_id, mhs['id_mahasiswa']))
conn.commit()
cur.close(); conn.close()
token = generate_token(mhs['id_mahasiswa'], mhs['npm'], device_id)
mhs.pop('password', None)
return jsonify({'message': 'Login berhasil', 'data': {**mhs, 'token': token}}), 200
except Exception as e:
return jsonify({'error': str(e)}), 500
@app.route('/api/mahasiswa/profile', methods=['GET'])
@token_required
def get_profile():
conn = get_db_connection()
cur = conn.cursor(dictionary=True)
cur.execute(
"SELECT id_mahasiswa,npm,nama,jenkel,fakultas,jurusan,semester FROM mahasiswa WHERE id_mahasiswa=%s",
(request.user_data['id_mahasiswa'],)
)
mhs = cur.fetchone()
cur.close(); conn.close()
return jsonify({'data': mhs}), 200
# -------- JADWAL --------
@app.route('/api/jadwal/today', methods=['GET'])
@token_required
def get_jadwal_today():
try:
jalankan_auto_alfa()
hari_ini = get_hari_indo()
conn = get_db_connection()
cur = conn.cursor(dictionary=True)
cur.execute("SELECT jurusan,semester FROM mahasiswa WHERE id_mahasiswa=%s",
(request.user_data['id_mahasiswa'],))
mhs = cur.fetchone()
cur.execute("""
SELECT j.*, m.nama_matkul, m.sks, m.dosen, m.kode_matkul
FROM jadwal_kelas j JOIN mata_kuliah m ON j.id_matkul=m.id_matkul
WHERE j.hari=%s AND j.jurusan=%s AND j.semester=%s
ORDER BY j.jam_mulai
""", (hari_ini, mhs['jurusan'], mhs['semester']))
jadwal = cur.fetchall()
for j in jadwal:
for col in ['jam_mulai','jam_selesai']:
if isinstance(j.get(col), timedelta):
j[col] = str(j[col])
cur.execute("""
SELECT status FROM absensi
WHERE id_mahasiswa=%s AND id_jadwal=%s AND DATE(timestamp)=CURDATE()
""", (request.user_data['id_mahasiswa'], j['id_jadwal']))
res = cur.fetchone()
j['sudah_absen'] = bool(res)
j['status_absensi'] = res['status'] if res else None
cur.close(); conn.close()
return jsonify({'data': jadwal, 'hari': hari_ini})
except Exception as e:
return jsonify({'error': str(e)}), 500
# -------- [FIX 7] LOCATION TOKEN --------
@app.route('/api/absensi/request-location-token', methods=['POST'])
@token_required
def request_location_token():
"""
Android memanggil endpoint ini tepat setelah GPS fix diterima.
Backend mencatat waktu & koordinat, lalu memberikan token sekali pakai.
Token hanya valid 2 menit dan hanya bisa dipakai 1x.
Alur wajib di Android:
1. Terima GPS fix
2. POST /api/absensi/request-location-token → dapat location_token
3. Ambil foto selfie (maks dalam 2 menit)
4. POST /api/absensi/submit + location_token
"""
# Rate limit: maks 10 request token per 5 menit per user
user_id = str(request.user_data['id_mahasiswa'])
if is_rate_limited(f"{user_id}:loc_token", 10, 300):
return jsonify({'error': 'Terlalu banyak permintaan token lokasi'}), 429
try:
data = request.get_json()
lat = data.get('latitude')
lon = data.get('longitude')
if lat is None or lon is None:
return jsonify({'error': 'Koordinat GPS wajib disertakan'}), 400
try:
lat = float(lat)
lon = float(lon)
except (ValueError, TypeError):
return jsonify({'error': 'Koordinat tidak valid'}), 400
# [FIX 8] Jalankan deteksi anomali koordinat
mencurigakan, alasan = deteksi_anomali_koordinat(lat, lon)
if mencurigakan:
return jsonify({'error': f'Koordinat tidak valid: {alasan}'}), 400
# Cek apakah sudah dalam radius kampus
jarak = hitung_jarak_meter(lat, lon, KAMPUS_LATITUDE, KAMPUS_LONGITUDE)
if jarak > RADIUS_METER:
return jsonify({
'error': f'Lokasi Anda terlalu jauh dari kampus ({jarak:.0f}m). Maksimal {RADIUS_METER:.0f}m.',
'jarak_meter': round(jarak, 1)
}), 403
# Bersihkan token lama yang expired
bersihkan_token_expired()
# Buat location token baru
loc_token = secrets.token_hex(32)
_location_tokens[loc_token] = {
'id_mahasiswa': request.user_data['id_mahasiswa'],
'device_id': request.user_data.get('device_id', ''),
'lat': lat,
'lon': lon,
'jarak_meter': round(jarak, 1),
'expires_at': time.time() + LOCATION_TOKEN_TTL
}
print(f"📍 Location token diterbitkan untuk user {user_id} | jarak: {jarak:.1f}m")
return jsonify({
'location_token': loc_token,
'expires_in_seconds': LOCATION_TOKEN_TTL,
'jarak_meter': round(jarak, 1),
'message': f'Token lokasi valid selama {LOCATION_TOKEN_TTL} detik. Segera ambil foto dan submit absensi.'
}), 200
except Exception as e:
return jsonify({'error': str(e)}), 500
# -------- ABSENSI SUBMIT --------
@app.route('/api/absensi/submit', methods=['POST'])
@token_required
def submit_absensi():
# [FIX 4] Rate limit
user_id = str(request.user_data['id_mahasiswa'])
if is_rate_limited(f"{user_id}:absensi", 3, 60):
return jsonify({'error': 'Terlalu banyak request, coba lagi sebentar'}), 429
try:
data = request.get_json()
status = data.get('status', 'HADIR')
conn = get_db_connection()
cur = conn.cursor(dictionary=True)
# [FIX 3] Verifikasi device_id
token_device_id = request.user_data.get('device_id', '')
cur.execute("SELECT device_id,nama,jurusan,semester FROM mahasiswa WHERE id_mahasiswa=%s",
(request.user_data['id_mahasiswa'],))
mhs_data = cur.fetchone()
if mhs_data.get('device_id') and mhs_data['device_id'] != token_device_id:
cur.close(); conn.close()
return jsonify({'error': 'Request berasal dari perangkat tidak sah'}), 403
# [FIX 7] Validasi location token — WAJIB ADA
loc_token = data.get('location_token')
if not loc_token:
cur.close(); conn.close()
return jsonify({
'error': 'Location token wajib disertakan. Panggil /api/absensi/request-location-token terlebih dahulu.'
}), 400
token_data = _location_tokens.get(loc_token)
if not token_data:
cur.close(); conn.close()
return jsonify({'error': 'Location token tidak valid atau sudah digunakan'}), 403
if time.time() > token_data['expires_at']:
del _location_tokens[loc_token]
cur.close(); conn.close()
return jsonify({'error': f'Location token sudah kadaluarsa (maks {LOCATION_TOKEN_TTL} detik). Silakan ulangi proses absensi.'}), 403
if token_data['id_mahasiswa'] != request.user_data['id_mahasiswa']:
cur.close(); conn.close()
return jsonify({'error': 'Location token bukan milik Anda'}), 403
if token_data['device_id'] != token_device_id:
cur.close(); conn.close()
return jsonify({'error': 'Location token digunakan dari perangkat berbeda'}), 403
# Ambil koordinat dan jarak yang sudah diverifikasi dari token (bukan dari request body)
# Ini mencegah manipulasi koordinat di tahap submit
lat = token_data['lat']
lon = token_data['lon']
jarak = token_data['jarak_meter']
# Hapus token — ONE TIME USE
del _location_tokens[loc_token]
# [FIX 5] Validasi kepemilikan jadwal
cur.execute("""
SELECT j.id_jadwal,j.jam_mulai,j.jam_selesai,j.jurusan,j.semester,m.nama_matkul
FROM jadwal_kelas j JOIN mata_kuliah m ON j.id_matkul=m.id_matkul
WHERE j.id_jadwal=%s AND j.jurusan=%s AND j.semester=%s
""", (data['id_jadwal'], mhs_data['jurusan'], mhs_data['semester']))
jadwal = cur.fetchone()
if not jadwal:
cur.close(); conn.close()
return jsonify({'error': 'Jadwal tidak valid atau bukan milik kelas Anda'}), 403
# [FIX 6] Validasi jam kelas aktif
jam_sekarang = datetime.now().time()
jam_mulai = (datetime.min + jadwal['jam_mulai']).time() if isinstance(jadwal['jam_mulai'], timedelta) else jadwal['jam_mulai']
jam_selesai = (datetime.min + jadwal['jam_selesai']).time() if isinstance(jadwal['jam_selesai'], timedelta) else jadwal['jam_selesai']
if not (jam_mulai <= jam_sekarang <= jam_selesai):
cur.close(); conn.close()
return jsonify({
'error': f'Absensi hanya bisa dilakukan saat jam kelas aktif ({jam_mulai.strftime("%H:%M")} - {jam_selesai.strftime("%H:%M")})'
}), 403
# [FIX 2] Validasi foto
foto_input = data.get('foto_base64') or data.get('photo')
valid, pesan = validasi_foto(foto_input)
if not valid:
cur.close(); conn.close()
return jsonify({'error': f'Foto tidak valid: {pesan}'}), 400
# Cek double absen
cur.execute(
"SELECT COUNT(*) as c FROM absensi WHERE id_mahasiswa=%s AND id_jadwal=%s AND DATE(timestamp)=CURDATE()",
(request.user_data['id_mahasiswa'], data['id_jadwal'])
)
if cur.fetchone()['c'] > 0:
cur.close(); conn.close()
return jsonify({'error': 'Anda sudah absen mata kuliah ini hari ini!'}), 400
timestamp_str = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
cur.execute("""
INSERT INTO absensi (id_mahasiswa,npm,nama,id_jadwal,mata_kuliah,
latitude,longitude,jarak_meter,timestamp,photo,foto_base64,status,device_id)
VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)
""", (request.user_data['id_mahasiswa'], request.user_data['npm'], mhs_data['nama'],
data['id_jadwal'], jadwal['nama_matkul'],
lat, lon, jarak, timestamp_str,
foto_input, foto_input, status, token_device_id))
conn.commit()
new_id = cur.lastrowid
cur.close(); conn.close()
# Kirim ke N8N webhook
try:
requests.post(
"https://n8n.lab.ubharajaya.ac.id/webhook/23c6993d-1792-48fb-ad1c-ffc78a3e6254",
json={
"id_absensi": new_id, "npm": request.user_data['npm'],
"nama": mhs_data['nama'], "mata_kuliah": jadwal['nama_matkul'],
"latitude": lat, "longitude": lon, "jarak_meter": jarak,
"timestamp": timestamp_str, "status": status, "foto_base64": foto_input
}, timeout=10
)
except Exception as e:
print(f"⚠️ Gagal kirim ke N8N: {e}")
return jsonify({
'message': 'Absensi berhasil disimpan',
'data': {
'id_absensi': new_id, 'status': status,
'mata_kuliah': jadwal['nama_matkul'],
'jarak_meter': jarak,
'timestamp': timestamp_str
}
}), 201
except Exception as e:
print(f"Error submit absensi: {e}")
return jsonify({'error': str(e)}), 500
# -------- RIWAYAT & FOTO --------
@app.route('/api/absensi/history', methods=['GET'])
@token_required
def get_history():
try:
start_date = request.args.get('start_date')
end_date = request.args.get('end_date')
conn = get_db_connection()
cur = conn.cursor(dictionary=True)
query = """
SELECT a.id_absensi,a.npm,a.nama,a.mata_kuliah,a.latitude,a.longitude,
a.jarak_meter,a.timestamp,a.status,a.created_at,j.jam_mulai,j.jam_selesai
FROM absensi a LEFT JOIN jadwal_kelas j ON a.id_jadwal=j.id_jadwal
WHERE a.id_mahasiswa=%s
"""
params = [request.user_data['id_mahasiswa']]
if start_date:
query += " AND DATE(a.timestamp)>=%s"; params.append(start_date)
if end_date:
query += " AND DATE(a.timestamp)<=%s"; params.append(end_date)
query += " ORDER BY a.timestamp DESC"
cur.execute(query, params)
history = cur.fetchall()
for item in history:
for col in ['jam_mulai','jam_selesai']:
if isinstance(item.get(col), timedelta):
item[col] = str(item[col])
cur.close(); conn.close()
return jsonify({'data': history}), 200
except Exception as e:
return jsonify({'error': str(e)}), 500
@app.route('/api/absensi/photo/<int:id_absensi>', methods=['GET'])
@token_required
def get_photo(id_absensi):
conn = get_db_connection()
cur = conn.cursor(dictionary=True)
cur.execute(
"SELECT foto_base64 FROM absensi WHERE id_absensi=%s AND id_mahasiswa=%s",
(id_absensi, request.user_data['id_mahasiswa'])
)
result = cur.fetchone()
cur.close(); conn.close()
if result:
return jsonify({'data': result}), 200
return jsonify({'error': 'Foto tidak ditemukan atau bukan milik Anda'}), 404
# ==================== RUN SERVER ====================
if __name__ == '__main__':
print("🚀 Menginisialisasi database...")
init_database()
print("🔒 Security fixes aktif:")
print(" ✅ [FIX 1] Server-side GPS validation")
print(" ✅ [FIX 2] Server-side foto validation")
print(" ✅ [FIX 3] JWT device binding")
print(" ✅ [FIX 4] Rate limiting")
print(" ✅ [FIX 5] Jadwal ownership validation")
print(" ✅ [FIX 6] Jam kelas aktif validation")
print(" ✅ [FIX 7] Location token (one-time, 2 menit)")
print(" ✅ [FIX 8] Anomali koordinat detection")
print("🌐 Starting Flask server...")
app.run(debug=True, host='0.0.0.0', port=5000)