import { ensureDefined } from '@hirn.app/shared'
import {
	BehaviorSubject,
	distinctUntilChanged,
	filter,
	firstValueFrom,
	mergeMap,
	switchMap,
} from 'rxjs'
import { SubscribePayload } from 'graphql-ws'
import { GraphQLError } from 'graphql'
import { isDefined } from 'remeda'
import { outdent } from 'outdent'
import { logError } from '../monitoring/Monitoring'
import {
	Change,
	databaseCurrent,
	dbStateObservable,
} from './Database'
import {
	ValidationErrors,
	getGraphQLErrorMessages,
} from './validation'
import { keycloak } from '../security/keycloak'
import { accessTokenObserver } from '../security/AuthProvider'
import { lineState$ } from '../Offline'
import {
	RestartableClient,
	createRestartableClient,
} from './RestartableClient'

const gqlClientObservable = new BehaviorSubject<RestartableClient | null>(null)

// easier to follow than uuid
let requestId = 0
// it's ok to be not unique
const prefix = Math.floor(Math.random() * 100_000)
function getRequestId() {
	if (requestId === Number.MAX_SAFE_INTEGER) {
		requestId = 0
	}
	return `${prefix}-${requestId++}`
}

function createGqlClient(): Promise<RestartableClient> {
	return new Promise<RestartableClient>((resolve, reject) => {
		let activeSocket: WebSocket
		let timedOut: ReturnType<typeof setTimeout>
		const client = createRestartableClient({
			url: ensureDefined(process.env.GRAPHQL_API_ENDPOINT),
			keepAlive: 10_000,
			generateID: getRequestId,
			lazy: false,
			onNonLazyError: async error => {
				await logError('createGqlClient.onNonLazyError', error)
				reject(error)
			},
			connectionParams: async () => {
				await keycloak.updateToken(20)
				return {
					headers: {
						Authorization: `Bearer ${ensureDefined(keycloak.token)}`,
					},
				}
			},
			on: {
				connected: socket => {
					activeSocket = socket as WebSocket
					resolve(client)
					lineState$.next('online')
				},
				closed: () => {
					lineState$.next('offline')
					if (activeSocket.readyState === WebSocket.OPEN) {
						activeSocket.close(4408, 'Request Timeout')
					}
				},
				// https://github.com/enisdenjo/graphql-ws#ping-from-client
				ping: received => {
					if (!received) {
						// if no pong within 3 seconds happen, then the
						// connection is really bad or we are offline
						timedOut = setTimeout(() => {
							if (activeSocket.readyState === WebSocket.OPEN) {
								activeSocket.close(4408, 'Request Timeout')
							}
						}, 3_000)
					}
				},
				pong: received => {
					if (received) {
						clearTimeout(timedOut)
					}
				},
			},
		})
	})
}

accessTokenObserver
	.pipe(
		// keycloak calls onTokens twice
		distinctUntilChanged(),
		switchMap(async token => {
			// logout, destroy connection
			if (!token) {
				const client = gqlClientObservable.value
				gqlClientObservable.next(null)
				await client?.dispose()
				return
			}
			if (gqlClientObservable.value === null) {
				const client = await createGqlClient()
				gqlClientObservable.next(client)
				return
			}
			// reconnect to apply new token
			gqlClientObservable.value.restart()
		}),
	)
	.subscribe()

/**
 * one-shot graphql queries
 */
export async function query<T>(
	gqlRequest: SubscribePayload,
	attempts = 3,
): Promise<T> {
	const client = await firstValueFrom(gqlClientObservable.pipe(filter(isDefined)))
	return new Promise<T>((resolve, reject) => {
		let result: T
		client.subscribe<T>(gqlRequest, {
			next(data) {
				if (data.errors) {
					reject(new ValidationErrors(getGraphQLErrorMessages(data.errors)))
					return
				}
				result = data.data as unknown as T
			},
			complete() {
				resolve(result)
			},
			// eslint-disable-next-line
			async error(error: any) {
				if (attempts > 0 && error[0]?.extensions?.code === 'unexpected' && error[0]?.message === 'database query error') {
					await logError('retry request', JSON.stringify({
						gqlRequest,
						attempts,
						error,
					}))
					return query<T>(gqlRequest, attempts - 1)
						.then(resolve)
						.catch(reject)
				}
				reject(new ValidationErrors(getGraphQLErrorMessages(error as GraphQLError[])))
			},
		})
	})
}

export async function connect(handler: (
	changes: Change[]) => Promise<Change | null>): Promise<void> {
	const client = await firstValueFrom(gqlClientObservable.pipe(filter(isDefined)))
	const db = await databaseCurrent()
	const subscriptionQuery = outdent`
		subscription onChanges($createdat: timestamptz!, $id: uuid!) {
			updates(
				order_by: {createdat: asc, id: asc},
				where: {
					_or: [
						{createdat: {_gt: $createdat}},
						{_and: {
							createdat: {_eq: $createdat},
							id: {_gt: $id},
						}}
					]
				}
			) {
				id
				createdat
				payload
				topic
			}
		}`
	// we need more control over resolvement
	// eslint-disable-next-line no-async-promise-executor
	return new Promise<void>(async resolve => {
		const storedLastChange = ensureDefined(await db.settings.getLastChange())
		const unsubscribe = client.subscribe({
			query: subscriptionQuery,
			variables: {
				id: storedLastChange.id,
				createdat: storedLastChange.createdat,
			},
		}, {
			async next(data) {
				const response = data as { data: { updates: Change[] } }
				const changes = response.data.updates

				if (changes.length > 0) {
					for (const change of changes) {
						const lastChange = await handler([change])
						if (lastChange !== null) {
							await db.settings.setLastChange(lastChange)
						}
					}
					// subscriptions doesn't support changing variables,
					// therefore we resubscribe with new values
					unsubscribe()
					resolve()
				}
			},
			async error(error: unknown) {
				await logError(`error on changes`, JSON.stringify(error))
				unsubscribe()
				// reject leads to unsubscribe forever,
				// therefore resolve is used
				resolve()
			},
			complete() {
				// nothing special
				resolve()
			},
		})

		// if db goes into uninitilaized state, then shutdown watcher
		const dbStateSubscription = dbStateObservable
			.pipe(mergeMap(async state => {
				if (state !== 'initialized') {
					unsubscribe()
					dbStateSubscription.unsubscribe()
					await client.dispose()
					resolve()
				}
			}))
			.subscribe()
	})
}

interface PaginatorCallback<A, R extends A[]> {
	(props: {
		offset: number
		limit: number
	}): Promise<R>
}

export async function paginator<A, R extends A[]>(
	fn: PaginatorCallback<A, R>,
	limit = 1000,
): Promise<R> {
	const result: R = [] as unknown as R
	const maxPages = 10
	for (let offset = 0; offset < limit * maxPages; offset += limit) {
		const response = await fn({ offset, limit })
		result.push(...response)
		if (response.length < limit) {
			break
		}
		if (offset === maxPages) {
			throw new Error(`tried to fetch more than 10 pages for ${fn.name}`)
		}
	}
	return result
}
