http1.ts 3.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112
  1. import { interval as RxjsInterval, Subject, Subscription } from "rxjs"
  2. import { BehaviorSubject } from "rxjs"
  3. import { BufferService } from "../services/buffer.service"
  4. import { ConnectionState, Message } from "../interfaces/general.interface"
  5. import { v4 as uuidv4 } from 'uuid'
  6. import { error } from "console"
  7. console.log(`Testing for HTTP buffer service.`)
  8. let source: Subject<Message> = new Subject()
  9. let initialReport: ConnectionState = { status: 'DIRECT_PUBLISH' }
  10. let connectionStateSubject: BehaviorSubject<ConnectionState> = new BehaviorSubject(initialReport)
  11. let bufferService: BufferService = new BufferService(source, connectionStateSubject, 'test')
  12. /* So, have an interval obseravable that will post a method every second, but it will be buffered instead of being post.
  13. or something like that. */
  14. // Create an Observable that emits something every 1 second
  15. const interval = RxjsInterval(1000);
  16. interval.subscribe({
  17. next: time => {
  18. let message = {
  19. id: uuidv4(),
  20. message: `I am to be posted`
  21. }
  22. source.next(message)
  23. }
  24. })
  25. bufferService.getMessages().subscribe({
  26. next: message => {
  27. // Usage example:
  28. fetch('http://localhost:9999/data', {
  29. method: 'POST',
  30. body: JSON.stringify(message),
  31. headers: {
  32. "Content-type": "application/json; charset=UTF-8"
  33. }
  34. }).then((response) => {
  35. console.log(`sending ${message.id}`)
  36. console.log(response.status)
  37. connectionStateSubject.next({ status: 'DIRECT_PUBLISH' })
  38. }).catch((error) => {
  39. console.error(error)
  40. connectionStateSubject.next({ status: 'BUFFER' })
  41. periodicCheck()
  42. })
  43. }
  44. })
  45. function periodicCheck() {
  46. let timer = RxjsInterval(1000).subscribe({
  47. next: everySecond => {
  48. fetch('http://localhost:9999/', {
  49. method: 'GET',
  50. headers: {
  51. "Content-type": "application/json; charset=UTF-8"
  52. }
  53. }).then((response) => {
  54. if (response.ok) {
  55. connectionStateSubject.next({ status: 'DIRECT_PUBLISH' })
  56. timer.unsubscribe()
  57. } else {
  58. connectionStateSubject.next({ status: 'BUFFER' })
  59. }
  60. }).catch((error) => {
  61. connectionStateSubject.next({ status: 'BUFFER' })
  62. })
  63. }
  64. })
  65. }
  66. // async function postData(url, data): Promise<any> {
  67. // return new Promise(async (resolve, reject) => {
  68. // try {
  69. // const response = await fetch(url, {
  70. // method: 'POST',
  71. // mode: 'cors',
  72. // cache: 'no-cache',
  73. // credentials: 'same-origin',
  74. // headers: {
  75. // 'Content-Type': 'application/json'
  76. // },
  77. // redirect: 'follow',
  78. // referrerPolicy: 'no-referrer',
  79. // body: JSON.stringify(data)
  80. // });
  81. // if (!response.ok) {
  82. // // throw new Error(`HTTP error! Status: ${response.status}`);
  83. // reject(`HTTP error! Status: ${response.status}`);
  84. // }
  85. // let responseData;
  86. // try {
  87. // responseData = await response.json();
  88. // } catch (jsonError) {
  89. // // throw new Error('Failed to parse JSON response');
  90. // reject('Failed to parse JSON response');
  91. // }
  92. // resolve(responseData);
  93. // } catch (error) {
  94. // console.error('Error making POST request:', error);
  95. // throw error;
  96. // }
  97. // })
  98. // }