crud.py 3.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109
  1. import json
  2. from sqlalchemy.orm import Session
  3. from . import models, schemas
  4. import uuid
  5. from datetime import datetime
  6. def get_user(db: Session, user_id: str):
  7. return db.query(models.User).filter(models.User.id == user_id).first()
  8. def get_users(db: Session):
  9. return db.query(models.User).all()
  10. def create_user(db: Session, user: schemas.UserAccount):
  11. db_user = models.User(
  12. id=user.id,
  13. name=user.name,
  14. department=user.department,
  15. medical_allowance=user.medical_allowance
  16. )
  17. db.add(db_user)
  18. db.commit()
  19. db.refresh(db_user)
  20. return db_user
  21. def get_claims(db: Session):
  22. return db.query(models.Claim).all()
  23. def create_claim(db: Session, claim: schemas.ClaimSubmission, user_id: str):
  24. # Calculate amount claimed (payout)
  25. user = get_user(db, user_id)
  26. if not user:
  27. return None
  28. spent_amount = claim.amount_spent
  29. remaining = user.medical_allowance
  30. amount_claimed = min(spent_amount, remaining)
  31. # Update User Balance
  32. user.medical_allowance -= amount_claimed
  33. # Create DB model
  34. db_claim = models.Claim(
  35. id=str(uuid.uuid4()),
  36. timestamp=datetime.now().isoformat(),
  37. user_id=user_id,
  38. amount_spent=spent_amount,
  39. amount_claimed=amount_claimed,
  40. provider_name=claim.provider_name,
  41. visit_date=claim.visit_date,
  42. treatment_type=claim.treatment_type,
  43. cost_center=claim.cost_center,
  44. declaration_signed=claim.declaration_signed,
  45. extraction_data=json.dumps(claim.extraction_data.model_dump()) if claim.extraction_data else None
  46. )
  47. db.add(db_claim)
  48. db.commit()
  49. db.refresh(db_claim)
  50. return db_claim
  51. def delete_claim(db: Session, claim_id: str):
  52. db_claim = db.query(models.Claim).filter(models.Claim.id == claim_id).first()
  53. if db_claim:
  54. # Refund user allowance
  55. user = get_user(db, db_claim.user_id)
  56. if user:
  57. user.medical_allowance += db_claim.amount_claimed
  58. db.delete(db_claim)
  59. db.commit()
  60. return True
  61. return False
  62. def to_pydantic_claim(db_claim: models.Claim) -> schemas.ClaimRecord:
  63. ext_data = None
  64. if db_claim.extraction_data:
  65. try:
  66. ext_data = schemas.ExtractionResponse(**json.loads(db_claim.extraction_data))
  67. except:
  68. pass
  69. return schemas.ClaimRecord(
  70. id=db_claim.id,
  71. timestamp=db_claim.timestamp,
  72. submitted_by=db_claim.owner.name if db_claim.owner else "Unknown",
  73. department=db_claim.owner.department if db_claim.owner else "Unknown",
  74. amount_spent=db_claim.amount_spent,
  75. amount_claimed=db_claim.amount_claimed,
  76. provider_name=db_claim.provider_name,
  77. visit_date=db_claim.visit_date,
  78. treatment_type=db_claim.treatment_type,
  79. cost_center=db_claim.cost_center,
  80. declaration_signed=db_claim.declaration_signed,
  81. extraction_data=ext_data
  82. )
  83. def seed_demo_user(db: Session):
  84. user_id = "demo-user-123"
  85. db_user = get_user(db, user_id)
  86. if not db_user:
  87. db_user = models.User(
  88. id=user_id,
  89. name="Demo User",
  90. department="Operations",
  91. medical_allowance=5000.0
  92. )
  93. db.add(db_user)
  94. db.commit()
  95. db.refresh(db_user)
  96. return db_user